X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flov%2Flov_object.c;h=07ed284e66576a61108cf18ebcbfcd1ce80911c3;hb=df01bb37a5e8992ade0ab74386bc76a6d42e0235;hp=99d543afa8648f028aa472414b8bad393840a8da;hpb=ecaba99677b28536f9c376b2b835b554a7792668;p=fs%2Flustre-release.git diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 99d543a..07ed284 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -27,7 +27,7 @@ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -67,10 +67,8 @@ struct lov_layout_operations { union lov_layout_state *state); int (*llo_print)(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o); - struct cl_page *(*llo_page_init)(const struct lu_env *env, - struct cl_object *obj, - struct cl_page *page, - cfs_page_t *vmpage); + int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj, + struct cl_page *page, cfs_page_t *vmpage); int (*llo_lock_init)(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); @@ -80,6 +78,8 @@ struct lov_layout_operations { struct cl_attr *attr); }; +static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov); + /***************************************************************************** * * Lov object layout operations. @@ -133,6 +133,12 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, int result; if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) { + /* For sanity:test_206. + * Do not leave the object in cache to avoid accessing + * freed memory. This is because osc_object is referring to + * lov_oinfo of lsm_stripe_data which will be freed due to + * this failure. */ + cl_object_kill(env, stripe); cl_object_put(env, stripe); return -EIO; } @@ -142,12 +148,11 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, parent = subhdr->coh_parent; oinfo = lov->lo_lsm->lsm_oinfo[idx]; - CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64 - " idx: %d gen: %d\n", - PFID(&subhdr->coh_lu.loh_fid), subhdr, idx, - PFID(&hdr->coh_lu.loh_fid), hdr, - oinfo->loi_id, oinfo->loi_seq, - oinfo->loi_ost_idx, oinfo->loi_ost_gen); + CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID + " idx: %d gen: %d\n", + PFID(&subhdr->coh_lu.loh_fid), subhdr, idx, + PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi), + oinfo->loi_ost_idx, oinfo->loi_ost_gen); if (parent == NULL) { subhdr->coh_parent = hdr; @@ -158,13 +163,28 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, r0->lo_sub[idx]->lso_index = idx; result = 0; } else { - CERROR("Stripe is already owned by other file (%d).\n", idx); - LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n"); - LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu), - "old\n"); - LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n"); - cl_object_put(env, stripe); - result = -EIO; + struct lu_object *old_obj; + struct lov_object *old_lov; + unsigned int mask = D_INODE; + + old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type); + LASSERT(old_obj != NULL); + old_lov = cl2lov(lu2cl(old_obj)); + if (old_lov->lo_layout_invalid) { + /* the object's layout has already changed but isn't + * refreshed */ + lu_object_unhash(env, &stripe->co_lu); + result = -EAGAIN; + } else { + mask = D_ERROR; + result = -EIO; + } + + LU_OBJECT_DEBUG(mask, env, &stripe->co_lu, + "stripe %d is already owned.\n", idx); + LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n"); + LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n"); + cl_object_put(env, stripe); } return result; } @@ -201,7 +221,7 @@ static int lov_init_raid0(const struct lu_env *env, if (r0->lo_sub != NULL) { result = 0; subconf->coc_inode = conf->coc_inode; - cfs_spin_lock_init(&r0->lo_sub_lock); + spin_lock_init(&r0->lo_sub_lock); /* * Create stripe cl_objects. */ @@ -210,8 +230,11 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_oinfo *oinfo = lsm->lsm_oinfo[i]; int ost_idx = oinfo->loi_ost_idx; - fid_ostid_unpack(ofid, &oinfo->loi_oi, - oinfo->loi_ost_idx); + result = ostid_to_fid(ofid, &oinfo->loi_oi, + oinfo->loi_ost_idx); + if (result != 0) + GOTO(out, result); + subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); subconf->u.coc_oinfo = oinfo; LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); @@ -219,13 +242,19 @@ static int lov_init_raid0(const struct lu_env *env, * lu_obj_hop_keycmp() */ /* coverity[overrun-buffer-val] */ stripe = lov_sub_find(env, subdev, ofid, subconf); - if (!IS_ERR(stripe)) + if (!IS_ERR(stripe)) { result = lov_init_sub(env, lov, stripe, r0, i); - else + if (result == -EAGAIN) { /* try again */ + --i; + result = 0; + } + } else { result = PTR_ERR(stripe); + } } } else result = -ENOMEM; +out: RETURN(result); } @@ -233,6 +262,9 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { LASSERT(lov->lo_type == LLT_EMPTY); + + lov_layout_wait(env, lov); + cl_object_prune(env, &lov->lo_cl); return 0; } @@ -269,12 +301,12 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, /* this wait-queue is signaled at the end of * lu_object_free(). */ cfs_set_current_state(CFS_TASK_UNINT); - cfs_spin_lock(&r0->lo_sub_lock); - if (r0->lo_sub[idx] == los) { - cfs_spin_unlock(&r0->lo_sub_lock); - cfs_waitq_wait(waiter, CFS_TASK_UNINT); - } else { - cfs_spin_unlock(&r0->lo_sub_lock); + spin_lock(&r0->lo_sub_lock); + if (r0->lo_sub[idx] == los) { + spin_unlock(&r0->lo_sub_lock); + cfs_waitq_wait(waiter, CFS_TASK_UNINT); + } else { + spin_unlock(&r0->lo_sub_lock); cfs_set_current_state(CFS_TASK_RUNNING); break; } @@ -294,9 +326,8 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, ENTRY; dump_lsm(D_INODE, lsm); - if (lov->lo_lsm_invalid && cfs_atomic_read(&lsm->lsm_refc) > 1) - RETURN(-EBUSY); + lov_layout_wait(env, lov); if (r0->lo_sub != NULL) { for (i = 0; i < r0->lo_nr; ++i) { struct lovsub_object *los = r0->lo_sub[i]; @@ -311,6 +342,7 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, } } } + cl_object_prune(env, &lov->lo_cl); RETURN(0); } @@ -340,7 +372,7 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, static int lov_print_empty(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { - (*p)(env, cookie, "empty\n"); + (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid); return 0; } @@ -349,9 +381,13 @@ static int lov_print_raid0(const struct lu_env *env, void *cookie, { struct lov_object *lov = lu2lov(o); struct lov_layout_raid0 *r0 = lov_r0(lov); + struct lov_stripe_md *lsm = lov->lo_lsm; int i; - (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr); + (*p)(env, cookie, "stripes: %d, %svalid, lsm{%p 0x%08X %d %u %u}: \n", + r0->lo_nr, lov->lo_layout_invalid ? "in" : "", lsm, + lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc), + lsm->lsm_stripe_count, lsm->lsm_layout_gen); for (i = 0; i < r0->lo_nr; ++i) { struct lu_object *sub; @@ -381,12 +417,10 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj, static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, struct cl_attr *attr) { - struct lov_object *lov = cl2lov(obj); + struct lov_object *lov = cl2lov(obj); struct lov_layout_raid0 *r0 = lov_r0(lov); - struct lov_stripe_md *lsm = lov->lo_lsm; - struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; - __u64 kms; - int result = 0; + struct cl_attr *lov_attr = &r0->lo_attr; + int result = 0; ENTRY; @@ -400,38 +434,51 @@ static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, * can't go if locks exist. */ /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */ - if (!r0->lo_attr_valid) { - /* - * Fill LVB with attributes already initialized by the upper - * layer. - */ - cl_attr2lvb(lvb, attr); - kms = attr->cat_kms; - - /* - * XXX that should be replaced with a loop over sub-objects, - * doing cl_object_attr_get() on them. But for now, let's - * reuse old lov code. - */ - - /* - * XXX take lsm spin-lock to keep lov_merge_lvb_kms() - * happy. It's not needed, because new code uses - * ->coh_attr_guard spin-lock to protect consistency of - * sub-object attributes. - */ - lov_stripe_lock(lsm); - result = lov_merge_lvb_kms(lsm, lvb, &kms); - lov_stripe_unlock(lsm); - if (result == 0) { - cl_lvb2attr(attr, lvb); - attr->cat_kms = kms; - r0->lo_attr_valid = 1; - r0->lo_attr = *attr; - } - } else - *attr = r0->lo_attr; - RETURN(result); + if (!r0->lo_attr_valid) { + struct lov_stripe_md *lsm = lov->lo_lsm; + struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; + __u64 kms = 0; + + memset(lvb, 0, sizeof(*lvb)); + /* XXX: timestamps can be negative by sanity:test_39m, + * how can it be? */ + lvb->lvb_atime = LLONG_MIN; + lvb->lvb_ctime = LLONG_MIN; + lvb->lvb_mtime = LLONG_MIN; + + /* + * XXX that should be replaced with a loop over sub-objects, + * doing cl_object_attr_get() on them. But for now, let's + * reuse old lov code. + */ + + /* + * XXX take lsm spin-lock to keep lov_merge_lvb_kms() + * happy. It's not needed, because new code uses + * ->coh_attr_guard spin-lock to protect consistency of + * sub-object attributes. + */ + lov_stripe_lock(lsm); + result = lov_merge_lvb_kms(lsm, lvb, &kms); + lov_stripe_unlock(lsm); + if (result == 0) { + cl_lvb2attr(lov_attr, lvb); + lov_attr->cat_kms = kms; + r0->lo_attr_valid = 1; + } + } + if (result == 0) { /* merge results */ + attr->cat_blocks = lov_attr->cat_blocks; + attr->cat_size = lov_attr->cat_size; + attr->cat_kms = lov_attr->cat_kms; + if (attr->cat_atime < lov_attr->cat_atime) + attr->cat_atime = lov_attr->cat_atime; + if (attr->cat_ctime < lov_attr->cat_ctime) + attr->cat_ctime = lov_attr->cat_ctime; + if (attr->cat_mtime < lov_attr->cat_mtime) + attr->cat_mtime = lov_attr->cat_mtime; + } + RETURN(result); } const static struct lov_layout_operations lov_dispatch[] = { @@ -476,13 +523,13 @@ const static struct lov_layout_operations lov_dispatch[] = { static inline void lov_conf_freeze(struct lov_object *lov) { if (lov->lo_owner != cfs_current()) - cfs_down_read(&lov->lo_type_guard); + down_read(&lov->lo_type_guard); } static inline void lov_conf_thaw(struct lov_object *lov) { if (lov->lo_owner != cfs_current()) - cfs_up_read(&lov->lo_type_guard); + up_read(&lov->lo_type_guard); } #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...) \ @@ -520,7 +567,7 @@ do { \ static void lov_conf_lock(struct lov_object *lov) { LASSERT(lov->lo_owner != cfs_current()); - cfs_down_write(&lov->lo_type_guard); + down_write(&lov->lo_type_guard); LASSERT(lov->lo_owner == NULL); lov->lo_owner = cfs_current(); } @@ -528,33 +575,31 @@ static void lov_conf_lock(struct lov_object *lov) static void lov_conf_unlock(struct lov_object *lov) { lov->lo_owner = NULL; - cfs_up_write(&lov->lo_type_guard); + up_write(&lov->lo_type_guard); } static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) { struct l_wait_info lwi = { 0 }; - struct lov_stripe_md *lsm = lov->lo_lsm; ENTRY; - if (!lov->lo_lsm_invalid || lsm == NULL) - RETURN(0); + while (cfs_atomic_read(&lov->lo_active_ios) > 0) { + CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n", + PFID(lu_object_fid(lov2lu(lov))), + cfs_atomic_read(&lov->lo_active_ios)); - LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0); - while (cfs_atomic_read(&lsm->lsm_refc) > 1) { - lov_conf_unlock(lov); l_wait_event(lov->lo_waitq, - cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); - lov_conf_lock(lov); + cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi); } RETURN(0); } static int lov_layout_change(const struct lu_env *unused, - struct lov_object *lov, enum lov_layout_type llt, + struct lov_object *lov, const struct cl_object_conf *conf) { int result; + enum lov_layout_type llt = LLT_EMPTY; union lov_layout_state *state = &lov->u; const struct lov_layout_operations *old_ops; const struct lov_layout_operations *new_ops; @@ -563,10 +608,13 @@ static int lov_layout_change(const struct lu_env *unused, void *cookie; struct lu_env *env; int refcheck; + ENTRY; LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch)); + + if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) + llt = LLT_RAID0; /* only raid0 is supported. */ LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch)); - ENTRY; cookie = cl_env_reenter(); env = cl_env_get(&refcheck); @@ -581,7 +629,8 @@ static int lov_layout_change(const struct lu_env *unused, result = old_ops->llo_delete(env, lov, &lov->u); if (result == 0) { old_ops->llo_fini(env, lov, &lov->u); - LASSERT(cfs_list_empty(&hdr->coh_locks)); + + LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0); LASSERT(hdr->coh_tree.rnode == NULL); LASSERT(hdr->coh_pages == 0); @@ -621,9 +670,12 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, int result; ENTRY; - cfs_init_rwsem(&lov->lo_type_guard); + init_rwsem(&lov->lo_type_guard); + cfs_atomic_set(&lov->lo_active_ios, 0); cfs_waitq_init(&lov->lo_waitq); + cl_object_page_init(lu2cl(obj), sizeof(struct lov_page)); + /* no locking is necessary, as object is being created */ lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY; ops = &lov_dispatch[lov->lo_type]; @@ -642,42 +694,40 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, ENTRY; lov_conf_lock(lov); - if (conf->coc_invalidate) { - lov->lo_lsm_invalid = 1; + if (conf->coc_opc == OBJECT_CONF_INVALIDATE) { + lov->lo_layout_invalid = true; GOTO(out, result = 0); } + if (conf->coc_opc == OBJECT_CONF_WAIT) { + if (lov->lo_layout_invalid && + cfs_atomic_read(&lov->lo_active_ios) > 0) { + lov_conf_unlock(lov); + result = lov_layout_wait(env, lov); + lov_conf_lock(lov); + } + GOTO(out, result); + } + + LASSERT(conf->coc_opc == OBJECT_CONF_SET); + if (conf->u.coc_md != NULL) lsm = conf->u.coc_md->lsm; - if ((lsm == NULL && lov->lo_lsm == NULL) || (lsm != NULL && lov->lo_lsm != NULL && lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) { - lov->lo_lsm_invalid = 0; + /* same version of layout */ + lov->lo_layout_invalid = false; GOTO(out, result = 0); } - /* will change layout */ - lov_layout_wait(env, lov); - - /* - * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported. - */ - switch (lov->lo_type) { - case LLT_EMPTY: - if (lsm != NULL) - result = lov_layout_change(env, lov, LLT_RAID0, conf); - break; - case LLT_RAID0: - if (lsm == NULL) - result = lov_layout_change(env, lov, LLT_EMPTY, conf); - else if (lov_stripe_md_cmp(lov->lo_lsm, lsm)) - result = -EOPNOTSUPP; - break; - default: - LBUG(); + /* will change layout - check if there still exists active IO. */ + if (cfs_atomic_read(&lov->lo_active_ios) > 0) { + lov->lo_layout_invalid = true; + GOTO(out, result = -EBUSY); } - lov->lo_lsm_invalid = result != 0; + + lov->lo_layout_invalid = lov_layout_change(env, lov, conf); EXIT; out: @@ -708,11 +758,11 @@ static void lov_object_free(const struct lu_env *env, struct lu_object *obj) static int lov_object_print(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { - return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o); + return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o); } -struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, cfs_page_t *vmpage) +int lov_page_init(const struct lu_env *env, struct cl_object *obj, + struct cl_page *page, cfs_page_t *vmpage) { return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page, vmpage); @@ -812,7 +862,7 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov) lsm = lsm_addref(lov->lo_lsm); CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n", lsm, cfs_atomic_read(&lsm->lsm_refc), - lov->lo_lsm_invalid, cfs_current()); + lov->lo_layout_invalid, cfs_current()); } lov_conf_thaw(lov); return lsm; @@ -826,8 +876,7 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm) CDEBUG(D_INODE, "lsm %p decref %d by %p.\n", lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current()); - if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid) - cfs_waitq_signal(&lov->lo_waitq); + lov_free_memmd(&lsm); } struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj) @@ -846,18 +895,10 @@ struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj) } EXPORT_SYMBOL(lov_lsm_get); -void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm) +void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm) { - struct lu_object *luobj; - - if (clobj == NULL || lsm == NULL) - return; - - luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu, - &lov_device_type); - LASSERT(luobj != NULL); - - lov_lsm_decref(lu2lov(luobj), lsm); + if (lsm != NULL) + lov_free_memmd(&lsm); } EXPORT_SYMBOL(lov_lsm_put);