Whamcloud - gitweb
LU-15529 mdt: optimize dir migration locking
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 17b5e6b..7b85c00 100644 (file)
@@ -325,7 +325,7 @@ int mdt_object_stripes_lock(struct mdt_thread_info *info,
 
        ENTRY;
        /* according to the protocol, child should be local, is request sent to
-        * wrong MDT.
+        * wrong MDT?
         */
        if (mdt_object_remote(child)) {
                CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
@@ -346,17 +346,23 @@ int mdt_object_stripes_lock(struct mdt_thread_info *info,
        if (rc)
                RETURN(rc);
 
-       if (S_ISDIR(child->mot_header.loh_attr)) {
-               rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
-               if (rc) {
-                       mdt_object_unlock(info, child, lh, rc);
-                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                           rc == -EIO)
-                               rc = 0;
-               }
-       }
+       rc = mdt_object_striped(info, child);
+       if (rc == 0)
+               return 0;
 
-       RETURN(rc);
+       if (rc < 0)
+               goto unlock;
+
+       /* lock stripes for striped directory */
+       rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
+       if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
+               rc = 0;
+
+unlock:
+       if (rc)
+               mdt_object_unlock(info, child, lh, rc);
+
+       return rc;
 }
 
 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
@@ -364,8 +370,6 @@ void mdt_object_stripes_unlock(struct mdt_thread_info *info,
                              struct mdt_lock_handle *lh,
                              struct ldlm_enqueue_info *einfo, int decref)
 {
-       /* this is checked in mdt_object_stripes_lock() */
-       LASSERT(!mdt_object_remote(obj));
        if (einfo->ei_cbdata)
                mdt_stripes_unlock(info, obj, einfo, decref);
        mdt_object_unlock(info, obj, lh, decref);
@@ -380,7 +384,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_fid *fid = &info->mti_tmp_fid2;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
        struct lmv_user_md *lum = spec->u.sp_ea.eadata;
        struct lu_ucred *uc = mdt_ucred(info);
        struct lmv_mds_md_v1 *lmv;
@@ -680,7 +684,7 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
-               struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
                bool cos_incompat;
 
                rc = mdt_object_striped(info, child);
@@ -730,7 +734,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        int do_vbr = ma->ma_attr.la_valid &
                        (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
        bool cos_incompat;
        int rc;
 
@@ -1114,7 +1118,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_object *mc;
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
        struct lu_ucred *uc  = mdt_ucred(info);
        bool cos_incompat = false;
        int no_name = 0;
@@ -1546,90 +1550,276 @@ out_put:
 }
 
 /*
- * in case obj is remote obj on its parent, revoke LOOKUP lock,
- * herein we don't really check it, just do revoke.
+ * lock rename source object.
+ *
+ * Both source and its parent object may be located on remote MDTs, and even on
+ * different MDTs, which means source object is a remote object on parent.
+ *
+ * \retval     0 on success
+ * \retval     -ev negative errno upon error
  */
-int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
-                                 struct mdt_object *pobj,
-                                 struct mdt_object *obj)
+static int mdt_rename_source_lock(struct mdt_thread_info *info,
+                                 struct mdt_object *parent,
+                                 struct mdt_object *child,
+                                 struct mdt_lock_handle *lh,
+                                 struct mdt_lock_handle *lh_lookup,
+                                 __u64 ibits, bool cos_incompat)
 {
-       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
        int rc;
 
-       rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
-       if (rc)
+       LASSERT(ibits & MDS_INODELOCK_LOOKUP);
+       /* if @obj is remote object, LOOKUP lock needs to be taken from
+        * parent MDT.
+        */
+       rc = mdt_is_remote_object(info, parent, child);
+       if (rc < 0)
                return rc;
 
-       /*
-        * TODO, currently we don't save this lock because there is no place to
-        * hold this lock handle, but to avoid race we need to save this lock.
-        */
-       mdt_object_unlock(info, NULL, lh, 1);
+       if (rc == 1) {
+               rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
+                                           LCK_EX, cos_incompat);
+               if (rc)
+                       return rc;
+
+               ibits &= ~MDS_INODELOCK_LOOKUP;
+       }
+
+       rc = mdt_object_lock(info, child, lh, ibits, LCK_EX, cos_incompat);
+       if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
+               mdt_object_unlock(info, NULL, lh_lookup, rc);
 
        return 0;
 }
 
-/*
- * operation may takes locks of linkea, or directory stripes, group them in
- * different list.
- */
-struct mdt_sub_lock {
-       struct mdt_object *msl_obj;
-       struct mdt_lock_handle msl_lh;
-       struct list_head msl_linkage;
+static void mdt_rename_source_unlock(struct mdt_thread_info *info,
+                                    struct mdt_object *obj,
+                                    struct mdt_lock_handle *lh,
+                                    struct mdt_lock_handle *lh_lookup,
+                                    int decref)
+{
+       mdt_object_unlock(info, obj, lh, decref);
+       mdt_object_unlock(info, NULL, lh_lookup, decref);
+}
+
+/* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
+struct mdt_link_lock {
+       struct mdt_object *mll_obj;
+       struct mdt_lock_handle mll_lh;
+       struct list_head mll_linkage;
 };
 
-static void mdt_unlock_list(struct mdt_thread_info *info,
-                           struct list_head *list, int decref)
+static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
+                                           struct mdt_object *o,
+                                           struct mdt_lock_handle *lh,
+                                           struct list_head *list)
 {
-       struct mdt_sub_lock *msl;
-       struct mdt_sub_lock *tmp;
+       struct mdt_link_lock *mll;
 
-       list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
-               mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
-               list_del(&msl->msl_linkage);
-               OBD_FREE_PTR(msl);
-       }
+       OBD_ALLOC_PTR(mll);
+       if (mll == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&mll->mll_linkage);
+       mdt_object_get(info->mti_env, o);
+       mll->mll_obj = o;
+       mll->mll_lh = *lh;
+       memset(lh, 0, sizeof(*lh));
+       list_add_tail(&mll->mll_linkage, list);
+
+       return 0;
 }
 
-static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
-                                            struct mdt_object *obj,
-                                            struct mdt_lock_handle *lh,
-                                            struct ldlm_enqueue_info *einfo,
-                                            struct list_head *slave_locks,
+static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
+                                            struct mdt_link_lock *mll,
                                             int decref)
 {
-       if (mdt_object_remote(obj)) {
-               mdt_unlock_list(info, slave_locks, decref);
-               mdt_object_unlock(info, obj, lh, decref);
-       } else {
-               mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
+       mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
+       mdt_object_put(info->mti_env, mll->mll_obj);
+       list_del(&mll->mll_linkage);
+       OBD_FREE_PTR(mll);
+}
+
+static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
+                                    struct list_head *list, int decref)
+{
+       struct mdt_link_lock *mll;
+       struct mdt_link_lock *tmp;
+
+       list_for_each_entry_safe(mll, tmp, list, mll_linkage)
+               mdt_migrate_link_lock_del(info, mll, decref);
+}
+
+/* take link parent UPDATE lock.
+ * \retval     0 \a lnkp is already locked, no lock taken.
+ *             1 lock taken
+ *             -ev negative errno.
+ */
+static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
+                                       struct mdt_object *lnkp,
+                                       struct list_head *update_locks,
+                                       bool *blocked)
+{
+       const struct lu_fid *fid = mdt_object_fid(lnkp);
+       struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
+       struct mdt_link_lock *entry;
+       __u64 ibits = 0;
+       int rc;
+
+       ENTRY;
+
+       /* check if it's already locked */
+       list_for_each_entry(entry, update_locks, mll_linkage) {
+               if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
+                       CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
+                       RETURN(0);
+               }
+       }
+
+       /* link parent UPDATE lock */
+       CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
+
+       if (*blocked) {
+               /* revoke lock instead of take in *blocked* mode */
+               rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
+                                    LCK_PW, true);
+               if (rc)
+                       RETURN(rc);
+
+               if (mdt_object_remote(lnkp)) {
+                       struct ldlm_lock *lock;
+
+                       /*
+                        * for remote object, set lock cb_atomic, so lock can be
+                        * released in blocking_ast() immediately, then the next
+                        * lock_try will have better chance of success.
+                        */
+                       lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
+                       LASSERT(lock != NULL);
+                       lock_res_and_lock(lock);
+                       ldlm_set_atomic_cb(lock);
+                       unlock_res_and_lock(lock);
+                       LDLM_LOCK_PUT(lock);
+               }
+
+               mdt_object_unlock(info, lnkp, lhl, 1);
+               RETURN(0);
+       }
+
+       /*
+        * we can't follow parent-child lock order like other MD
+        * operations, use lock_try here to avoid deadlock, if the lock
+        * cannot be taken, drop all locks taken, revoke the blocked
+        * one, and continue processing the remaining entries, and in
+        * the end of the loop restart from beginning.
+        *
+        * don't lock with PDO mode in case two links are under the same
+        * parent and their hash values are different.
+        */
+       rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
+                                LCK_PW, true);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (!(ibits & MDS_INODELOCK_UPDATE)) {
+               CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
+               *blocked = true;
+               RETURN(-EAGAIN);
+       }
+
+       rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
+       if (rc) {
+               mdt_object_unlock(info, lnkp, lhl, 1);
+               RETURN(rc);
+       }
+
+       RETURN(1);
+}
+
+/* take link LOOKUP lock.
+ * \retval     0 \a lnkp is already locked, no lock taken.
+ *             1 lock taken.
+ *             -ev negative errno.
+ */
+static int mdt_migrate_link_lock(struct mdt_thread_info *info,
+                                struct mdt_object *lnkp,
+                                struct mdt_object *spobj,
+                                struct mdt_object *obj,
+                                struct list_head *lookup_locks)
+{
+       const struct lu_fid *fid = mdt_object_fid(lnkp);
+       struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
+       struct mdt_link_lock *entry;
+       int rc;
+
+       ENTRY;
+
+       /* check if it's already locked by source */
+       rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
+       if (rc <= 0) {
+               CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
+                      PFID(fid));
+               RETURN(rc);
+       }
+
+       /* check if it's already locked by other links */
+       list_for_each_entry(entry, lookup_locks, mll_linkage) {
+               rc = mdt_fids_different_target(info, fid,
+                                              mdt_object_fid(entry->mll_obj));
+               if (rc <= 0) {
+                       CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
+                              PFID(fid));
+                       RETURN(rc);
+               }
        }
+
+       rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX, true);
+       if (rc)
+               RETURN(rc);
+
+       /* don't take local LOOKUP lock, because later we will lock other ibits
+        * of sobj (which is on local MDT), and lock the same object twice may
+        * deadlock, just revoke this lock.
+        */
+       if (!mdt_object_remote(lnkp))
+               GOTO(unlock, rc = 0);
+
+       rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
+       if (rc)
+               GOTO(unlock, rc);
+
+       RETURN(1);
+unlock:
+       mdt_object_unlock(info, lnkp, lhl, 1);
+       return rc;
 }
 
 /*
- * lock parents of links, and also check whether total locks don't exceed
- * RS_MAX_LOCKS.
+ * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
+ * total local lock count exceeds RS_MAX_LOCKS.
  *
  * \retval     0 on success, and locks can be saved in ptlrpc_reply_stat
  * \retval     1 on success, but total lock count may exceed RS_MAX_LOCKS
  * \retval     -ev negative errno upon error
  */
-static int mdt_link_parents_lock(struct mdt_thread_info *info,
-                                struct mdt_object *pobj,
-                                const struct md_attr *ma,
-                                struct mdt_object *obj,
-                                struct mdt_lock_handle *lhp,
-                                struct ldlm_enqueue_info *peinfo,
-                                struct list_head *parent_slave_locks,
-                                struct list_head *link_locks)
+static int mdt_migrate_links_lock(struct mdt_thread_info *info,
+                                 struct mdt_object *spobj,
+                                 struct mdt_object *tpobj,
+                                 struct mdt_object *obj,
+                                 struct mdt_lock_handle *lhsp,
+                                 struct mdt_lock_handle *lhtp,
+                                 struct list_head *link_locks)
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_buf *buf = &info->mti_big_buf;
        struct lu_name *lname = &info->mti_name;
        struct linkea_data ldata = { NULL };
+       int local_lock_cnt = 0;
        bool blocked = false;
-       int local_lnkp_cnt = 0;
+       bool saved;
+       struct mdt_object *lnkp;
+       struct lu_fid fid;
+       LIST_HEAD(update_locks);
+       LIST_HEAD(lookup_locks);
        int rc;
 
        ENTRY;
@@ -1650,73 +1840,25 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
 
        for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
             linkea_next_entry(&ldata)) {
-               struct mdt_object *lnkp;
-               struct mdt_sub_lock *msl;
-               struct lu_fid fid;
-               __u64 ibits;
-
                linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
                                    &fid);
 
-               /* check if it's also linked to parent */
-               if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
-                       CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
+               /* check if link parent is source parent too */
+               if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
+                       CDEBUG(D_INFO,
+                              "skip lock on source parent "DFID"/"DNAME"\n",
                               PFID(&fid), PNAME(lname));
-                       /* in case link is remote object, revoke LOOKUP lock */
-                       rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
                        continue;
                }
 
-               lnkp = NULL;
-
-               /* check if it's linked to a stripe of parent */
-               if (ma->ma_valid & MA_LMV) {
-                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
-                       struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
-                       int j = 0;
-
-                       for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
-                               fid_le_to_cpu(stripe_fid,
-                                             &lmv->lmv_stripe_fids[j]);
-                               if (lu_fid_eq(stripe_fid, &fid)) {
-                                       CDEBUG(D_INFO, "skip stripe "DFID
-                                              ", reovke "DNAME"\n",
-                                              PFID(&fid), PNAME(lname));
-                                       lnkp = mdt_object_find(info->mti_env,
-                                                              mdt, &fid);
-                                       if (IS_ERR(lnkp))
-                                               GOTO(out, rc = PTR_ERR(lnkp));
-                                       break;
-                               }
-                       }
-
-                       if (lnkp) {
-                               rc = mdt_revoke_remote_lookup_lock(info, lnkp,
-                                                                  obj);
-                               mdt_object_put(info->mti_env, lnkp);
-                               continue;
-                       }
-               }
-
-               /* Check if it's already locked */
-               list_for_each_entry(msl, link_locks, msl_linkage) {
-                       if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
-                               CDEBUG(D_INFO,
-                                      DFID" was locked, revoke "DNAME"\n",
-                                      PFID(&fid), PNAME(lname));
-                               lnkp = msl->msl_obj;
-                               break;
-                       }
-               }
-
-               if (lnkp) {
-                       rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+               /* check if link parent is target parent too */
+               if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
+                       CDEBUG(D_INFO,
+                              "skip lock on target parent "DFID"/"DNAME"\n",
+                              PFID(&fid), PNAME(lname));
                        continue;
                }
 
-               CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
-                      PFID(&fid), PNAME(lname));
-
                lnkp = mdt_object_find(info->mti_env, mdt, &fid);
                if (IS_ERR(lnkp)) {
                        CWARN("%s: cannot find obj "DFID": %ld\n",
@@ -1726,80 +1868,39 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
 
                if (!mdt_object_exists(lnkp)) {
                        CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
-                             PFID(&fid), PNAME(lname));
+                              PFID(&fid), PNAME(lname));
                        mdt_object_put(info->mti_env, lnkp);
                        continue;
                }
-
-               if (!mdt_object_remote(lnkp))
-                       local_lnkp_cnt++;
-
-               OBD_ALLOC_PTR(msl);
-               if (msl == NULL)
-                       GOTO(out, rc = -ENOMEM);
-
-               /*
-                * we can't follow parent-child lock order like other MD
-                * operations, use lock_try here to avoid deadlock, if the lock
-                * cannot be taken, drop all locks taken, revoke the blocked
-                * one, and continue processing the remaining entries, and in
-                * the end of the loop restart from beginning.
-                */
-               ibits = 0;
-               rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
-                                        MDS_INODELOCK_UPDATE, LCK_PW, true);
-               if (!(ibits & MDS_INODELOCK_UPDATE)) {
-
-                       CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
-                              PFID(&fid), PNAME(lname));
-
-                       mdt_unlock_list(info, link_locks, 1);
-                       /* also unlock parent locks to avoid deadlock */
-                       if (!blocked)
-                               mdt_migrate_object_unlock(info, pobj, lhp,
-                                                         peinfo,
-                                                         parent_slave_locks,
-                                                         1);
-
-                       blocked = true;
-
-                       rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
-                                            MDS_INODELOCK_UPDATE, LCK_PW,
-                                            true);
-                       if (rc) {
-                               mdt_object_put(info->mti_env, lnkp);
-                               OBD_FREE_PTR(msl);
-                               GOTO(out, rc);
-                       }
-
-                       if (mdt_object_remote(lnkp)) {
-                               struct ldlm_lock *lock;
-
-                               /*
-                                * for remote object, set lock cb_atomic,
-                                * so lock can be released in blocking_ast()
-                                * immediately, then the next lock_try will
-                                * have better chance of success.
-                                */
-                               lock = ldlm_handle2lock(
-                                               &msl->msl_lh.mlh_rreg_lh);
-                               LASSERT(lock != NULL);
-                               lock_res_and_lock(lock);
-                               ldlm_set_atomic_cb(lock);
-                               unlock_res_and_lock(lock);
-                               LDLM_LOCK_PUT(lock);
-                       }
-
-                       mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
-                       OBD_FREE_PTR(msl);
-                       continue;
+relock:
+               saved = blocked;
+               rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
+                                                 &blocked);
+               if (!saved && blocked) {
+                       /* unlock all locks taken to avoid deadlock */
+                       mdt_migrate_links_unlock(info, &update_locks, 1);
+                       mdt_object_unlock(info, spobj, lhsp, 1);
+                       if (tpobj != spobj)
+                               mdt_object_unlock(info, tpobj, lhtp, 1);
+                       goto relock;
+               }
+               if (rc < 0) {
+                       mdt_object_put(info->mti_env, lnkp);
+                       GOTO(out, rc);
                }
 
-               INIT_LIST_HEAD(&msl->msl_linkage);
-               msl->msl_obj = lnkp;
-               list_add_tail(&msl->msl_linkage, link_locks);
+               if (rc == 1 && !mdt_object_remote(lnkp))
+                       local_lock_cnt++;
 
-               rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+               rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
+                                          &lookup_locks);
+               if (rc < 0) {
+                       mdt_object_put(info->mti_env, lnkp);
+                       GOTO(out, rc);
+               }
+               if (rc == 1 && !mdt_object_remote(lnkp))
+                       local_lock_cnt++;
+               mdt_object_put(info->mti_env, lnkp);
        }
 
        if (blocked)
@@ -1807,11 +1908,11 @@ static int mdt_link_parents_lock(struct mdt_thread_info *info,
 
        EXIT;
 out:
-       if (rc) {
-               mdt_unlock_list(info, link_locks, rc);
-       } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
-               CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
-                      local_lnkp_cnt);
+       list_splice(&update_locks, link_locks);
+       list_splice(&lookup_locks, link_locks);
+       if (rc < 0) {
+               mdt_migrate_links_unlock(info, link_locks, rc);
+       } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
                /*
                 * parent may have 3 local objects: master object and 2 stripes
                 * (if it's being migrated too); source may have 1 local objects
@@ -1819,195 +1920,71 @@ out:
                 * Note, source may have 2 local locks if it is directory but it
                 * can't have hardlinks, so it is not considered here.
                 */
+               CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
+                      local_lock_cnt);
                rc = 1;
        }
        return rc;
 }
 
-static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
-                                 struct mdt_object *obj,
-                                 const struct md_attr *ma,
-                                 struct list_head *slave_locks)
-{
-       struct mdt_device *mdt = info->mti_mdt;
-       const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
-       struct lu_fid *fid = &info->mti_tmp_fid1;
-       struct mdt_object *slave;
-       struct mdt_sub_lock *msl;
-       int i;
-       int rc;
-
-       ENTRY;
-       LASSERT(mdt_object_remote(obj));
-       LASSERT(ma->ma_valid & MA_LMV);
-       LASSERT(lmv);
-
-       if (!lmv_is_sane(lmv))
-               RETURN(-EINVAL);
-
-       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
-               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
-
-               if (!fid_is_sane(fid))
-                       continue;
-
-               slave = mdt_object_find(info->mti_env, mdt, fid);
-               if (IS_ERR(slave))
-                       GOTO(out, rc = PTR_ERR(slave));
-
-               OBD_ALLOC_PTR(msl);
-               if (!msl) {
-                       mdt_object_put(info->mti_env, slave);
-                       GOTO(out, rc = -ENOMEM);
-               }
-
-               rc = mdt_object_lock(info, slave, &msl->msl_lh,
-                                    MDS_INODELOCK_UPDATE, LCK_EX, true);
-               if (rc) {
-                       OBD_FREE_PTR(msl);
-                       mdt_object_put(info->mti_env, slave);
-                       GOTO(out, rc);
-               }
-
-               INIT_LIST_HEAD(&msl->msl_linkage);
-               msl->msl_obj = slave;
-               list_add_tail(&msl->msl_linkage, slave_locks);
-       }
-       EXIT;
-
-out:
-       if (rc)
-               mdt_unlock_list(info, slave_locks, rc);
-       return rc;
-}
-
-/* lock parent and its stripes */
-static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
-                                  struct mdt_object *obj,
-                                  const struct md_attr *ma,
-                                  struct mdt_lock_handle *lh,
-                                  struct ldlm_enqueue_info *einfo,
-                                  struct list_head *slave_locks)
-{
-       int rc;
-
-       if (mdt_object_remote(obj)) {
-               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
-                                    LCK_PW, true);
-               if (rc)
-                       return rc;
-
-               /*
-                * if obj is remote and striped, lock its stripes explicitly
-                * because it's not striped in LOD layer on this MDT.
-                */
-               if (ma->ma_valid & MA_LMV) {
-                       rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
-                       if (rc)
-                               mdt_object_unlock(info, obj, lh, rc);
-               }
-       } else {
-               rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
-                                            MDS_INODELOCK_UPDATE, LCK_PW,
-                                            true);
-       }
-
-       return rc;
-}
-
-/*
- * in migration, object may be remote, and we need take full lock of it and its
- * stripes if it's directory, besides, object may be a remote object on its
- * parent, revoke its LOOKUP lock on where its parent is located.
- */
-static int mdt_migrate_object_lock(struct mdt_thread_info *info,
-                                  struct mdt_object *pobj,
-                                  struct mdt_object *obj,
-                                  struct mdt_lock_handle *lh,
-                                  struct ldlm_enqueue_info *einfo,
-                                  struct list_head *slave_locks)
-{
-       int rc;
-
-       if (mdt_object_remote(obj)) {
-               rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
-               if (rc)
-                       return rc;
-
-               rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
-                                    true);
-               if (rc)
-                       return rc;
-
-               /*
-                * if obj is remote and striped, lock its stripes explicitly
-                * because it's not striped in LOD layer on this MDT.
-                */
-               if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
-                       struct md_attr *ma = &info->mti_attr;
-
-                       rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
-                       if (rc) {
-                               mdt_object_unlock(info, obj, lh, rc);
-                               return rc;
-                       }
-
-                       if (ma->ma_valid & MA_LMV) {
-                               rc = mdt_lock_remote_slaves(info, obj, ma,
-                                                           slave_locks);
-                               if (rc)
-                                       mdt_object_unlock(info, obj, lh, rc);
-                       }
-               }
-       } else {
-               rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
-                                            MDS_INODELOCK_FULL, LCK_EX, true);
-       }
-
-       return rc;
-}
-
 /*
  * lookup source by name, if parent is striped directory, we need to find the
  * corresponding stripe where source is located, and then lookup there.
  *
  * besides, if parent is migrating too, and file is already in target stripe,
  * this should be a redo of 'lfs migrate' on client side.
+ *
+ * \retval 1 tpobj stripe index is less than spobj stripe index
+ * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
+ * \retval -ev negative errno upon error
  */
 static int mdt_migrate_lookup(struct mdt_thread_info *info,
                              struct mdt_object *pobj,
                              const struct md_attr *ma,
                              const struct lu_name *lname,
                              struct mdt_object **spobj,
+                             struct mdt_object **tpobj,
                              struct mdt_object **sobj)
 {
        const struct lu_env *env = info->mti_env;
        struct lu_fid *fid = &info->mti_tmp_fid1;
-       struct mdt_object *stripe;
+       int spindex = -1;
+       int tpindex = -1;
        int rc;
 
        if (ma->ma_valid & MA_LMV) {
                /* if parent is striped, lookup on corresponding stripe */
                struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+               struct lu_fid *fid2 = &info->mti_tmp_fid2;
 
                if (!lmv_is_sane(lmv))
                        return -EBADF;
 
-               rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
-                                                 lname->ln_namelen);
-               if (rc < 0)
-                       return rc;
+               spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
+                                                      lname->ln_namelen);
+               if (spindex < 0)
+                       return spindex;
+
+               fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
 
-               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+               *spobj = mdt_object_find(env, info->mti_mdt, fid2);
+               if (IS_ERR(*spobj)) {
+                       rc = PTR_ERR(*spobj);
+                       *spobj = NULL;
+                       return rc;
+               }
 
-               stripe = mdt_object_find(env, info->mti_mdt, fid);
-               if (IS_ERR(stripe))
-                       return PTR_ERR(stripe);
+               if (!mdt_object_exists(*spobj))
+                       GOTO(spobj_put, rc = -ENOENT);
 
                fid_zero(fid);
-               rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+               rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
                                &info->mti_spec);
-               if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
+               if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
+                       /* fail check here to let top dir migration succeed. */
+                       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                               GOTO(spobj_put, rc = -EIO);
+
                        /*
                         * if parent layout is changeing, and lookup child
                         * failed on source stripe, lookup again on target
@@ -2015,27 +1992,35 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                         * was interrupted, and current file was migrated
                         * already.
                         */
-                       mdt_object_put(env, stripe);
-
-                       rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
-                                                     lname->ln_namelen);
-                       if (rc < 0)
-                               return rc;
-
-                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+                       tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
+                                                          lname->ln_namelen);
+                       if (tpindex < 0)
+                               GOTO(spobj_put, rc = tpindex);
+
+                       fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
+
+                       *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
+                       if (IS_ERR(*tpobj)) {
+                               rc = PTR_ERR(*tpobj);
+                               *tpobj = NULL;
+                               GOTO(spobj_put, rc);
+                       }
 
-                       stripe = mdt_object_find(env, info->mti_mdt, fid);
-                       if (IS_ERR(stripe))
-                               return PTR_ERR(stripe);
+                       if (!mdt_object_exists(*tpobj))
+                               GOTO(tpobj_put, rc = -ENOENT);
 
-                       fid_zero(fid);
-                       rc = mdo_lookup(env, mdt_object_child(stripe), lname,
-                                       fid, &info->mti_spec);
-                       mdt_object_put(env, stripe);
-                       return rc ?: -EALREADY;
+                       if (rc == -ENOENT) {
+                               fid_zero(fid);
+                               rc = mdo_lookup(env, mdt_object_child(*tpobj),
+                                               lname, fid, &info->mti_spec);
+                               GOTO(tpobj_put, rc = rc ?: -EALREADY);
+                       }
                } else if (rc) {
-                       mdt_object_put(env, stripe);
-                       return rc;
+                       GOTO(spobj_put, rc);
+               } else {
+                       *tpobj = *spobj;
+                       tpindex = spindex;
+                       mdt_object_get(env, *tpobj);
                }
        } else {
                fid_zero(fid);
@@ -2044,20 +2029,34 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                if (rc)
                        return rc;
 
-               stripe = pobj;
-               mdt_object_get(env, stripe);
+               *spobj = pobj;
+               *tpobj = pobj;
+               mdt_object_get(env, pobj);
+               mdt_object_get(env, pobj);
        }
 
-       *spobj = stripe;
-
        *sobj = mdt_object_find(env, info->mti_mdt, fid);
        if (IS_ERR(*sobj)) {
-               mdt_object_put(env, stripe);
                rc = PTR_ERR(*sobj);
-               *spobj = NULL;
                *sobj = NULL;
+               GOTO(tpobj_put, rc);
        }
 
+       if (!mdt_object_exists(*sobj))
+               GOTO(sobj_put, rc = -ENOENT);
+
+       return (tpindex < spindex);
+
+sobj_put:
+       mdt_object_put(env, *sobj);
+       *sobj = NULL;
+tpobj_put:
+       mdt_object_put(env, *tpobj);
+       *tpobj = NULL;
+spobj_put:
+       mdt_object_put(env, *spobj);
+       *spobj = NULL;
+
        return rc;
 }
 
@@ -2117,16 +2116,16 @@ close:
 
 /*
  * migrate file in below steps:
- *  1. lock parent and its stripes
+ *  1. lock source and target stripes
  *  2. lookup source by name
  *  3. lock parents of source links if source is not directory
  *  4. reject if source is in HSM
  *  5. take source open_sem and close file if source is regular file
- *  6. lock source and its stripes if it's directory
- *  7. lock target so subsequent change to it can trigger COS
- *  8. migrate file
+ *  6. lock source, and its stripes if it's directory
+ *  7. migrate file
+ *  8. lock target so subsequent change to it can trigger COS
  *  9. unlock above locks
- * 10. sync device if source has links
+ * 10. sync device if source has too many links
  */
 int mdt_reint_migrate(struct mdt_thread_info *info,
                      struct mdt_lock_handle *unused)
@@ -2137,22 +2136,22 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
        struct mdt_reint_record *rr = &info->mti_rr;
        struct lu_ucred *uc = mdt_ucred(info);
        struct md_attr *ma = &info->mti_attr;
-       struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
-       struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
        struct mdt_object *pobj;
-       struct mdt_object *spobj = NULL;
-       struct mdt_object *sobj = NULL;
+       struct mdt_object *spobj;
+       struct mdt_object *tpobj;
+       struct mdt_object *sobj;
        struct mdt_object *tobj;
        struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
-       struct mdt_lock_handle *lhp;
+       struct mdt_lock_handle *lhsp;
+       struct mdt_lock_handle *lhtp;
        struct mdt_lock_handle *lhs;
-       struct mdt_lock_handle *lht;
-       LIST_HEAD(parent_slave_locks);
-       LIST_HEAD(child_slave_locks);
+       struct mdt_lock_handle *lhl;
        LIST_HEAD(link_locks);
        int lock_retries = 5;
+       bool reverse = false;
        bool open_sem_locked = false;
        bool do_sync = false;
+       bool is_plain_dir = false;
        int rc;
 
        ENTRY;
@@ -2224,59 +2223,100 @@ int mdt_reint_migrate(struct mdt_thread_info *info,
        if (rc)
                GOTO(put_parent, rc);
 
-lock_parent:
-       /* lock parent object */
-       lhp = &info->mti_lh[MDT_LH_PARENT];
-       rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
-                                    &parent_slave_locks);
-       if (rc)
+       /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
+        * if @pobj is migrating too, tpobj is the target parent stripe.
+        */
+       rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
+                               &sobj);
+       if (rc < 0)
                GOTO(put_parent, rc);
+       reverse = rc;
 
-       /*
-        * spobj is the corresponding stripe against name if pobj is striped
-        * directory, which is the real parent, and no need to lock, because
-        * we've taken full lock of pobj.
-        */
-       rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
-       if (rc)
-               GOTO(unlock_parent, rc);
+       /* parent unchanged, this happens in dir restripe */
+       if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
+               GOTO(put_source, rc = -EALREADY);
+
+lock_parent:
+       LASSERT(spobj);
+       LASSERT(tpobj);
+       lhsp = &info->mti_lh[MDT_LH_PARENT];
+       lhtp = &info->mti_lh[MDT_LH_CHILD];
+       /* lock spobj and tpobj in stripe index order */
+       if (reverse) {
+               rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW,
+                                    true);
+               if (rc)
+                       GOTO(put_source, rc);
+
+               LASSERT(spobj != tpobj);
+               rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
+                                    true);
+               if (rc)
+                       GOTO(unlock_parent, rc);
+       } else {
+               rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
+                                    true);
+               if (rc)
+                       GOTO(put_source, rc);
 
-       /* lock parents of source links, and revoke LOOKUP lock of links */
-       rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
-                                  &parent_slave_locks, &link_locks);
-       if (rc == -EBUSY && lock_retries-- > 0) {
-               mdt_object_put(env, sobj);
-               mdt_object_put(env, spobj);
-               goto lock_parent;
+               if (tpobj != spobj) {
+                       rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
+                                            LCK_PW, true);
+                       if (rc)
+                               GOTO(unlock_parent, rc);
+               }
        }
 
-       if (rc < 0)
-               GOTO(put_source, rc);
+       /* if inode is not migrated, or is dir, no need to lock links */
+       if (!info->mti_spec.sp_migrate_nsonly &&
+           !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
+               /* lock link parents, and take LOOKUP lock of links */
+               rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
+                                           lhtp, &link_locks);
+               if (rc == -EBUSY && lock_retries-- > 0) {
+                       LASSERT(list_empty(&link_locks));
+                       goto lock_parent;
+               }
 
-       /*
-        * RS_MAX_LOCKS is the limit of number of locks that can be saved along
-        * with one request, if total lock count exceeds this limit, we will
-        * drop all locks after migration, and synchronous device in the end.
-        */
-       do_sync = rc;
+               if (rc < 0)
+                       GOTO(put_source, rc);
+
+               /*
+                * RS_MAX_LOCKS is the limit of number of locks that can be
+                * saved along with one request, if total lock count exceeds
+                * this limit, we will drop all locks after migration, and
+                * trigger commit in the end.
+                */
+               do_sync = rc;
+       }
+
+       /* lock source */
+       lhs = &info->mti_lh[MDT_LH_OLD];
+       lhl = &info->mti_lh[MDT_LH_LOOKUP];
+       rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
+                                   MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
+                                   MDS_INODELOCK_OPEN, true);
+       if (rc)
+               GOTO(unlock_links, rc);
 
-       /* TODO: DoM migration is not supported, migrate dirent only */
        if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
+               /* TODO: DoM migration is not supported, migrate dirent only */
                rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
                if (rc)
-                       GOTO(unlock_links, rc);
+                       GOTO(unlock_source, rc);
 
                if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
                        info->mti_spec.sp_migrate_nsonly = 1;
        } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
                rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
                if (rc)
-                       GOTO(unlock_links, rc);
+                       GOTO(unlock_source, rc);
 
-               /* race with restripe/auto-split? */
-               if ((ma->ma_valid & MA_LMV) &&
-                   lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
-                       GOTO(unlock_links, rc = -EBUSY);
+               if (!(ma->ma_valid & MA_LMV))
+                       is_plain_dir = true;
+               else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
+                       /* race with restripe/auto-split */
+                       GOTO(unlock_source, rc = -EBUSY);
        }
 
        /* if migration HSM is allowed */
@@ -2285,10 +2325,10 @@ lock_parent:
                ma->ma_valid = 0;
                rc = mdt_attr_get_complex(info, sobj, ma);
                if (rc)
-                       GOTO(unlock_links, rc);
+                       GOTO(unlock_source, rc);
 
                if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
-                       GOTO(unlock_links, rc = -EOPNOTSUPP);
+                       GOTO(unlock_source, rc = -EOPNOTSUPP);
        }
 
        /* end lease and close file for regular file */
@@ -2297,72 +2337,87 @@ lock_parent:
                if (!down_write_trylock(&sobj->mot_open_sem)) {
                        /* close anyway */
                        mdd_migrate_close(info, sobj);
-                       GOTO(unlock_links, rc = -EBUSY);
+                       GOTO(unlock_source, rc = -EBUSY);
                } else {
                        open_sem_locked = true;
                        rc = mdd_migrate_close(info, sobj);
-                       if (rc)
+                       if (rc && rc != -ESTALE)
                                GOTO(unlock_open_sem, rc);
                }
        }
 
-       /* lock source */
-       lhs = &info->mti_lh[MDT_LH_OLD];
-       rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
-                                    &child_slave_locks);
-       if (rc)
-               GOTO(unlock_open_sem, rc);
-
-       /* lock target */
        tobj = mdt_object_find(env, mdt, rr->rr_fid2);
        if (IS_ERR(tobj))
-               GOTO(unlock_source, rc = PTR_ERR(tobj));
-
-       lht = &info->mti_lh[MDT_LH_NEW];
-       rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
-       if (rc)
-               GOTO(put_target, rc);
+               GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
 
        /* Don't do lookup sanity check. We know name doesn't exist. */
        info->mti_spec.sp_cr_lookup = 0;
        info->mti_spec.sp_feat = &dt_directory_features;
 
-       rc = mdo_migrate(env, mdt_object_child(pobj),
-                        mdt_object_child(sobj), &rr->rr_name,
-                        mdt_object_child(tobj),
+       rc = mdo_migrate(env, mdt_object_child(spobj),
+                        mdt_object_child(tpobj), mdt_object_child(sobj),
+                        mdt_object_child(tobj), &rr->rr_name,
                         &info->mti_spec, ma);
-       if (!rc)
-               lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
-                                    LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
-       EXIT;
+       if (rc)
+               GOTO(put_target, rc);
+
+       /* save target locks for directory */
+       if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
+           !info->mti_spec.sp_migrate_nsonly) {
+               struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+
+               /* in case sobj becomes a stripe of tobj, unlock sobj here,
+                * otherwise stripes lock may deadlock.
+                */
+               if (is_plain_dir)
+                       mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
 
-       mdt_object_unlock(info, tobj, lht, rc);
+               rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
+                                            MDS_INODELOCK_UPDATE, LCK_PW,
+                                            true);
+               if (rc)
+                       GOTO(put_target, rc);
+
+               mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
+       }
+
+       lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
+                            LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
+
+       EXIT;
 put_target:
        mdt_object_put(env, tobj);
-unlock_source:
-       mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
-                                 &child_slave_locks, rc);
 unlock_open_sem:
        if (open_sem_locked)
                up_write(&sobj->mot_open_sem);
+unlock_source:
+       mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
 unlock_links:
        /* if we've got too many locks to save into RPC,
         * then just commit before the locks are released
         */
        if (!rc && do_sync)
                mdt_device_sync(env, mdt);
-       mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
+       mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
+unlock_parent:
+       mdt_object_unlock(info, spobj, lhsp, rc);
+       mdt_object_unlock(info, tpobj, lhtp, rc);
 put_source:
        mdt_object_put(env, sobj);
        mdt_object_put(env, spobj);
-unlock_parent:
-       mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
-                                 &parent_slave_locks, rc);
+       mdt_object_put(env, tpobj);
 put_parent:
+       mo_invalidate(env, mdt_object_child(pobj));
        mdt_object_put(env, pobj);
 unlock_rename:
        mdt_rename_unlock(info, rename_lh);
 
+       if (rc)
+               CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
+                      mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                      PNAME(&rr->rr_name), rc);
+
        return rc;
 }
 
@@ -2454,46 +2509,6 @@ static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
        return sindex < tindex ? 0 : 1;
 }
 
-/*
- * lock rename source object.
- *
- * Both source and source parent may be remote, and source may be a remote
- * object on source parent, to avoid overriding lock handle, store remote
- * LOOKUP lock separately in @lhr.
- *
- * \retval     0 on success
- * \retval     -ev negative errno upon error
- */
-static int mdt_rename_source_lock(struct mdt_thread_info *info,
-                                 struct mdt_object *parent,
-                                 struct mdt_object *child,
-                                 struct mdt_lock_handle *lhc,
-                                 struct mdt_lock_handle *lhr,
-                                 __u64 ibits,
-                                 bool cos_incompat)
-{
-       int rc;
-
-       rc = mdt_is_remote_object(info, parent, child);
-       if (rc < 0)
-               return rc;
-
-       if (rc == 1) {
-               rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
-                                           cos_incompat);
-               if (rc)
-                       return rc;
-
-               ibits &= ~MDS_INODELOCK_LOOKUP;
-       }
-
-       rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
-       if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
-               mdt_object_unlock(info, NULL, lhr, rc);
-
-       return rc;
-}
-
 /* Helper function for mdt_reint_rename so we don't need to opencode
  * two different order lockings
  */
@@ -2556,7 +2571,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        struct mdt_lock_handle *lh_srcdirp;
        struct mdt_lock_handle *lh_tgtdirp;
        struct mdt_lock_handle *lh_oldp = NULL;
-       struct mdt_lock_handle *lh_rmt = NULL;
+       struct mdt_lock_handle *lh_lookup = NULL;
        struct mdt_lock_handle *lh_newp = NULL;
        struct lu_fid *old_fid = &info->mti_tmp_fid1;
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
@@ -2809,9 +2824,10 @@ relock:
                        GOTO(out_put_new, rc = -EISDIR);
 
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
+               lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           lh_lookup,
+                                           MDS_INODELOCK_LOOKUP |
                                            MDS_INODELOCK_XATTR, cos_incompat);
                if (rc < 0)
                        GOTO(out_put_new, rc);
@@ -2849,9 +2865,10 @@ relock:
                GOTO(out_put_old, rc);
        } else {
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
-               lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
+               lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
-                                           lh_rmt, MDS_INODELOCK_LOOKUP |
+                                           lh_lookup,
+                                           MDS_INODELOCK_LOOKUP |
                                            MDS_INODELOCK_XATTR, cos_incompat);
                if (rc != 0)
                        GOTO(out_put_old, rc);
@@ -2892,7 +2909,7 @@ out_unlock_new:
        if (mnew != NULL)
                mdt_object_unlock(info, mnew, lh_newp, rc);
 out_unlock_old:
-       mdt_object_unlock(info, NULL, lh_rmt, rc);
+       mdt_object_unlock(info, NULL, lh_lookup, rc);
        mdt_object_unlock(info, mold, lh_oldp, rc);
 out_put_new:
        if (mnew && !discard)