X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_reint.c;h=a252b5f42522bc7e5aef9d641cdd38f6d32f22f0;hp=5663c54e788978cc267a9fb47b413105edaf1d63;hb=cc6ef11d2f972ebc440013bddda87a536a09750c;hpb=0754bc8f2623bea184111af216f7567608db35b6 diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 5663c54..a252b5f 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2015, Intel Corporation. + * Copyright (c) 2011, 2016, Intel Corporation. 
*/ /* * This file is part of Lustre, http://www.lustre.org/ @@ -65,14 +61,15 @@ static inline void mdt_reint_init_ma(struct mdt_thread_info *info, static void mdt_obj_version_get(struct mdt_thread_info *info, struct mdt_object *o, __u64 *version) { - LASSERT(o); + LASSERT(o); + if (mdt_object_exists(o) && !mdt_object_remote(o) && !fid_is_obf(mdt_object_fid(o))) - *version = dt_version_get(info->mti_env, mdt_obj2dt(o)); - else - *version = ENOENT_VERSION; - CDEBUG(D_INODE, "FID "DFID" version is "LPX64"\n", - PFID(mdt_object_fid(o)), *version); + *version = dt_version_get(info->mti_env, mdt_obj2dt(o)); + else + *version = ENOENT_VERSION; + CDEBUG(D_INODE, "FID "DFID" version is %#llx\n", + PFID(mdt_object_fid(o)), *version); } /** @@ -100,7 +97,7 @@ static int mdt_version_check(struct ptlrpc_request *req, spin_unlock(&req->rq_export->exp_lock); RETURN(-EOVERFLOW); } else if (pre_ver[idx] != version) { - CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n", pre_ver[idx], version); spin_lock(&req->rq_export->exp_lock); req->rq_export->exp_vbr_failed = 1; @@ -292,131 +289,16 @@ static int mdt_remote_permission(struct mdt_thread_info *info) return 0; } -/* - * VBR: we save three versions in reply: - * 0 - parent. Check that parent version is the same during replay. - * 1 - name. Version of 'name' if file exists with the same name or - * ENOENT_VERSION, it is needed because file may appear due to missed replays. - * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity - * check. 
- */ -static int mdt_md_create(struct mdt_thread_info *info) -{ - struct mdt_device *mdt = info->mti_mdt; - struct mdt_object *parent; - struct mdt_object *child; - struct mdt_lock_handle *lh; - struct mdt_body *repbody; - struct md_attr *ma = &info->mti_attr; - struct mdt_reint_record *rr = &info->mti_rr; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, mdt_info_req(info), "Create ("DNAME"->"DFID") " - "in "DFID, - PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1)); - - if (!fid_is_md_operative(rr->rr_fid1)) - RETURN(-EPERM); - - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); - - parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(parent)) - RETURN(PTR_ERR(parent)); - - if (!mdt_object_exists(parent)) - GOTO(put_parent, rc = -ENOENT); - - lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); - if (rc) - GOTO(put_parent, rc); - - if (!mdt_object_remote(parent)) { - rc = mdt_version_get_check_save(info, parent, 0); - if (rc) - GOTO(unlock_parent, rc); - } - - /* - * Check child name version during replay. - * During create replay a file may exist with same name. - */ - rc = mdt_lookup_version_check(info, parent, &rr->rr_name, - &info->mti_tmp_fid1, 1); - if (rc == 0) - GOTO(unlock_parent, rc = -EEXIST); - - /* -ENOENT is expected here */ - if (rc != -ENOENT) - GOTO(unlock_parent, rc); - - /* save version of file name for replay, it must be ENOENT here */ - mdt_enoent_version_save(info, 1); - - child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2); - if (likely(!IS_ERR(child))) { - struct md_object *next = mdt_object_child(parent); - - rc = mdt_remote_permission(info); - if (rc != 0) - GOTO(out_put_child, rc); - - ma->ma_need = MA_INODE; - ma->ma_valid = 0; - - mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, - OBD_FAIL_MDS_REINT_CREATE_WRITE); - - /* Version of child will be updated on disk. 
*/ - tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); - rc = mdt_version_get_check_save(info, child, 2); - if (rc) - GOTO(out_put_child, rc); - - /* Let lower layer know current lock mode. */ - info->mti_spec.sp_cr_mode = - mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); - - /* - * Do not perform lookup sanity check. We know that name does - * not exist. - */ - info->mti_spec.sp_cr_lookup = 0; - info->mti_spec.sp_feat = &dt_directory_features; - - rc = mdo_create(info->mti_env, next, &rr->rr_name, - mdt_object_child(child), &info->mti_spec, ma); - if (rc == 0) - rc = mdt_attr_get_complex(info, child, ma); - - if (rc == 0) { - /* Return fid & attr to client. */ - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(info, repbody, &ma->ma_attr, - mdt_object_fid(child)); - } -out_put_child: - mdt_object_put(info->mti_env, child); - } else { - rc = PTR_ERR(child); - } -unlock_parent: - mdt_object_unlock(info, parent, lh, rc); -put_parent: - mdt_object_put(info->mti_env, parent); - RETURN(rc); -} - static int mdt_unlock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, __u64 ibits, struct mdt_lock_handle *s0_lh, struct mdt_object *s0_obj, - struct ldlm_enqueue_info *einfo) + struct ldlm_enqueue_info *einfo, + int decref) { union ldlm_policy_data *policy = &mti->mti_policy; + struct lustre_handle_array *slave_locks = einfo->ei_cbdata; + int i; int rc; ENTRY; @@ -426,31 +308,33 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, /* Unlock stripe 0 */ if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) { LASSERT(s0_obj != NULL); - mdt_object_unlock_put(mti, s0_obj, s0_lh, 1); + mdt_object_unlock_put(mti, s0_obj, s0_lh, decref); } memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; + if (slave_locks != NULL) { + LASSERT(s0_lh != NULL); + for (i = 1; i < slave_locks->count; i++) { + /* borrow s0_lh temporarily to do mdt unlock */ + mdt_lock_reg_init(s0_lh, einfo->ei_mode); + s0_lh->mlh_rreg_lh = slave_locks->handles[i]; + 
mdt_object_unlock(mti, NULL, s0_lh, decref); + slave_locks->handles[i].cookie = 0ull; + } + } + rc = mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo, policy); RETURN(rc); } -/** - * Lock slave stripes if necessary, the lock handles of slave stripes - * will be stored in einfo->ei_cbdata. - **/ -static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, - enum ldlm_mode mode, __u64 ibits, - struct mdt_lock_handle *s0_lh, - struct mdt_object **s0_objp, - struct ldlm_enqueue_info *einfo) +static int mdt_init_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, + struct lu_fid *fid) { - union ldlm_policy_data *policy = &mti->mti_policy; struct lu_buf *buf = &mti->mti_buf; struct lmv_mds_md_v1 *lmv; - struct lu_fid *fid = &mti->mti_tmp_fid1; int rc; ENTRY; @@ -480,18 +364,50 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(-EINVAL); fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]); - if (!lu_fid_eq(fid, mdt_object_fid(obj))) { + + RETURN(rc); +} + +/** + * Lock slave stripes if necessary, the lock handles of slave stripes + * will be stored in einfo->ei_cbdata. 
+ **/ +static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, + enum ldlm_mode mode, __u64 ibits, + struct lu_fid *s0_fid, + struct mdt_lock_handle *s0_lh, + struct mdt_object **s0_objp, + struct ldlm_enqueue_info *einfo) +{ + union ldlm_policy_data *policy = &mti->mti_policy; + int rc; + ENTRY; + + memset(einfo, 0, sizeof(*einfo)); + + rc = mdt_init_slaves(mti, obj, s0_fid); + if (rc <= 0) + RETURN(rc); + + LASSERT(S_ISDIR(obj->mot_header.loh_attr)); + + if (!lu_fid_eq(s0_fid, mdt_object_fid(obj))) { /* Except migrating object, whose 0_stripe and master * object are the same object, 0_stripe and master * object are different, though they are in the same * MDT, to avoid adding osd_object_lock here, so we * will enqueue the stripe0 lock in MDT0 for now */ - *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits); + *s0_objp = mdt_object_find(mti->mti_env, mti->mti_mdt, s0_fid); if (IS_ERR(*s0_objp)) RETURN(PTR_ERR(*s0_objp)); + + rc = mdt_reint_object_lock(mti, *s0_objp, s0_lh, ibits, true); + if (rc < 0) { + mdt_object_put(mti->mti_env, *s0_objp); + RETURN(rc); + } } - memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; einfo->ei_cb_bl = mdt_remote_blocking_ast; @@ -507,18 +423,193 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(rc); } +/* + * VBR: we save three versions in reply: + * 0 - parent. Check that parent version is the same during replay. + * 1 - name. Version of 'name' if file exists with the same name or + * ENOENT_VERSION, it is needed because file may appear due to missed replays. + * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity + * check. 
+ */ +static int mdt_create(struct mdt_thread_info *info) +{ + struct mdt_device *mdt = info->mti_mdt; + struct mdt_object *parent; + struct mdt_object *child; + struct mdt_lock_handle *lh; + struct mdt_body *repbody; + struct md_attr *ma = &info->mti_attr; + struct mdt_reint_record *rr = &info->mti_rr; + int rc; + ENTRY; + + DEBUG_REQ(D_INODE, mdt_info_req(info), "Create ("DNAME"->"DFID") " + "in "DFID, + PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1)); + + if (!fid_is_md_operative(rr->rr_fid1)) + RETURN(-EPERM); + + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + + parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(parent)) + RETURN(PTR_ERR(parent)); + + if (!mdt_object_exists(parent)) + GOTO(put_parent, rc = -ENOENT); + + lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); + rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE); + if (rc) + GOTO(put_parent, rc); + + if (!mdt_object_remote(parent)) { + rc = mdt_version_get_check_save(info, parent, 0); + if (rc) + GOTO(unlock_parent, rc); + } + + /* + * Check child name version during replay. + * During create replay a file may exist with same name. + */ + rc = mdt_lookup_version_check(info, parent, &rr->rr_name, + &info->mti_tmp_fid1, 1); + if (rc == 0) + GOTO(unlock_parent, rc = -EEXIST); + + /* -ENOENT is expected here */ + if (rc != -ENOENT) + GOTO(unlock_parent, rc); + + /* save version of file name for replay, it must be ENOENT here */ + mdt_enoent_version_save(info, 1); + + child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2); + if (unlikely(IS_ERR(child))) + GOTO(unlock_parent, rc = PTR_ERR(child)); + + rc = mdt_remote_permission(info); + if (rc != 0) + GOTO(put_child, rc); + + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + + mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, + OBD_FAIL_MDS_REINT_CREATE_WRITE); + + /* Version of child will be updated on disk. 
*/ + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); + rc = mdt_version_get_check_save(info, child, 2); + if (rc) + GOTO(put_child, rc); + + /* Let lower layer know current lock mode. */ + info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); + + /* + * Do not perform lookup sanity check. We know that name does + * not exist. + */ + info->mti_spec.sp_cr_lookup = 0; + info->mti_spec.sp_feat = &dt_directory_features; + + rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name, + mdt_object_child(child), &info->mti_spec, ma); + if (rc == 0) + rc = mdt_attr_get_complex(info, child, ma); + + if (rc < 0) + GOTO(put_child, rc); + + /* + * On DNE, we need to eliminate dependency between 'mkdir a' and + * 'mkdir a/b' if b is a striped directory, to achieve this, two + * things are done below: + * 1. save child and slaves lock. + * 2. if the child is a striped directory, relock parent so as to + * compare against COS locks to ensure parent was + * committed to disk. 
+ */ + if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) { + struct mdt_lock_handle *lhc; + struct mdt_lock_handle *s0_lh; + struct mdt_object *s0_obj = NULL; + struct ldlm_enqueue_info *einfo; + struct lu_fid *s0_fid = &info->mti_tmp_fid1; + bool cos_incompat = false; + + rc = mdt_init_slaves(info, child, s0_fid); + if (rc > 0) { + cos_incompat = true; + if (!mdt_object_remote(parent)) { + mdt_object_unlock(info, parent, lh, 1); + mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); + rc = mdt_reint_object_lock(info, parent, lh, + MDS_INODELOCK_UPDATE, + true); + if (rc) + GOTO(put_child, rc); + } + } + + einfo = &info->mti_einfo; + lhc = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_handle_init(lhc); + mdt_lock_reg_init(lhc, LCK_PW); + rc = mdt_reint_object_lock(info, child, lhc, + MDS_INODELOCK_UPDATE, + cos_incompat); + if (rc) + GOTO(put_child, rc); + mdt_object_unlock(info, child, lhc, rc); + + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_handle_init(s0_lh); + mdt_lock_reg_init(s0_lh, LCK_PW); + rc = mdt_lock_slaves(info, child, LCK_PW, MDS_INODELOCK_UPDATE, + s0_fid, s0_lh, &s0_obj, einfo); + mdt_unlock_slaves(info, child, MDS_INODELOCK_UPDATE, s0_lh, + s0_obj, einfo, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && rc == -EIO) + rc = 0; + } + + /* Return fid & attr to client. 
*/ + if (ma->ma_valid & MA_INODE) + mdt_pack_attr2body(info, repbody, &ma->ma_attr, + mdt_object_fid(child)); +put_child: + mdt_object_put(info->mti_env, child); +unlock_parent: + mdt_object_unlock(info, parent, lh, rc); +put_parent: + mdt_object_put(info->mti_env, parent); + RETURN(rc); +} + static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, struct md_attr *ma) { struct mdt_lock_handle *lh; - int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); + int do_vbr = ma->ma_attr.la_valid & + (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS); __u64 lockpart = MDS_INODELOCK_UPDATE; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - struct mdt_lock_handle *s0_lh; - struct mdt_object *s0_obj = NULL; + struct lu_fid *s0_fid = &info->mti_tmp_fid1; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; + bool cos_incompat = false; int rc; ENTRY; + rc = mdt_init_slaves(info, mo, s0_fid); + if (rc > 0) + cos_incompat = true; + lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); @@ -529,13 +620,14 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; - rc = mdt_object_lock(info, mo, lh, lockpart); + rc = mdt_reint_object_lock(info, mo, lh, lockpart, cos_incompat); if (rc != 0) RETURN(rc); s0_lh = &info->mti_lh[MDT_LH_LOCAL]; mdt_lock_reg_init(s0_lh, LCK_PW); - rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo); + rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_fid, s0_lh, &s0_obj, + einfo); if (rc != 0) GOTO(out_unlock, rc); @@ -557,13 +649,13 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, } /* Ensure constant striping during chown(). See LU-2789. 
*/ - if (ma->ma_attr.la_valid & (LA_UID|LA_GID)) + if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID)) mutex_lock(&mo->mot_lov_mutex); /* all attrs are packed into mti_attr in unpack_setattr */ rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma); - if (ma->ma_attr.la_valid & (LA_UID|LA_GID)) + if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID)) mutex_unlock(&mo->mot_lov_mutex); if (rc != 0) @@ -571,7 +663,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, EXIT; out_unlock: - mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo); + mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo, rc); mdt_object_unlock(info, mo, lh, rc); return rc; } @@ -600,20 +692,12 @@ int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo, /* If an up2date copy exists in the backend, add dirty flag */ if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS) && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) { - struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD]; - ma->ma_hsm.mh_flags |= HS_DIRTY; - mdt_lock_reg_init(lh, LCK_PW); - rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR); - if (rc != 0) - RETURN(rc); - rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm); if (rc) CERROR("file attribute change error for "DFID": %d\n", PFID(mdt_object_fid(mo)), rc); - mdt_object_unlock(info, mo, lh, rc); } RETURN(rc); @@ -659,21 +743,10 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, GOTO(out_put, rc = -EPROTO); rc = mdt_attr_set(info, mo, ma); - if (rc) - GOTO(out_put, rc); - } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) { - struct lu_buf *buf = &info->mti_buf; - - if (ma->ma_attr.la_valid != 0) - GOTO(out_put, rc = -EPROTO); - - buf->lb_buf = ma->ma_lmm; - buf->lb_len = ma->ma_lmm_size; - rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), - buf, XATTR_NAME_LOV, 0); if (rc) GOTO(out_put, rc); - } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) { + } else if 
((ma->ma_valid & (MA_LOV | MA_LMV)) && + (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; struct mdt_lock_handle *lh; @@ -687,15 +760,21 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); - rc = mdt_object_lock(info, mo, lh, - MDS_INODELOCK_XATTR); + rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR); if (rc != 0) GOTO(out_put, rc); - buf->lb_buf = ma->ma_lmv; - buf->lb_len = ma->ma_lmv_size; - rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), - buf, XATTR_NAME_DEFAULT_LMV, 0); + if (ma->ma_valid & MA_LOV) { + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + } else { + buf->lb_buf = ma->ma_lmv; + buf->lb_len = ma->ma_lmv_size; + } + rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf, + (ma->ma_valid & MA_LOV) ? + XATTR_NAME_LOV : XATTR_NAME_DEFAULT_LMV, + 0); mdt_object_unlock(info, mo, lh, rc); if (rc) @@ -767,7 +846,7 @@ static int mdt_reint_create(struct mdt_thread_info *info, RETURN(err_serious(-EOPNOTSUPP)); } - rc = mdt_md_create(info); + rc = mdt_create(info); RETURN(rc); } @@ -776,22 +855,24 @@ static int mdt_reint_create(struct mdt_thread_info *info, * Version of child is getting and checking during its lookup. 
If */ static int mdt_reint_unlink(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct mdt_object *mp; - struct mdt_object *mc; - struct mdt_lock_handle *parent_lh; - struct mdt_lock_handle *child_lh; + struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct mdt_object *mp; + struct mdt_object *mc; + struct mdt_lock_handle *parent_lh; + struct mdt_lock_handle *child_lh; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - struct mdt_lock_handle *s0_lh = NULL; - struct mdt_object *s0_obj = NULL; - __u64 lock_ibits; - int rc; - int no_name = 0; + struct lu_fid *s0_fid = &info->mti_tmp_fid2; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; + __u64 lock_ibits; + bool cos_incompat = false; + int no_name = 0; + int rc; ENTRY; DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1), @@ -800,34 +881,32 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) - RETURN(err_serious(-ENOENT)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) + RETURN(err_serious(-ENOENT)); if (!fid_is_md_operative(rr->rr_fid1)) RETURN(-EPERM); - /* - * step 1: Found the parent. 
- */ mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(mp)) { - rc = PTR_ERR(mp); - GOTO(out, rc); + if (IS_ERR(mp)) + RETURN(PTR_ERR(mp)); + + if (mdt_object_remote(mp)) { + cos_incompat = true; + } else { + rc = mdt_version_get_check_save(info, mp, 0); + if (rc) + GOTO(put_parent, rc); } +relock: parent_lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE, + cos_incompat); if (rc != 0) GOTO(put_parent, rc); - if (!mdt_object_remote(mp)) { - rc = mdt_version_get_check_save(info, mp, 0); - if (rc) - GOTO(unlock_parent, rc); - } - - /* step 2: find & lock the child */ /* lookup child object along with version checking */ fid_zero(child_fid); rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1); @@ -864,8 +943,15 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (IS_ERR(mc)) GOTO(unlock_parent, rc = PTR_ERR(mc)); - child_lh = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(child_lh, LCK_EX); + if (!cos_incompat && mdt_init_slaves(info, mc, s0_fid) > 0) { + cos_incompat = true; + mdt_object_put(info->mti_env, mc); + mdt_object_unlock(info, mp, parent_lh, -EAGAIN); + goto relock; + } + + child_lh = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(child_lh, LCK_EX); if (info->mti_spec.sp_rm_entry) { struct lu_ucred *uc = mdt_ucred(info); @@ -928,9 +1014,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mc, child_lh, lock_ibits); + rc = mdt_reint_object_lock(info, mc, child_lh, lock_ibits, + cos_incompat); if (rc != 0) - GOTO(put_child, rc); + GOTO(unlock_child, rc); + /* * Now we can only make sure we need MA_INODE, in mdd layer, will check * whether need MA_LOV and MA_COOKIE. 
@@ -940,8 +1028,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, s0_lh = &info->mti_lh[MDT_LH_LOCAL]; mdt_lock_reg_init(s0_lh, LCK_EX); - rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh, - &s0_obj, einfo); + rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_fid, + s0_lh, &s0_obj, einfo); if (rc != 0) GOTO(unlock_child, rc); @@ -984,7 +1072,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, EXIT; unlock_child: - mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo); + mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo, + rc); mdt_object_unlock(info, mc, child_lh, rc); put_child: mdt_object_put(info->mti_env, mc); @@ -992,7 +1081,6 @@ unlock_parent: mdt_object_unlock(info, mp, parent_lh, rc); put_parent: mdt_object_put(info->mti_env, mp); -out: return rc; } @@ -1001,109 +1089,117 @@ out: * name. */ static int mdt_reint_link(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - struct mdt_reint_record *rr = &info->mti_rr; - struct ptlrpc_request *req = mdt_info_req(info); - struct md_attr *ma = &info->mti_attr; - struct mdt_object *ms; - struct mdt_object *mp; - struct mdt_lock_handle *lhs; - struct mdt_lock_handle *lhp; - int rc; - ENTRY; + struct mdt_reint_record *rr = &info->mti_rr; + struct ptlrpc_request *req = mdt_info_req(info); + struct md_attr *ma = &info->mti_attr; + struct mdt_object *ms; + struct mdt_object *mp; + struct mdt_lock_handle *lhs; + struct mdt_lock_handle *lhp; + bool cos_incompat; + int rc; + ENTRY; DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME, PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name)); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) - RETURN(err_serious(-ENOENT)); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) + RETURN(err_serious(-ENOENT)); if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - /* Invalid case so return error immediately instead of - * 
processing it */ - if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) - RETURN(-EPERM); + /* Invalid case so return error immediately instead of + * processing it */ + if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) + RETURN(-EPERM); if (!fid_is_md_operative(rr->rr_fid1) || !fid_is_md_operative(rr->rr_fid2)) RETURN(-EPERM); - /* step 1: find & lock the target parent dir */ - lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name); - mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, - MDS_INODELOCK_UPDATE); - if (IS_ERR(mp)) - RETURN(PTR_ERR(mp)); - - rc = mdt_version_get_check_save(info, mp, 0); - if (rc) - GOTO(out_unlock_parent, rc); + /* step 1: find target parent dir */ + mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2); + if (IS_ERR(mp)) + RETURN(PTR_ERR(mp)); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); - - /* step 2: find & lock the source */ - lhs = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_reg_init(lhs, LCK_EX); + rc = mdt_version_get_check_save(info, mp, 0); + if (rc) + GOTO(put_parent, rc); - ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(ms)) - GOTO(out_unlock_parent, rc = PTR_ERR(ms)); + /* step 2: find source */ + ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(ms)) + GOTO(put_parent, rc = PTR_ERR(ms)); if (!mdt_object_exists(ms)) { - mdt_object_put(info->mti_env, ms); CDEBUG(D_INFO, "%s: "DFID" does not exist.\n", mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1)); - GOTO(out_unlock_parent, rc = -ENOENT); + GOTO(put_source, rc = -ENOENT); } - rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE | - MDS_INODELOCK_XATTR); - if (rc != 0) { - mdt_object_put(info->mti_env, ms); - GOTO(out_unlock_parent, rc); - } + cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms)); - /* step 3: link it */ - mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, - OBD_FAIL_MDS_REINT_LINK_WRITE); + lhp = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lhp, LCK_PW, 
&rr->rr_name); + rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE, + cos_incompat); + if (rc != 0) + GOTO(put_source, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + + lhs = &info->mti_lh[MDT_LH_CHILD]; + mdt_lock_reg_init(lhs, LCK_EX); + rc = mdt_reint_object_lock(info, ms, lhs, + MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, + cos_incompat); + if (rc != 0) + GOTO(unlock_parent, rc); + + /* step 3: link it */ + mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, + OBD_FAIL_MDS_REINT_LINK_WRITE); tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms)); - rc = mdt_version_get_check_save(info, ms, 1); - if (rc) - GOTO(out_unlock_child, rc); + rc = mdt_version_get_check_save(info, ms, 1); + if (rc) + GOTO(unlock_source, rc); - /** check target version by name during replay */ + /** check target version by name during replay */ rc = mdt_lookup_version_check(info, mp, &rr->rr_name, &info->mti_tmp_fid1, 2); - if (rc != 0 && rc != -ENOENT) - GOTO(out_unlock_child, rc); - /* save version of file name for replay, it must be ENOENT here */ - if (!req_is_replay(mdt_info_req(info))) { + if (rc != 0 && rc != -ENOENT) + GOTO(unlock_source, rc); + /* save version of file name for replay, it must be ENOENT here */ + if (!req_is_replay(mdt_info_req(info))) { if (rc != -ENOENT) { CDEBUG(D_INFO, "link target "DNAME" existed!\n", PNAME(&rr->rr_name)); - GOTO(out_unlock_child, rc = -EEXIST); + GOTO(unlock_source, rc = -EEXIST); } - info->mti_ver[2] = ENOENT_VERSION; - mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2); - } + info->mti_ver[2] = ENOENT_VERSION; + mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2); + } rc = mdo_link(info->mti_env, mdt_object_child(mp), - mdt_object_child(ms), &rr->rr_name, ma); + mdt_object_child(ms), &rr->rr_name, ma); - if (rc == 0) + if (rc == 0) mdt_counter_incr(req, LPROC_MDT_LINK); - EXIT; -out_unlock_child: - mdt_object_unlock_put(info, ms, lhs, rc); -out_unlock_parent: - mdt_object_unlock_put(info, mp, lhp, rc); - 
return rc; + EXIT; +unlock_source: + mdt_object_unlock(info, ms, lhs, rc); +unlock_parent: + mdt_object_unlock(info, mp, lhp, rc); +put_source: + mdt_object_put(info->mti_env, ms); +put_parent: + mdt_object_put(info->mti_env, mp); + return rc; } /** * lock the part of the directory according to the hash of the name @@ -1111,11 +1207,13 @@ out_unlock_parent: */ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, struct mdt_lock_handle *lh, - struct mdt_object *obj, __u64 ibits) + struct mdt_object *obj, __u64 ibits, + bool cos_incompat) { struct ldlm_res_id *res = &info->mti_res_id; struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; union ldlm_policy_data *policy = &info->mti_policy; + __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; int rc; /* @@ -1126,14 +1224,16 @@ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res); memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; + if (cos_incompat && + (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) + dlmflags |= LDLM_FL_COS_INCOMPAT; /* * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is * going to be sent to client. If it is - mdt_intent_policy() path will * fix it up and turn FL_LOCAL flag off. 
*/ rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, - &info->mti_exp->exp_handle.h_cookie); + res, dlmflags, &info->mti_exp->exp_handle.h_cookie); return rc; } @@ -1260,6 +1360,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, struct lu_buf *buf = &info->mti_big_buf; struct linkea_data ldata = { NULL }; int count; + int retry_count; int rc; ENTRY; @@ -1278,6 +1379,10 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, RETURN(rc); } + /* ignore the migrating parent(@pobj) */ + retry_count = ldata.ld_leh->leh_reccount - 1; + +again: LASSERT(ldata.ld_leh != NULL); ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1); for (count = 0; count < ldata.ld_leh->leh_reccount; count++) { @@ -1286,6 +1391,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, struct mdt_lock_list *mll; struct lu_name name; struct lu_fid fid; + __u64 ibits; linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, &name, &fid); @@ -1326,18 +1432,53 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, /* Since this needs to lock all of objects in linkea, to avoid * deadlocks, because it does not follow parent-child order as - * other MDT operation, let's use try_lock here, i.e. it will - * return immediately once there are conflict locks, and return - * EBUSY to client */ + * other MDT operation, let's use try_lock here and if the lock + * cannot be gotten because of conflicting locks, then drop all + * current locks, send an AST to the client, and start again. 
*/ mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name); - rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, - MDS_INODELOCK_UPDATE); - if (rc == 0) { - CDEBUG(D_ERROR, "%s: cannot lock "DFID": rc =%d\n", - mdt_obd_name(mdt), PFID(&fid), rc); - mdt_object_put(info->mti_env, mdt_pobj); + ibits = 0; + rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, &ibits, + MDS_INODELOCK_UPDATE, true); + if (!(ibits & MDS_INODELOCK_UPDATE)) { + mdt_unlock_list(info, lock_list, rc); + + CDEBUG(D_INFO, "%s: busy lock on "DFID" %s retry %d\n", + mdt_obd_name(mdt), PFID(&fid), name.ln_name, + retry_count); + + if (retry_count == 0) { + mdt_object_put(info->mti_env, mdt_pobj); + OBD_FREE_PTR(mll); + GOTO(out, rc = -EBUSY); + } + + rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh, + MDS_INODELOCK_UPDATE); + if (rc != 0) { + mdt_object_put(info->mti_env, mdt_pobj); + OBD_FREE_PTR(mll); + GOTO(out, rc); + } + + if (mdt_object_remote(mdt_pobj)) { + struct ldlm_lock *lock; + + /* For remote object, set lock to cb_atomic, + * so lock can be released in blocking_ast() + * immediately, then the next try_lock will + * have better chance to succeed */ + lock = + ldlm_handle2lock(&mll->mll_lh.mlh_rreg_lh); + LASSERT(lock != NULL); + lock_res_and_lock(lock); + ldlm_set_atomic_cb(lock); + unlock_res_and_lock(lock); + LDLM_LOCK_PUT(lock); + } + mdt_object_unlock_put(info, mdt_pobj, &mll->mll_lh, rc); OBD_FREE_PTR(mll); - GOTO(out, rc = -EBUSY); + retry_count--; + goto again; } rc = 0; INIT_LIST_HEAD(&mll->mll_list); @@ -1346,7 +1487,6 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, next: ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee + ldata.ld_reclen); - } out: if (rc != 0) @@ -1380,7 +1520,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, /* 1: lock the source dir. 
*/ msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(msrcdir)) { - CERROR("%s: cannot find source dir "DFID" : rc = %d\n", + CDEBUG(D_OTHER, "%s: cannot find source dir "DFID" : rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1), (int)PTR_ERR(msrcdir)); RETURN(PTR_ERR(msrcdir)); @@ -1388,8 +1528,8 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, lh_dirp = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name); - rc = mdt_object_lock(info, msrcdir, lh_dirp, - MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, msrcdir, lh_dirp, MDS_INODELOCK_UPDATE, + true); if (rc) GOTO(out_put_parent, rc); @@ -1411,19 +1551,22 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, if (!fid_is_md_operative(old_fid)) GOTO(out_unlock_parent, rc = -EPERM); + if (lu_fid_eq(old_fid, &info->mti_mdt->mdt_md_root_fid)) + GOTO(out_unlock_parent, rc = -EPERM); + mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid); if (IS_ERR(mold)) GOTO(out_unlock_parent, rc = PTR_ERR(mold)); if (mdt_object_remote(mold)) { - CERROR("%s: source "DFID" is on the remote MDT\n", + CDEBUG(D_OTHER, "%s: source "DFID" is on the remote MDT\n", mdt_obd_name(info->mti_mdt), PFID(old_fid)); GOTO(out_put_child, rc = -EREMOTE); } if (S_ISREG(lu_object_attr(&mold->mot_obj)) && !mdt_object_remote(msrcdir)) { - CERROR("%s: parent "DFID" is still on the same" + CDEBUG(D_OTHER, "%s: parent "DFID" is still on the same" " MDT, which should be migrated first:" " rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(msrcdir)), -EPERM); @@ -1471,7 +1614,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, lease_broken = ldlm_is_cancel(lease); unlock_res_and_lock(lease); - LDLM_DEBUG(lease, DFID " lease broken? %d\n", + LDLM_DEBUG(lease, DFID " lease broken? %d", PFID(mdt_object_fid(mold)), lease_broken); /* Cancel server side lease. 
Client side counterpart should @@ -1506,10 +1649,25 @@ out_lease: lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_childp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_childp, lock_ibits, true); + if (rc != 0) + GOTO(out_unlock_child, rc); + + /* Migration is incompatible with HSM. */ + ma->ma_need = MA_HSM; + ma->ma_valid = 0; + rc = mdt_attr_get_complex(info, mold, ma); if (rc != 0) GOTO(out_unlock_child, rc); + if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0) { + rc = -ENOSYS; + CDEBUG(D_OTHER, + "%s: cannot migrate HSM archived file "DFID": rc = %d\n", + mdt_obd_name(info->mti_mdt), PFID(old_fid), rc); + GOTO(out_unlock_child, rc); + } + ma->ma_need = MA_LMV; ma->ma_valid = 0; ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf; @@ -1524,7 +1682,7 @@ out_lease: lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv); lmm1 = &ma->ma_lmv->lmv_md_v1; if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) { - CERROR("%s: can not migrate striped dir "DFID + CDEBUG(D_OTHER, "%s: can not migrate striped dir "DFID ": rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(mold)), -EPERM); GOTO(out_unlock_child, rc = -EPERM); @@ -1539,7 +1697,8 @@ out_lease: GOTO(out_unlock_child, rc = PTR_ERR(mnew)); if (!mdt_object_remote(mnew)) { - CERROR("%s: "DFID" being migrated is on this MDT:" + CDEBUG(D_OTHER, + "%s: "DFID" being migrated is on this MDT:" " rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid2), -EPERM); GOTO(out_put_new, rc = -EPERM); @@ -1562,7 +1721,7 @@ out_lease: if (IS_ERR(mnew)) GOTO(out_unlock_child, rc = PTR_ERR(mnew)); if (!mdt_object_remote(mnew)) { - CERROR("%s: Migration "DFID" is on this MDT:" + CDEBUG(D_OTHER, "%s: Migration "DFID" is on this MDT:" " rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid2), -EXDEV); GOTO(out_put_new, rc = -EXDEV); @@ -1590,7 +1749,10 @@ out_put_new: out_unlock_child: mdt_object_unlock(info, mold, lh_childp, rc); out_unlock_list: - mdt_unlock_list(info, &lock_list, rc); + 
/* we don't really modify linkea objects, so we can safely decref these + * locks, and this can avoid saving them as COS locks, which may prevent + * subsequent migrate. */ + mdt_unlock_list(info, &lock_list, 1); if (lease != NULL) { ldlm_reprocess_all(lease->l_resource); LDLM_LOCK_PUT(lease); @@ -1634,12 +1796,13 @@ out_put: static int mdt_object_lock_save(struct mdt_thread_info *info, struct mdt_object *dir, struct mdt_lock_handle *lh, - int idx) + int idx, bool cos_incompat) { int rc; /* we lock the target dir if it is local */ - rc = mdt_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE, + cos_incompat); if (rc != 0) return rc; @@ -1648,146 +1811,131 @@ static int mdt_object_lock_save(struct mdt_thread_info *info, return 0; } - -static int mdt_rename_parents_lock(struct mdt_thread_info *info, - struct mdt_object **srcp, - struct mdt_object **tgtp) +/* + * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent; + * 2 - srcdir child; 3 - tgtdir child. + * Update on disk version of srcdir child. + */ +/** + * For DNE phase I, only these renames are allowed + * mv src_p/src_c tgt_p/tgt_c + * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT. + * 2. src_p and tgt_p are same directory, and tgt_c does not + * exists. In this case, all of modification will happen + * in the MDT where ithesource parent is, only one remote + * update is needed, i.e. set c_time/m_time on the child. + * And tgt_c will be still in the same MDT as the original + * src_c. 
+ */ +static int mdt_reint_rename_internal(struct mdt_thread_info *info, + struct mdt_lock_handle *lhc) { struct mdt_reint_record *rr = &info->mti_rr; - const struct lu_fid *fid_src = rr->rr_fid1; - const struct lu_fid *fid_tgt = rr->rr_fid2; - struct mdt_lock_handle *lh_src = &info->mti_lh[MDT_LH_PARENT]; - struct mdt_lock_handle *lh_tgt = &info->mti_lh[MDT_LH_CHILD]; - struct mdt_object *src; - struct mdt_object *tgt; - int reverse = 0; - int rc; + struct md_attr *ma = &info->mti_attr; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_object *msrcdir = NULL; + struct mdt_object *mtgtdir = NULL; + struct mdt_object *mold; + struct mdt_object *mnew = NULL; + struct mdt_lock_handle *lh_srcdirp; + struct mdt_lock_handle *lh_tgtdirp; + struct mdt_lock_handle *lh_oldp = NULL; + struct mdt_lock_handle *lh_newp = NULL; + struct lu_fid *old_fid = &info->mti_tmp_fid1; + struct lu_fid *new_fid = &info->mti_tmp_fid2; + __u64 lock_ibits; + bool reverse = false; + bool cos_incompat; + int rc; ENTRY; + DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME, + PFID(rr->rr_fid1), PNAME(&rr->rr_name), + PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); + /* find both parents. */ - src = mdt_object_find_check(info, fid_src, 0); - if (IS_ERR(src)) - RETURN(PTR_ERR(src)); + msrcdir = mdt_object_find_check(info, rr->rr_fid1, 0); + if (IS_ERR(msrcdir)) + RETURN(PTR_ERR(msrcdir)); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); - if (lu_fid_eq(fid_src, fid_tgt)) { - tgt = src; - mdt_object_get(info->mti_env, tgt); + if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { + mtgtdir = msrcdir; + mdt_object_get(info->mti_env, mtgtdir); } else { - /* Check if the @src is not a child of the @tgt, otherwise a - * reverse locking must take place. */ - rc = mdt_is_subdir(info, src, fid_tgt); + /* Check if the @msrcdir is not a child of the @mtgtdir, + * otherwise a reverse locking must take place. 
*/ + rc = mdt_is_subdir(info, msrcdir, rr->rr_fid2); if (rc == -EINVAL) - reverse = 1; + reverse = true; else if (rc) - GOTO(err_src_put, rc); + GOTO(out_put_srcdir, rc); - tgt = mdt_object_find_check(info, fid_tgt, 1); - if (IS_ERR(tgt)) - GOTO(err_src_put, rc = PTR_ERR(tgt)); + mtgtdir = mdt_object_find_check(info, rr->rr_fid2, 1); + if (IS_ERR(mtgtdir)) + GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir)); } + /* source needs to be looked up after locking source parent, otherwise + * this rename may race with unlink source, and cause rename hang, see + * sanityn.sh 55b, so check parents first, if later we found source is + * remote, relock parents. */ + cos_incompat = (mdt_object_remote(msrcdir) || + mdt_object_remote(mtgtdir)); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); /* lock parents in the proper order. */ + lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; + lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; + +relock: + mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); + mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); + if (reverse) { - rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); + rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1, + cos_incompat); if (rc) - GOTO(err_tgt_put, rc); + GOTO(out_put_tgtdir, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); - rc = mdt_object_lock_save(info, src, lh_src, 0); + rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0, + cos_incompat); + if (rc != 0) { + mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc); + GOTO(out_put_tgtdir, rc); + } } else { - rc = mdt_object_lock_save(info, src, lh_src, 0); + rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0, + cos_incompat); if (rc) - GOTO(err_tgt_put, rc); + GOTO(out_put_tgtdir, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); - if (tgt != src) - rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); - else if (lh_src->mlh_pdo_hash != lh_tgt->mlh_pdo_hash) { - rc = mdt_pdir_hash_lock(info, lh_tgt, tgt, - MDS_INODELOCK_UPDATE); + if (mtgtdir != msrcdir) { + rc = 
mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1, + cos_incompat); + } else if (lh_srcdirp->mlh_pdo_hash != + lh_tgtdirp->mlh_pdo_hash) { + rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir, + MDS_INODELOCK_UPDATE, + cos_incompat); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10); } + if (rc != 0) { + mdt_object_unlock(info, msrcdir, lh_srcdirp, rc); + GOTO(out_put_tgtdir, rc); + } } - if (rc) - GOTO(err_unlock, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); - - *srcp = src; - *tgtp = tgt; - RETURN(0); - -err_unlock: - /* The order does not matter as the handle is checked inside, - * as well as not used handle. */ - mdt_object_unlock(info, src, lh_src, rc); - mdt_object_unlock(info, tgt, lh_tgt, rc); -err_tgt_put: - mdt_object_put(info->mti_env, tgt); -err_src_put: - mdt_object_put(info->mti_env, src); - RETURN(rc); -} - -/* - * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent; - * 2 - src child; 3 - tgt child. - * Update on disk version of src child. - */ -/** - * For DNE phase I, only these renames are allowed - * mv src_p/src_c tgt_p/tgt_c - * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT. - * 2. src_p and tgt_p are same directory, and tgt_c does not - * exists. In this case, all of modification will happen - * in the MDT where ithesource parent is, only one remote - * update is needed, i.e. set c_time/m_time on the child. - * And tgt_c will be still in the same MDT as the original - * src_c. 
- */ -static int mdt_reint_rename_internal(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) -{ - struct mdt_reint_record *rr = &info->mti_rr; - struct md_attr *ma = &info->mti_attr; - struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_object *msrcdir = NULL; - struct mdt_object *mtgtdir = NULL; - struct mdt_object *mold; - struct mdt_object *mnew = NULL; - struct mdt_lock_handle *lh_srcdirp; - struct mdt_lock_handle *lh_tgtdirp; - struct mdt_lock_handle *lh_oldp = NULL; - struct mdt_lock_handle *lh_newp = NULL; - struct lu_fid *old_fid = &info->mti_tmp_fid1; - struct lu_fid *new_fid = &info->mti_tmp_fid2; - __u64 lock_ibits; - int rc; - ENTRY; - - DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME, - PFID(rr->rr_fid1), PNAME(&rr->rr_name), - PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); - - lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); - lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); - - /* step 1&2: lock the source and target dirs. */ - rc = mdt_rename_parents_lock(info, &msrcdir, &mtgtdir); - if (rc) - RETURN(rc); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5); - /* step 3: find & lock the old object. */ + /* find mold object. */ fid_zero(old_fid); rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2); if (rc != 0) @@ -1805,17 +1953,27 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, /* Check if @mtgtdir is subdir of @mold, before locking child * to avoid reverse locking. */ - rc = mdt_is_subdir(info, mtgtdir, old_fid); - if (rc) - GOTO(out_put_old, rc); + if (mtgtdir != msrcdir) { + rc = mdt_is_subdir(info, mtgtdir, old_fid); + if (rc) + GOTO(out_put_old, rc); + } tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold)); /* save version after locking */ mdt_version_get_save(info, mold, 2); - /* step 4: find & lock the new object. 
*/ - /* new target object may not exist now */ - /* lookup with version checking */ + if (!cos_incompat && mdt_object_remote(mold)) { + cos_incompat = true; + mdt_object_put(info->mti_env, mold); + mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN); + mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN); + goto relock; + } + + /* find mnew object: + * mnew target object may not exist now + * lookup with version checking */ fid_zero(new_fid); rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid, 3); @@ -1856,7 +2014,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); - lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR; if (mdt_object_remote(msrcdir)) { /* Enqueue lookup lock from the parent MDT */ @@ -1872,15 +2029,18 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits, + cos_incompat); if (rc != 0) GOTO(out_unlock_old, rc); /* Check if @msrcdir is subdir of @mnew, before locking child * to avoid reverse locking. 
*/ - rc = mdt_is_subdir(info, msrcdir, new_fid); - if (rc) - GOTO(out_unlock_old, rc); + if (mtgtdir != msrcdir) { + rc = mdt_is_subdir(info, msrcdir, new_fid); + if (rc) + GOTO(out_unlock_old, rc); + } /* We used to acquire MDS_INODELOCK_FULL here but we * can't do this now because a running HSM restore on @@ -1889,9 +2049,10 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, lh_newp = &info->mti_lh[MDT_LH_NEW]; mdt_lock_reg_init(lh_newp, LCK_EX); - rc = mdt_object_lock(info, mnew, lh_newp, - MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE); + rc = mdt_reint_object_lock(info, mnew, lh_newp, + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE, + cos_incompat); if (rc != 0) GOTO(out_unlock_old, rc); @@ -1902,7 +2063,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, } else { lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); - lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR; if (mdt_object_remote(msrcdir)) { /* Enqueue lookup lock from the parent MDT */ @@ -1913,14 +2073,15 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, MDS_INODELOCK_LOOKUP, false); if (rc != ELDLM_OK) - GOTO(out_put_new, rc); + GOTO(out_put_old, rc); lock_ibits &= ~MDS_INODELOCK_LOOKUP; } - rc = mdt_object_lock(info, mold, lh_oldp, lock_ibits); + rc = mdt_reint_object_lock(info, mold, lh_oldp, lock_ibits, + cos_incompat); if (rc != 0) - GOTO(out_put_old, rc); + GOTO(out_unlock_old, rc); mdt_enoent_version_save(info, 3); } @@ -1963,8 +2124,12 @@ out_put_new: out_put_old: mdt_object_put(info->mti_env, mold); out_unlock_parents: - mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc); - mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc); + mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc); + mdt_object_unlock(info, msrcdir, lh_srcdirp, rc); +out_put_tgtdir: + mdt_object_put(info->mti_env, mtgtdir); +out_put_srcdir: + mdt_object_put(info->mti_env, msrcdir); return rc; }