X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_object.c;h=fc7e874f8e7b3af8c69d5e8ed6ced8b326abb950;hb=d6c13b87b922e7402f1cb3c94a7132688cddb717;hp=07ed284e66576a61108cf18ebcbfcd1ce80911c3;hpb=98060d83459ba10409f295898f0ec917f938b4d3;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 07ed284..fc7e874 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -68,7 +68,7 @@ struct lov_layout_operations { int (*llo_print)(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o); int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, cfs_page_t *vmpage); + struct cl_page *page, pgoff_t index); int (*llo_lock_init)(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); @@ -123,14 +123,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env, } static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, - struct cl_object *stripe, - struct lov_layout_raid0 *r0, int idx) + struct cl_object *stripe, struct lov_layout_raid0 *r0, + int idx) { - struct cl_object_header *hdr; - struct cl_object_header *subhdr; - struct cl_object_header *parent; - struct lov_oinfo *oinfo; - int result; + struct cl_object_header *hdr; + struct cl_object_header *subhdr; + struct cl_object_header *parent; + struct lov_oinfo *oinfo; + int result; if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) { /* For sanity:test_206. @@ -143,9 +143,8 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, return -EIO; } - hdr = cl_object_header(lov2cl(lov)); - subhdr = cl_object_header(stripe); - parent = subhdr->coh_parent; + hdr = cl_object_header(lov2cl(lov)); + subhdr = cl_object_header(stripe); oinfo = lov->lo_lsm->lsm_oinfo[idx]; CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID @@ -154,19 +153,24 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi), oinfo->loi_ost_idx, oinfo->loi_ost_gen); - if (parent == NULL) { - subhdr->coh_parent = hdr; - subhdr->coh_nesting = hdr->coh_nesting + 1; - lu_object_ref_add(&stripe->co_lu, "lov-parent", lov); - r0->lo_sub[idx] = cl2lovsub(stripe); - r0->lo_sub[idx]->lso_super = lov; - r0->lo_sub[idx]->lso_index = idx; - result = 0; - } else { + /* reuse ->coh_attr_guard to protect coh_parent change */ + spin_lock(&subhdr->coh_attr_guard); + parent = subhdr->coh_parent; + if (parent == NULL) { + subhdr->coh_parent = hdr; + spin_unlock(&subhdr->coh_attr_guard); + subhdr->coh_nesting = hdr->coh_nesting + 1; + lu_object_ref_add(&stripe->co_lu, "lov-parent", lov); + r0->lo_sub[idx] = cl2lovsub(stripe); + r0->lo_sub[idx]->lso_super = lov; + r0->lo_sub[idx]->lso_index = idx; + result = 0; + } else { struct lu_object *old_obj; struct lov_object *old_lov; unsigned int mask = D_INODE; + spin_unlock(&subhdr->coh_attr_guard); old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type); LASSERT(old_obj != NULL); old_lov = cl2lov(lu2cl(old_obj)); @@ -185,8 +189,20 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n"); LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n"); cl_object_put(env, stripe); - } - return result; + } + return result; +} + +static int lov_page_slice_fixup(struct lov_object *lov, + struct cl_object *stripe) +{ + struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); + struct cl_object *o; + + cl_object_for_each(o, stripe) + o->co_slice_off += hdr->coh_page_bufsize; + + return cl_object_header(stripe)->coh_page_bufsize; } static int lov_init_raid0(const struct lu_env *env, @@ -215,12 +231,14 @@ static int lov_init_raid0(const struct lu_env *env, LASSERT(lov->lo_lsm == NULL); lov->lo_lsm = lsm_addref(lsm); r0->lo_nr = lsm->lsm_stripe_count; - LASSERT(r0->lo_nr <= lov_targets_nr(dev)); + LASSERT(r0->lo_nr <= lov_targets_nr(dev)); - OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); - if (r0->lo_sub != NULL) { - result = 0; - subconf->coc_inode = conf->coc_inode; + OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); + if (r0->lo_sub != NULL) { + int psz = 0; + + result = 0; + subconf->coc_inode = conf->coc_inode; spin_lock_init(&r0->lo_sub_lock); /* * Create stripe cl_objects. @@ -230,53 +248,77 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_oinfo *oinfo = lsm->lsm_oinfo[i]; int ost_idx = oinfo->loi_ost_idx; - result = ostid_to_fid(ofid, &oinfo->loi_oi, + result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx); if (result != 0) GOTO(out, result); - subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); - subconf->u.coc_oinfo = oinfo; - LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); + subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); + subconf->u.coc_oinfo = oinfo; + LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); /* In the function below, .hs_keycmp resolves to * lu_obj_hop_keycmp() */ /* coverity[overrun-buffer-val] */ - stripe = lov_sub_find(env, subdev, ofid, subconf); - if (!IS_ERR(stripe)) { - result = lov_init_sub(env, lov, stripe, r0, i); + stripe = lov_sub_find(env, subdev, ofid, subconf); + if (!IS_ERR(stripe)) { + result = lov_init_sub(env, lov, stripe, r0, i); if (result == -EAGAIN) { /* try again */ --i; result = 0; + continue; } - } else { - result = PTR_ERR(stripe); + } else { + result = PTR_ERR(stripe); + } + + if (result == 0) { + int sz = lov_page_slice_fixup(lov, stripe); + LASSERT(ergo(psz > 0, psz == sz)); + psz = sz; } } - } else - result = -ENOMEM; + if (result == 0) + cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz; + } else + result = -ENOMEM; out: - RETURN(result); + RETURN(result); +} + +static int lov_init_released(const struct lu_env *env, + struct lov_device *dev, struct lov_object *lov, + const struct cl_object_conf *conf, + union lov_layout_state *state) +{ + struct lov_stripe_md *lsm = conf->u.coc_md->lsm; + + LASSERT(lsm != NULL); + LASSERT(lsm_is_released(lsm)); + LASSERT(lov->lo_lsm == NULL); + + lov->lo_lsm = lsm_addref(lsm); + return 0; } static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { - LASSERT(lov->lo_type == LLT_EMPTY); + LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); lov_layout_wait(env, lov); - cl_object_prune(env, &lov->lo_cl); + cl_locks_prune(env, &lov->lo_cl, 0); return 0; } static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, - struct lovsub_object *los, int idx) + struct lovsub_object *los, int idx) { - struct cl_object *sub; - struct lov_layout_raid0 *r0; - struct lu_site *site; - struct lu_site_bkt_data *bkt; - cfs_waitlink_t *waiter; + struct cl_object *sub; + struct lov_layout_raid0 *r0; + struct lu_site *site; + struct lu_site_bkt_data *bkt; + wait_queue_t *waiter; r0 = &lov->u.raid0; LASSERT(r0->lo_sub[idx] == los); @@ -292,28 +334,28 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, /* ... wait until it is actually destroyed---sub-object clears its * ->lo_sub[] slot in lovsub_object_fini() */ - if (r0->lo_sub[idx] == los) { - waiter = &lov_env_info(env)->lti_waiter; - cfs_waitlink_init(waiter); - cfs_waitq_add(&bkt->lsb_marche_funebre, waiter); - cfs_set_current_state(CFS_TASK_UNINT); - while (1) { - /* this wait-queue is signaled at the end of - * lu_object_free(). */ - cfs_set_current_state(CFS_TASK_UNINT); + if (r0->lo_sub[idx] == los) { + waiter = &lov_env_info(env)->lti_waiter; + init_waitqueue_entry_current(waiter); + add_wait_queue(&bkt->lsb_marche_funebre, waiter); + set_current_state(TASK_UNINTERRUPTIBLE); + while (1) { + /* this wait-queue is signaled at the end of + * lu_object_free(). */ + set_current_state(TASK_UNINTERRUPTIBLE); spin_lock(&r0->lo_sub_lock); if (r0->lo_sub[idx] == los) { spin_unlock(&r0->lo_sub_lock); - cfs_waitq_wait(waiter, CFS_TASK_UNINT); + waitq_wait(waiter, TASK_UNINTERRUPTIBLE); } else { spin_unlock(&r0->lo_sub_lock); - cfs_set_current_state(CFS_TASK_RUNNING); - break; - } - } - cfs_waitq_del(&bkt->lsb_marche_funebre, waiter); - } - LASSERT(r0->lo_sub[idx] == NULL); + set_current_state(TASK_RUNNING); + break; + } + } + remove_wait_queue(&bkt->lsb_marche_funebre, waiter); + } + LASSERT(r0->lo_sub[idx] == NULL); } static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, @@ -340,16 +382,16 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, */ lov_subobject_kill(env, lov, los, i); } - } - } - cl_object_prune(env, &lov->lo_cl); + } + } + cl_locks_prune(env, &lov->lo_cl, 0); RETURN(0); } static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { - LASSERT(lov->lo_type == LLT_EMPTY); + LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); } static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, @@ -369,6 +411,15 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, EXIT; } +static void lov_fini_released(const struct lu_env *env, struct lov_object *lov, + union lov_layout_state *state) +{ + ENTRY; + dump_lsm(D_INODE, lov->lo_lsm); + lov_free_memmd(&lov->lo_lsm); + EXIT; +} + static int lov_print_empty(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { @@ -377,27 +428,42 @@ static int lov_print_empty(const struct lu_env *env, void *cookie, } static int lov_print_raid0(const struct lu_env *env, void *cookie, - lu_printer_t p, const struct lu_object *o) + lu_printer_t p, const struct lu_object *o) { - struct lov_object *lov = lu2lov(o); - struct lov_layout_raid0 *r0 = lov_r0(lov); - struct lov_stripe_md *lsm = lov->lo_lsm; - int i; + struct lov_object *lov = lu2lov(o); + struct lov_layout_raid0 *r0 = lov_r0(lov); + struct lov_stripe_md *lsm = lov->lo_lsm; + int i; - (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n", - r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm, + (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n", + r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm, lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc), lsm->lsm_stripe_count, lsm->lsm_layout_gen); - for (i = 0; i < r0->lo_nr; ++i) { - struct lu_object *sub; - - if (r0->lo_sub[i] != NULL) { - sub = lovsub2lu(r0->lo_sub[i]); - lu_object_print(env, cookie, p, sub); - } else - (*p)(env, cookie, "sub %d absent\n", i); - } - return 0; + for (i = 0; i < r0->lo_nr; ++i) { + struct lu_object *sub; + + if (r0->lo_sub[i] != NULL) { + sub = lovsub2lu(r0->lo_sub[i]); + lu_object_print(env, cookie, p, sub); + } else { + (*p)(env, cookie, "sub %d absent\n", i); + } + } + return 0; +} + +static int lov_print_released(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) +{ + struct lov_object *lov = lu2lov(o); + struct lov_stripe_md *lsm = lov->lo_lsm; + + (*p)(env, cookie, + "released: %s, lsm{%p 0x%08X %d %u %u}:\n", + lov->lo_layout_invalid ? "invalid" : "valid", lsm, + lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc), + lsm->lsm_stripe_count, lsm->lsm_layout_gen); + return 0; } /** @@ -503,10 +569,20 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_lock_init = lov_lock_init_raid0, .llo_io_init = lov_io_init_raid0, .llo_getattr = lov_attr_get_raid0 + }, + [LLT_RELEASED] = { + .llo_init = lov_init_released, + .llo_delete = lov_delete_empty, + .llo_fini = lov_fini_released, + .llo_install = lov_install_empty, + .llo_print = lov_print_released, + .llo_page_init = lov_page_init_empty, + .llo_lock_init = lov_lock_init_empty, + .llo_io_init = lov_io_init_released, + .llo_getattr = lov_attr_get_empty } }; - /** * Performs a double-dispatch based on the layout type of an object. */ @@ -520,15 +596,27 @@ const static struct lov_layout_operations lov_dispatch[] = { lov_dispatch[__llt].op(__VA_ARGS__); \ }) +/** + * Return lov_layout_type associated with a given lsm + */ +enum lov_layout_type lov_type(struct lov_stripe_md *lsm) +{ + if (lsm == NULL) + return LLT_EMPTY; + if (lsm_is_released(lsm)) + return LLT_RELEASED; + return LLT_RAID0; +} + static inline void lov_conf_freeze(struct lov_object *lov) { - if (lov->lo_owner != cfs_current()) + if (lov->lo_owner != current) down_read(&lov->lo_type_guard); } static inline void lov_conf_thaw(struct lov_object *lov) { - if (lov->lo_owner != cfs_current()) + if (lov->lo_owner != current) up_read(&lov->lo_type_guard); } @@ -566,10 +654,10 @@ do { \ static void lov_conf_lock(struct lov_object *lov) { - LASSERT(lov->lo_owner != cfs_current()); + LASSERT(lov->lo_owner != current); down_write(&lov->lo_type_guard); LASSERT(lov->lo_owner == NULL); - lov->lo_owner = cfs_current(); + lov->lo_owner = current; } static void lov_conf_unlock(struct lov_object *lov) @@ -595,8 +683,8 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) } static int lov_layout_change(const struct lu_env *unused, - struct lov_object *lov, - const struct cl_object_conf *conf) + struct lov_object *lov, + const struct cl_object_conf *conf) { int result; enum lov_layout_type llt = LLT_EMPTY; @@ -604,7 +692,6 @@ static int lov_layout_change(const struct lu_env *unused, const struct lov_layout_operations *old_ops; const struct lov_layout_operations *new_ops; - struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); void *cookie; struct lu_env *env; int refcheck; @@ -612,8 +699,8 @@ static int lov_layout_change(const struct lu_env *unused, LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch)); - if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) - llt = LLT_RAID0; /* only raid0 is supported. */ + if (conf->u.coc_md != NULL) + llt = lov_type(conf->u.coc_md->lsm); LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch)); cookie = cl_env_reenter(); @@ -623,16 +710,20 @@ static int lov_layout_change(const struct lu_env *unused, RETURN(PTR_ERR(env)); } + CDEBUG(D_INODE, DFID" from %s to %s\n", + PFID(lu_object_fid(lov2lu(lov))), + llt2str(lov->lo_type), llt2str(llt)); + old_ops = &lov_dispatch[lov->lo_type]; new_ops = &lov_dispatch[llt]; + cl_object_prune(env, &lov->lo_cl); + result = old_ops->llo_delete(env, lov, &lov->u); if (result == 0) { old_ops->llo_fini(env, lov, &lov->u); LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0); - LASSERT(hdr->coh_tree.rnode == NULL); - LASSERT(hdr->coh_pages == 0); lov->lo_type = LLT_EMPTY; result = new_ops->llo_init(env, @@ -658,7 +749,6 @@ static int lov_layout_change(const struct lu_env *unused, * Lov object operations. * */ - int lov_object_init(const struct lu_env *env, struct lu_object *obj, const struct lu_object_conf *conf) { @@ -672,12 +762,12 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, ENTRY; init_rwsem(&lov->lo_type_guard); cfs_atomic_set(&lov->lo_active_ios, 0); - cfs_waitq_init(&lov->lo_waitq); + init_waitqueue_head(&lov->lo_waitq); cl_object_page_init(lu2cl(obj), sizeof(struct lov_page)); /* no locking is necessary, as object is being created */ - lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY; + lov->lo_type = lov_type(cconf->u.coc_md->lsm); ops = &lov_dispatch[lov->lo_type]; result = ops->llo_init(env, dev, lov, cconf, set); if (result == 0) @@ -688,9 +778,9 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, const struct cl_object_conf *conf) { - struct lov_stripe_md *lsm = NULL; - struct lov_object *lov = cl2lov(obj); - int result = 0; + struct lov_stripe_md *lsm = NULL; + struct lov_object *lov = cl2lov(obj); + int result = 0; ENTRY; lov_conf_lock(lov); @@ -714,8 +804,9 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, if (conf->u.coc_md != NULL) lsm = conf->u.coc_md->lsm; if ((lsm == NULL && lov->lo_lsm == NULL) || - (lsm != NULL && lov->lo_lsm != NULL && - lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) { + ((lsm != NULL && lov->lo_lsm != NULL) && + (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) && + (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) { /* same version of layout */ lov->lo_layout_invalid = false; GOTO(out, result = 0); @@ -732,6 +823,8 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, out: lov_conf_unlock(lov); + CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n", + PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid); RETURN(result); } @@ -762,10 +855,10 @@ static int lov_object_print(const struct lu_env *env, void *cookie, } int lov_page_init(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, cfs_page_t *vmpage) + struct cl_page *page, pgoff_t index) { - return LOV_2DISPATCH_NOLOCK(cl2lov(obj), - llo_page_init, env, obj, page, vmpage); + return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page, + index); } /** @@ -836,7 +929,7 @@ struct lu_object *lov_object_alloc(const struct lu_env *env, struct lu_object *obj; ENTRY; - OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO); + OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, __GFP_IO); if (lov != NULL) { obj = lov2lu(lov); lu_object_init(obj, NULL, dev); @@ -862,23 +955,12 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov) lsm = lsm_addref(lov->lo_lsm); CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n", lsm, cfs_atomic_read(&lsm->lsm_refc), - lov->lo_layout_invalid, cfs_current()); + lov->lo_layout_invalid, current); } lov_conf_thaw(lov); return lsm; } -void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm) -{ - if (lsm == NULL) - return; - - CDEBUG(D_INODE, "lsm %p decref %d by %p.\n", - lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current()); - - lov_free_memmd(&lsm); -} - struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj) { struct lu_object *luobj; @@ -928,6 +1010,7 @@ int lov_read_and_clear_async_rc(struct cl_object *clob) loi->loi_ar.ar_rc = 0; } } + case LLT_RELEASED: case LLT_EMPTY: break; default: