Whamcloud - gitweb
LU-11047 mdt: standardize mdt object locking
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index f45a84b..17b5e6b 100644 (file)
@@ -218,30 +218,29 @@ int mdt_lookup_version_check(struct mdt_thread_info *info,
 
 }
 
-static int mdt_unlock_slaves(struct mdt_thread_info *mti,
-                            struct mdt_object *obj,
-                            struct ldlm_enqueue_info *einfo,
-                            int decref)
+static int mdt_stripes_unlock(struct mdt_thread_info *mti,
+                             struct mdt_object *obj,
+                             struct ldlm_enqueue_info *einfo,
+                             int decref)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
        struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
-       struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
+       struct lustre_handle_array *locks = einfo->ei_cbdata;
        int i;
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
-       LASSERT(slave_locks);
+       LASSERT(locks);
 
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = einfo->ei_inodebits;
-       mdt_lock_handle_init(lh);
        mdt_lock_reg_init(lh, einfo->ei_mode);
-       for (i = 0; i < slave_locks->ha_count; i++) {
-               if (test_bit(i, (void *)slave_locks->ha_map))
-                       lh->mlh_rreg_lh = slave_locks->ha_handles[i];
+       for (i = 0; i < locks->ha_count; i++) {
+               if (test_bit(i, (void *)locks->ha_map))
+                       lh->mlh_rreg_lh = locks->ha_handles[i];
                else
-                       lh->mlh_reg_lh = slave_locks->ha_handles[i];
+                       lh->mlh_reg_lh = locks->ha_handles[i];
                mdt_object_unlock(mti, NULL, lh, decref);
-               slave_locks->ha_handles[i].cookie = 0ull;
+               locks->ha_handles[i].cookie = 0ull;
        }
 
        return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
@@ -276,14 +275,13 @@ static inline int mdt_object_striped(struct mdt_thread_info *mti,
  * Lock slave stripes if necessary, the lock handles of slave stripes
  * will be stored in einfo->ei_cbdata.
  **/
-static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
-                          enum ldlm_mode mode, __u64 ibits,
-                          struct ldlm_enqueue_info *einfo)
+static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
+                           enum ldlm_mode mode, __u64 ibits,
+                           struct ldlm_enqueue_info *einfo)
 {
        union ldlm_policy_data *policy = &mti->mti_policy;
 
        LASSERT(S_ISDIR(obj->mot_header.loh_attr));
-
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
@@ -300,48 +298,77 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
                              policy);
 }
 
-int mdt_reint_striped_lock(struct mdt_thread_info *info,
-                          struct mdt_object *o,
-                          struct mdt_lock_handle *lh,
-                          __u64 ibits,
-                          struct ldlm_enqueue_info *einfo,
-                          bool cos_incompat)
+/** lock object, and stripes if it's a striped directory
+ *
+ * object should be local, this is called in operations which modify both object
+ * and stripes.
+ *
+ * \param info         struct mdt_thread_info
+ * \param parent       parent object, if it's NULL, find parent by mdo_lookup()
+ * \param child                child object
+ * \param lh           lock handle
+ * \param einfo                struct ldlm_enqueue_info
+ * \param ibits                MDS inode lock bits
+ * \param mode         lock mode
+ * \param cos_incompat DNE COS incompatible
+ *
+ * \retval             0 on success, -ev on error.
+ */
+int mdt_object_stripes_lock(struct mdt_thread_info *info,
+                           struct mdt_object *parent,
+                           struct mdt_object *child,
+                           struct mdt_lock_handle *lh,
+                           struct ldlm_enqueue_info *einfo, __u64 ibits,
+                           enum ldlm_mode mode, bool cos_incompat)
 {
        int rc;
 
-       LASSERT(!mdt_object_remote(o));
+       ENTRY;
+       /* according to the protocol, child should be local, is request sent to
+        * wrong MDT.
+        */
+       if (mdt_object_remote(child)) {
+               CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
+                      -EREMOTE);
+               RETURN(-EREMOTE);
+       }
 
        memset(einfo, 0, sizeof(*einfo));
-
-       rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
-       if (rc)
-               return rc;
-
-       rc = mdt_object_striped(info, o);
-       if (rc != 1) {
-               if (rc < 0)
-                       mdt_object_unlock(info, o, lh, rc);
-               return rc;
+       if (ibits & MDS_INODELOCK_LOOKUP) {
+               LASSERT(parent);
+               rc = mdt_object_check_lock(info, parent, child, lh, ibits,
+                                          mode, cos_incompat);
+       } else {
+               rc = mdt_object_lock(info, child, lh, ibits, mode,
+                                    cos_incompat);
        }
+       if (rc)
+               RETURN(rc);
 
-       rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
-       if (rc) {
-               mdt_object_unlock(info, o, lh, rc);
-               if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
-                       rc = 0;
+       if (S_ISDIR(child->mot_header.loh_attr)) {
+               rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
+               if (rc) {
+                       mdt_object_unlock(info, child, lh, rc);
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                           rc == -EIO)
+                               rc = 0;
+               }
        }
 
-       return rc;
+       RETURN(rc);
 }
 
-void mdt_reint_striped_unlock(struct mdt_thread_info *info,
-                             struct mdt_object *o,
+void mdt_object_stripes_unlock(struct mdt_thread_info *info,
+                             struct mdt_object *obj,
                              struct mdt_lock_handle *lh,
                              struct ldlm_enqueue_info *einfo, int decref)
 {
+       /* this is checked in mdt_object_stripes_lock() */
+       LASSERT(!mdt_object_remote(obj));
        if (einfo->ei_cbdata)
-               mdt_unlock_slaves(info, o, einfo, decref);
-       mdt_object_unlock(info, o, lh, decref);
+               mdt_stripes_unlock(info, obj, einfo, decref);
+       mdt_object_unlock(info, obj, lh, decref);
 }
 
 static int mdt_restripe(struct mdt_thread_info *info,
@@ -379,9 +406,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
                RETURN(rc);
 
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, lname);
-       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
-                                  true);
+       rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
        if (rc)
                RETURN(rc);
 
@@ -437,20 +462,8 @@ static int mdt_restripe(struct mdt_thread_info *info,
 
        /* lock object */
        lhc = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhc, LCK_EX);
-
-       /* enqueue object remote LOOKUP lock */
-       if (mdt_object_remote(parent)) {
-               rc = mdt_remote_object_lock(info, parent, fid,
-                                           &lhc->mlh_rreg_lh,
-                                           lhc->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(out_child, rc);
-       }
-
-       rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
-                                   true);
+       rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+                                    MDS_INODELOCK_FULL, LCK_PW, true);
        if (rc)
                GOTO(unlock_child, rc);
 
@@ -483,7 +496,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
 restriping_clear:
        child->mot_restriping = 0;
 unlock_child:
-       mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+       mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
 out_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
@@ -607,8 +620,7 @@ static int mdt_create(struct mdt_thread_info *info)
        OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
 
        lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-       rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+       rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
        if (rc)
                GOTO(put_parent, rc);
 
@@ -679,25 +691,22 @@ static int mdt_create(struct mdt_thread_info *info)
                if (cos_incompat) {
                        if (!mdt_object_remote(parent)) {
                                mdt_object_unlock(info, parent, lh, 1);
-                               mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-                               rc = mdt_reint_object_lock(info, parent, lh,
-                                                          MDS_INODELOCK_UPDATE,
-                                                          true);
+                               rc = mdt_parent_lock(info, parent, lh,
+                                                    &rr->rr_name, LCK_PW,
+                                                    true);
                                if (rc)
                                        GOTO(put_child, rc);
                        }
                }
 
                lhc = &info->mti_lh[MDT_LH_CHILD];
-               mdt_lock_handle_init(lhc);
-               mdt_lock_reg_init(lhc, LCK_PW);
-               rc = mdt_reint_striped_lock(info, child, lhc,
-                                           MDS_INODELOCK_UPDATE, einfo,
-                                           cos_incompat);
+               rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            cos_incompat);
                if (rc)
                        GOTO(put_child, rc);
 
-               mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+               mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
        }
 
        /* Return fid & attr to client. */
@@ -729,27 +738,19 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        rc = mdt_object_striped(info, mo);
        if (rc < 0)
                RETURN(rc);
-
        cos_incompat = rc;
 
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lh, LCK_PW);
-
-       /* Even though the new MDT will grant PERM lock to the old
-        * client, but the old client will almost ignore that during
-        * So it needs to revoke both LOOKUP and PERM lock here, so
-        * both new and old client can cancel the dcache
-        */
        if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
-               lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
+               lockpart |= MDS_INODELOCK_PERM;
        /* Clear xattr cache on clients, so the virtual project ID xattr
         * can get the new project ID
         */
        if (ma->ma_attr.la_valid & LA_PROJID)
                lockpart |= MDS_INODELOCK_XATTR;
 
-       rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
-                                   cos_incompat);
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
+                                    LCK_PW, cos_incompat);
        if (rc != 0)
                RETURN(rc);
 
@@ -781,7 +782,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
        EXIT;
 out_unlock:
-       mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
+       mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
        return rc;
 }
 
@@ -869,9 +870,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
 
                if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
-                       mdt_lock_reg_init(lhc, LCK_CW);
-
-                       rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
+                       rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
+                                            LCK_CW, false);
                        if (rc != 0) {
                                up_read(&mo->mot_open_sem);
                                GOTO(out_put, rc);
@@ -946,7 +946,6 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                struct lu_ucred *uc = mdt_ucred(info);
                struct mdt_lock_handle *lh;
                const char *name;
-               __u64 lockpart = MDS_INODELOCK_XATTR;
 
                /* reject if either remote or striped dir is disabled */
                if (ma->ma_valid & MA_LMV) {
@@ -971,27 +970,28 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                        GOTO(out_put, rc = -EPROTO);
 
                lh = &info->mti_lh[MDT_LH_PARENT];
-               mdt_lock_reg_init(lh, LCK_PW);
-
                if (ma->ma_valid & MA_LOV) {
                        buf->lb_buf = ma->ma_lmm;
                        buf->lb_len = ma->ma_lmm_size;
                        name = XATTR_NAME_LOV;
+                       rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
+                                            LCK_PW, false);
                } else {
-                       struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
-                       struct lu_fid *pfid = &info->mti_tmp_fid1;
-                       struct lu_name *pname = &info->mti_name;
-                       const char dotdot[] = "..";
-                       struct mdt_object *pobj;
-
-                       buf->lb_buf = lmu;
+                       buf->lb_buf = &ma->ma_lmv->lmv_user_md;
                        buf->lb_len = ma->ma_lmv_size;
                        name = XATTR_NAME_DEFAULT_LMV;
 
-                       if (fid_is_root(rr->rr_fid1)) {
-                               lockpart |= MDS_INODELOCK_LOOKUP;
+                       if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
+                               rc = mdt_object_lock(info, mo, lh,
+                                                    MDS_INODELOCK_XATTR |
+                                                    MDS_INODELOCK_LOOKUP,
+                                                    LCK_PW, false);
                        } else {
-                               /* force client to update dir default layout */
+                               struct lu_fid *pfid = &info->mti_tmp_fid1;
+                               struct lu_name *pname = &info->mti_name;
+                               const char dotdot[] = "..";
+                               struct mdt_object *pobj;
+
                                fid_zero(pfid);
                                pname->ln_name = dotdot;
                                pname->ln_namelen = sizeof(dotdot);
@@ -1001,27 +1001,19 @@ static int mdt_reint_setattr(struct mdt_thread_info *info,
                                if (rc)
                                        GOTO(out_put, rc);
 
-                               pobj = mdt_object_find(info->mti_env, mdt,
-                                                      pfid);
+                               pobj = mdt_object_find(info->mti_env,
+                                                      info->mti_mdt, pfid);
                                if (IS_ERR(pobj))
                                        GOTO(out_put, rc = PTR_ERR(pobj));
 
-                               if (mdt_object_remote(pobj))
-                                       rc = mdt_remote_object_lock(info, pobj,
-                                               mdt_object_fid(mo),
-                                               &lh->mlh_rreg_lh, LCK_EX,
-                                               MDS_INODELOCK_LOOKUP, false);
-                               else
-                                       lockpart |= MDS_INODELOCK_LOOKUP;
-
+                               rc = mdt_object_check_lock(info, pobj, mo, lh,
+                                                          MDS_INODELOCK_XATTR |
+                                                          MDS_INODELOCK_LOOKUP,
+                                                          LCK_PW, false);
                                mdt_object_put(info->mti_env, pobj);
-
-                               if (rc)
-                                       GOTO(out_put, rc);
                        }
                }
 
-               rc = mdt_object_lock(info, mo, lh, lockpart);
                if (rc != 0)
                        GOTO(out_put, rc);
 
@@ -1124,7 +1116,6 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        struct lu_ucred *uc  = mdt_ucred(info);
-       __u64 lock_ibits;
        bool cos_incompat = false;
        int no_name = 0;
        ktime_t kstart = ktime_get();
@@ -1163,9 +1154,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
 relock:
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
-       rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
+       rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
+                            cos_incompat);
        if (rc != 0)
                GOTO(put_parent, rc);
 
@@ -1249,7 +1239,6 @@ relock:
        }
 
        child_lh = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(child_lh, LCK_EX);
        if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;
 
@@ -1273,7 +1262,11 @@ relock:
                 * would happen if another client try to grab the LOOKUP
                 * lock at the same time with unlink XXX
                 */
-               mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
+               rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
+                                           false);
+               if (rc)
+                       GOTO(put_child, rc);
+
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
                repbody->mbo_fid1 = *mdt_object_fid(mc);
@@ -1284,21 +1277,10 @@ relock:
         * this now because a running HSM restore on the child (unlink
         * victim) will hold the layout lock. See LU-4002.
         */
-       lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-       if (mdt_object_remote(mp)) {
-               /* Enqueue lookup lock from parent MDT */
-               rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
-                                           &child_lh->mlh_rreg_lh,
-                                           child_lh->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(put_child, rc);
-
-               lock_ibits &= ~MDS_INODELOCK_LOOKUP;
-       }
-
-       rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
-                                   cos_incompat);
+       rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
+                                    MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_UPDATE,
+                                    LCK_EX, cos_incompat);
        if (rc != 0)
                GOTO(put_child, rc);
 
@@ -1357,7 +1339,7 @@ out_stat:
        EXIT;
 
 unlock_child:
-       mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
+       mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
 put_child:
        if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
            info->mti_big_buf.lb_buf)
@@ -1444,19 +1426,16 @@ static int mdt_reint_link(struct mdt_thread_info *info,
        OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
 
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
-       rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
+       rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
        if (rc != 0)
                GOTO(put_source, rc);
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
 
        lhs = &info->mti_lh[MDT_LH_CHILD];
-       mdt_lock_reg_init(lhs, LCK_EX);
-       rc = mdt_reint_object_lock(info, ms, lhs,
-                                  MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
-                                  cos_incompat);
+       rc = mdt_object_lock(info, ms, lhs,
+                            MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
+                            cos_incompat);
        if (rc != 0)
                GOTO(unlock_parent, rc);
 
@@ -1503,98 +1482,37 @@ put_parent:
        mdt_object_put(info->mti_env, mp);
        return rc;
 }
-/**
- * lock the part of the directory according to the hash of the name
- * (lh->mlh_pdo_hash) in parallel directory lock.
- */
-static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
-                             struct mdt_lock_handle *lh,
-                             struct mdt_object *obj, __u64 ibits,
-                             bool cos_incompat)
-{
-       struct ldlm_res_id *res = &info->mti_res_id;
-       struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
-       union ldlm_policy_data *policy = &info->mti_policy;
-       __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-       int rc;
-
-       /*
-        * Finish res_id initializing by name hash marking part of
-        * directory which is taking modification.
-        */
-       LASSERT(lh->mlh_pdo_hash != 0);
-       fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
-       memset(policy, 0, sizeof(*policy));
-       policy->l_inodebits.bits = ibits;
-       if (cos_incompat &&
-           (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
-               dlmflags |= LDLM_FL_COS_INCOMPAT;
-       /*
-        * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
-        * going to be sent to client. If it is - mdt_intent_policy() path will
-        * fix it up and turn FL_LOCAL flag off.
-        */
-       rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
-                         policy, res, dlmflags,
-                         &info->mti_exp->exp_handle.h_cookie);
-       return rc;
-}
 
 /**
  * Get BFL lock for rename or migrate process.
  **/
 static int mdt_rename_lock(struct mdt_thread_info *info,
-                          struct lustre_handle *lh)
+                          struct mdt_lock_handle *lh)
 {
-       int     rc;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_object *obj;
+       __u64 ibits = MDS_INODELOCK_UPDATE;
+       int rc;
 
        ENTRY;
-       if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
-               struct lu_fid *fid = &info->mti_tmp_fid1;
-               struct mdt_object *obj;
+       lu_root_fid(fid);
+       obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+       if (IS_ERR(obj))
+               RETURN(PTR_ERR(obj));
 
-               /* XXX, right now, it has to use object API to
-                * enqueue lock cross MDT, so it will enqueue
-                * rename lock(with LUSTRE_BFL_FID) by root object
-                */
-               lu_root_fid(fid);
-               obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
-               if (IS_ERR(obj))
-                       RETURN(PTR_ERR(obj));
-
-               rc = mdt_remote_object_lock(info, obj,
-                                           &LUSTRE_BFL_FID, lh,
-                                           LCK_EX,
-                                           MDS_INODELOCK_UPDATE, false);
-               mdt_object_put(info->mti_env, obj);
-       } else {
-               struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
-               union ldlm_policy_data *policy = &info->mti_policy;
-               struct ldlm_res_id *res_id = &info->mti_res_id;
-               __u64 flags = 0;
-
-               fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
-               memset(policy, 0, sizeof(*policy));
-               policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
-               flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
-               rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
-                                           LDLM_IBITS, policy, LCK_EX, &flags,
-                                           ldlm_blocking_ast,
-                                           ldlm_completion_ast, NULL, NULL, 0,
-                                           LVB_T_NONE,
-                                           &info->mti_exp->exp_handle.h_cookie,
-                                           lh);
-               RETURN(rc);
-       }
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
+                                     &ibits, 0, false, false);
+       mdt_object_put(info->mti_env, obj);
        RETURN(rc);
 }
 
-static void mdt_rename_unlock(struct lustre_handle *lh)
+static void mdt_rename_unlock(struct mdt_thread_info *info,
+                             struct mdt_lock_handle *lh)
 {
        ENTRY;
-       LASSERT(lustre_handle_is_used(lh));
        /* Cancel the single rename lock right away */
-       ldlm_lock_decref_and_cancel(lh, LCK_EX);
+       mdt_object_unlock(info, NULL, lh, 1);
        EXIT;
 }
 
@@ -1638,32 +1556,8 @@ int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
        struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
        int rc;
 
-       mdt_lock_handle_init(lh);
-       mdt_lock_reg_init(lh, LCK_EX);
-
-       if (mdt_object_remote(pobj)) {
-               /* don't bother to check if pobj and obj are on the same MDT. */
-               rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_EX,
-                                           MDS_INODELOCK_LOOKUP, false);
-       } else if (mdt_object_remote(obj)) {
-               struct ldlm_res_id *res = &info->mti_res_id;
-               union ldlm_policy_data *policy = &info->mti_policy;
-               __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
-                                LDLM_FL_COS_INCOMPAT;
-
-               fid_build_reg_res_name(mdt_object_fid(obj), res);
-               memset(policy, 0, sizeof(*policy));
-               policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
-               rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
-                                 &lh->mlh_reg_lh, LCK_EX, policy, res,
-                                 dlmflags, NULL);
-       } else {
-               /* do nothing if both are local */
-               return 0;
-       }
-
-       if (rc != ELDLM_OK)
+       rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
+       if (rc)
                return rc;
 
        /*
@@ -1709,7 +1603,7 @@ static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                mdt_unlock_list(info, slave_locks, decref);
                mdt_object_unlock(info, obj, lh, decref);
        } else {
-               mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
+               mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
        }
 }
 
@@ -1851,10 +1745,9 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
                 * one, and continue processing the remaining entries, and in
                 * the end of the loop restart from beginning.
                 */
-               mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
                ibits = 0;
                rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
-                                        MDS_INODELOCK_UPDATE, true);
+                                        MDS_INODELOCK_UPDATE, LCK_PW, true);
                if (!(ibits & MDS_INODELOCK_UPDATE)) {
 
                        CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
@@ -1870,9 +1763,9 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
 
                        blocked = true;
 
-                       mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
                        rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
-                                            MDS_INODELOCK_UPDATE);
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            true);
                        if (rc) {
                                mdt_object_put(info->mti_env, lnkp);
                                OBD_FREE_PTR(msl);
@@ -1968,9 +1861,8 @@ static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
                        GOTO(out, rc = -ENOMEM);
                }
 
-               mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
-               rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
-                                          MDS_INODELOCK_UPDATE, true);
+               rc = mdt_object_lock(info, slave, &msl->msl_lh,
+                                    MDS_INODELOCK_UPDATE, LCK_EX, true);
                if (rc) {
                        OBD_FREE_PTR(msl);
                        mdt_object_put(info->mti_env, slave);
@@ -2000,10 +1892,9 @@ static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
        int rc;
 
        if (mdt_object_remote(obj)) {
-               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_PW,
-                                           MDS_INODELOCK_UPDATE, false);
-               if (rc != ELDLM_OK)
+               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+                                    LCK_PW, true);
+               if (rc)
                        return rc;
 
                /*
@@ -2016,8 +1907,9 @@ static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
                                mdt_object_unlock(info, obj, lh, rc);
                }
        } else {
-               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
-                                           einfo, true);
+               rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            true);
        }
 
        return rc;
@@ -2042,10 +1934,9 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                if (rc)
                        return rc;
 
-               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
-                                           &lh->mlh_rreg_lh, LCK_EX,
-                                           MDS_INODELOCK_FULL, false);
-               if (rc != ELDLM_OK)
+               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
+                                    true);
+               if (rc)
                        return rc;
 
                /*
@@ -2069,14 +1960,8 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                        }
                }
        } else {
-               if (mdt_object_remote(pobj)) {
-                       rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
-                       if (rc)
-                               return rc;
-               }
-
-               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
-                                           einfo, true);
+               rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
+                                            MDS_INODELOCK_FULL, LCK_EX, true);
        }
 
        return rc;
@@ -2258,7 +2143,7 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
        struct mdt_object *spobj = NULL;
        struct mdt_object *sobj = NULL;
        struct mdt_object *tobj;
-       struct lustre_handle rename_lh = { 0 };
+       struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
        struct mdt_lock_handle *lhp;
        struct mdt_lock_handle *lhs;
        struct mdt_lock_handle *lht;
@@ -2306,7 +2191,7 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
         * req is NULL if this is called by directory auto-split.
         */
        if (req && !req_is_replay(req)) {
-               rc = mdt_rename_lock(info, &rename_lh);
+               rc = mdt_rename_lock(info, rename_lh);
                if (rc != 0) {
                        CERROR("%s: can't lock FS for rename: rc = %d\n",
                               mdt_obd_name(info->mti_mdt), rc);
@@ -2342,7 +2227,6 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
 lock_parent:
        /* lock parent object */
        lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_reg_init(lhp, LCK_PW);
        rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
                                     &parent_slave_locks);
        if (rc)
@@ -2424,7 +2308,6 @@ lock_parent:
 
        /* lock source */
        lhs = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lhs, LCK_EX);
        rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
                                     &child_slave_locks);
        if (rc)
@@ -2436,8 +2319,7 @@ lock_parent:
                GOTO(unlock_source, rc = PTR_ERR(tobj));
 
        lht = &info->mti_lh[MDT_LH_NEW];
-       mdt_lock_reg_init(lht, LCK_EX);
-       rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+       rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
        if (rc)
                GOTO(put_target, rc);
 
@@ -2479,30 +2361,11 @@ unlock_parent:
 put_parent:
        mdt_object_put(env, pobj);
 unlock_rename:
-       if (lustre_handle_is_used(&rename_lh))
-               mdt_rename_unlock(&rename_lh);
+       mdt_rename_unlock(info, rename_lh);
 
        return rc;
 }
 
-static int mdt_object_lock_save(struct mdt_thread_info *info,
-                               struct mdt_object *dir,
-                               struct mdt_lock_handle *lh,
-                               int idx, bool cos_incompat)
-{
-       int rc;
-
-       /* we lock the target dir if it is local */
-       rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
-                                  cos_incompat);
-       if (rc != 0)
-               return rc;
-
-       /* get and save correct version after locking */
-       mdt_version_get_save(info, dir, idx);
-       return 0;
-}
-
 /*
  * determine lock order of sobj and tobj
  *
@@ -2615,43 +2478,18 @@ static int mdt_rename_source_lock(struct mdt_thread_info *info,
        if (rc < 0)
                return rc;
 
-       if (rc) {
-               /* enqueue remote LOOKUP lock from the parent MDT */
-               __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
-
-               if (mdt_object_remote(parent)) {
-                       rc = mdt_remote_object_lock(info, parent,
-                                                   mdt_object_fid(child),
-                                                   &lhr->mlh_rreg_lh,
-                                                   lhr->mlh_rreg_mode,
-                                                   rmt_ibits, false);
-                       if (rc != ELDLM_OK)
-                               return rc;
-               } else {
-                       LASSERT(mdt_object_remote(child));
-                       rc = mdt_object_local_lock(info, child, lhr,
-                                                  &rmt_ibits, 0, true);
-                       if (rc < 0)
-                               return rc;
-               }
+       if (rc == 1) {
+               rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
+                                           cos_incompat);
+               if (rc)
+                       return rc;
 
                ibits &= ~MDS_INODELOCK_LOOKUP;
        }
 
-       if (mdt_object_remote(child)) {
-               rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
-                                           &lhc->mlh_rreg_lh,
-                                           lhc->mlh_rreg_mode,
-                                           ibits, false);
-               if (rc == ELDLM_OK)
-                       rc = 0;
-       } else {
-               rc = mdt_reint_object_lock(info, child, lhc, ibits,
-                                          cos_incompat);
-       }
-
-       if (!rc)
-               mdt_object_unlock(info, child, lhr, rc);
+       rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
+       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
+               mdt_object_unlock(info, NULL, lhr, rc);
 
        return rc;
 }
@@ -2662,30 +2500,35 @@ static int mdt_rename_source_lock(struct mdt_thread_info *info,
 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
                             struct mdt_object *mfirstdir,
                             struct mdt_lock_handle *lh_firstdirp,
+                            const struct lu_name *firstname,
                             struct mdt_object *mseconddir,
                             struct mdt_lock_handle *lh_seconddirp,
+                            const struct lu_name *secondname,
                             bool cos_incompat)
 {
        int rc;
 
-       rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
-                                 cos_incompat);
+       rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
+                            cos_incompat);
        if (rc)
                return rc;
 
+       mdt_version_get_save(info, mfirstdir, 0);
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
 
        if (mfirstdir != mseconddir) {
-               rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
-                                         cos_incompat);
-       } else if (!mdt_object_remote(mseconddir) &&
-                  lh_firstdirp->mlh_pdo_hash !=
-                  lh_seconddirp->mlh_pdo_hash) {
-               rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
-                                       MDS_INODELOCK_UPDATE,
-                                       cos_incompat);
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
+                                    secondname, LCK_PW, cos_incompat);
+       } else if (!mdt_object_remote(mseconddir)) {
+               if (lh_firstdirp->mlh_pdo_hash !=
+                   lh_seconddirp->mlh_pdo_hash) {
+                       rc = mdt_object_pdo_lock(info, mseconddir,
+                                                lh_seconddirp, secondname,
+                                                LCK_PW, false, cos_incompat);
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               }
        }
+       mdt_version_get_save(info, mseconddir, 1);
 
        if (rc != 0)
                mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
@@ -2709,7 +2552,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct mdt_object *mtgtdir = NULL;
        struct mdt_object *mold;
        struct mdt_object *mnew = NULL;
-       struct lustre_handle rename_lh = { 0 };
+       struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
        struct mdt_lock_handle *lh_srcdirp;
        struct mdt_lock_handle *lh_tgtdirp;
        struct mdt_lock_handle *lh_oldp = NULL;
@@ -2718,7 +2561,6 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct lu_fid *old_fid = &info->mti_tmp_fid1;
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
        struct lu_ucred *uc = mdt_ucred(info);
-       __u64 lock_ibits;
        bool reverse = false, discard = false;
        bool cos_incompat;
        ktime_t kstart = ktime_get();
@@ -2797,7 +2639,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                     !mdt->mdt_enable_parallel_rename_dir) ||
                    (!S_ISDIR(ma->ma_attr.la_mode) &&
                     !mdt->mdt_enable_parallel_rename_file)) {
-                       rc = mdt_rename_lock(info, &rename_lh);
+                       rc = mdt_rename_lock(info, rename_lh);
                        if (rc != 0) {
                                CERROR("%s: cannot lock for rename: rc = %d\n",
                                       mdt_obd_name(mdt), rc);
@@ -2851,11 +2693,13 @@ relock:
                reverse = 0;
 
        if (reverse)
-               rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
-                                      lh_srcdirp, cos_incompat);
+               rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
+                                      &rr->rr_tgt_name, msrcdir, lh_srcdirp,
+                                      &rr->rr_name, cos_incompat);
        else
-               rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
-                                      lh_tgtdirp, cos_incompat);
+               rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
+                                      mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
+                                      cos_incompat);
 
        if (rc != 0)
                GOTO(out_unlock_rename, rc);
@@ -2965,12 +2809,10 @@ relock:
                        GOTO(out_put_new, rc = -EISDIR);
 
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_RMT];
-               mdt_lock_reg_init(lh_oldp, LCK_EX);
-               mdt_lock_reg_init(lh_rmt, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, lock_ibits, cos_incompat);
+                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           MDS_INODELOCK_XATTR, cos_incompat);
                if (rc < 0)
                        GOTO(out_put_new, rc);
 
@@ -2994,21 +2836,9 @@ relock:
                 */
 
                lh_newp = &info->mti_lh[MDT_LH_NEW];
-               mdt_lock_reg_init(lh_newp, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
-               if (mdt_object_remote(mtgtdir)) {
-                       rc = mdt_remote_object_lock(info, mtgtdir,
-                                                   mdt_object_fid(mnew),
-                                                   &lh_newp->mlh_rreg_lh,
-                                                   lh_newp->mlh_rreg_mode,
-                                                   MDS_INODELOCK_LOOKUP,
-                                                   false);
-                       if (rc != ELDLM_OK)
-                               GOTO(out_unlock_old, rc);
-
-                       lock_ibits &= ~MDS_INODELOCK_LOOKUP;
-               }
-               rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
+               rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
+                                          MDS_INODELOCK_LOOKUP |
+                                          MDS_INODELOCK_UPDATE, LCK_EX,
                                           cos_incompat);
                if (rc != 0)
                        GOTO(out_unlock_new, rc);
@@ -3019,12 +2849,10 @@ relock:
                GOTO(out_put_old, rc);
        } else {
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_RMT];
-               mdt_lock_reg_init(lh_oldp, LCK_EX);
-               mdt_lock_reg_init(lh_rmt, LCK_EX);
-               lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
+               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, lock_ibits, cos_incompat);
+                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           MDS_INODELOCK_XATTR, cos_incompat);
                if (rc != 0)
                        GOTO(out_put_old, rc);
 
@@ -3075,8 +2903,7 @@ out_unlock_parents:
        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
        mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
 out_unlock_rename:
-       if (lustre_handle_is_used(&rename_lh))
-               mdt_rename_unlock(&rename_lh);
+       mdt_rename_unlock(info, rename_lh);
 out_put_tgtdir:
        mdt_object_put(info->mti_env, mtgtdir);
 out_put_srcdir: