Whamcloud - gitweb
LU-2430 mdd: add lfs mv to migrate inode.
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
index 0afa5e8..9de12fa 100644 (file)
@@ -269,14 +269,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                  "in "DFID,
                  PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+       if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-
        parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));
@@ -793,7 +790,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 RETURN(err_serious(-ENOENT));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+       if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);
 
         /*
@@ -856,7 +853,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 }
        }
 
-       if (fid_is_obf(child_fid) || fid_is_dot_lustre(child_fid))
+       if (!fid_is_md_operative(child_fid))
                GOTO(unlock_parent, rc = -EPERM);
 
        /* We will lock the child regardless it is local or remote. No harm. */
@@ -1029,8 +1026,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
                 RETURN(-EPERM);
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+       if (!fid_is_md_operative(rr->rr_fid1) ||
+           !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);
 
         /* step 1: find & lock the target parent dir */
@@ -1201,29 +1198,333 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
         int rc = 0;
         ENTRY;
 
-        do {
-                LASSERT(fid_is_sane(&dst_fid));
-                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
-                if (!IS_ERR(dst)) {
-                        rc = mdo_is_subdir(info->mti_env,
-                                           mdt_object_child(dst), fid,
-                                           &dst_fid);
-                        mdt_object_put(info->mti_env, dst);
-                        if (rc != -EREMOTE && rc < 0) {
-                                CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
-                        } else {
-                                /* check the found fid */
-                                if (lu_fid_eq(&dst_fid, fid))
-                                        rc = -EINVAL;
-                        }
-                } else {
-                        rc = PTR_ERR(dst);
-                }
-        } while (rc == -EREMOTE);
+       /* If the source and target are in the same directory, they can not
+        * be parent/child relationship, so subdir check is not needed */
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
+               return 0;
+
+       do {
+               LASSERT(fid_is_sane(&dst_fid));
+               dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
+               if (!IS_ERR(dst)) {
+                       rc = mdo_is_subdir(info->mti_env,
+                                          mdt_object_child(dst), fid,
+                                          &dst_fid);
+                       mdt_object_put(info->mti_env, dst);
+                       if (rc != -EREMOTE && rc < 0) {
+                               CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
+                       } else {
+                               /* check the found fid */
+                               if (lu_fid_eq(&dst_fid, fid))
+                                       rc = -EINVAL;
+                       }
+               } else {
+                       rc = PTR_ERR(dst);
+               }
+       } while (rc == -EREMOTE);
 
         RETURN(rc);
 }
 
+/* Update object linkEA */
+struct mdt_lock_list {
+       struct mdt_object       *mll_obj;
+       struct mdt_lock_handle  mll_lh;
+       struct list_head        mll_list;
+};
+
+static void mdt_unlock_list(struct mdt_thread_info *info,
+                           struct list_head *list, int rc)
+{
+       struct mdt_lock_list *mll;
+       struct mdt_lock_list *mll2;
+
+       list_for_each_entry_safe(mll, mll2, list, mll_list) {
+               mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
+               list_del(&mll->mll_list);
+               OBD_FREE_PTR(mll);
+       }
+}
+
+static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
+                                     struct mdt_object *obj,
+                                     struct mdt_object *pobj,
+                                     struct list_head *lock_list)
+{
+       struct lu_buf           *buf = &info->mti_big_buf;
+       struct linkea_data      ldata = { 0 };
+       int                     count;
+       int                     rc;
+       ENTRY;
+
+       if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               RETURN(0);
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, obj, &ldata);
+       if (rc != 0) {
+               if (rc == -ENOENT || rc == -ENODATA)
+                       rc = 0;
+               RETURN(rc);
+       }
+
+       LASSERT(ldata.ld_leh != NULL);
+       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
+       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
+               struct mdt_device *mdt = info->mti_mdt;
+               struct mdt_object *mdt_pobj;
+               struct mdt_lock_list *mll;
+               struct lu_name name;
+               struct lu_fid  fid;
+
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+                                   &name, &fid);
+               ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
+                                                        ldata.ld_reclen);
+               mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
+               if (IS_ERR(mdt_pobj)) {
+                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+                             mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
+                       continue;
+               }
+
+               if (!mdt_object_exists(mdt_pobj)) {
+                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
+                             mdt_obd_name(mdt), PFID(&fid));
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       continue;
+               }
+
+               if (mdt_pobj == pobj) {
+                       CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
+                              mdt_obd_name(mdt), PFID(&fid));
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       continue;
+               }
+
+               OBD_ALLOC_PTR(mll);
+               if (mll == NULL) {
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       GOTO(out, rc = -ENOMEM);
+               }
+
+               if (mdt_object_remote(mdt_pobj)) {
+                       mdt_lock_reg_init(&mll->mll_lh, LCK_EX);
+                       rc = mdt_remote_object_lock(info, mdt_pobj,
+                                                   &mll->mll_lh.mlh_rreg_lh,
+                                                   mll->mll_lh.mlh_rreg_mode,
+                                                   MDS_INODELOCK_UPDATE);
+               } else {
+                       mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+                       rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+                                            MDS_INODELOCK_UPDATE,
+                                            MDT_LOCAL_LOCK);
+               }
+               if (rc != 0) {
+                       CERROR("%s: cannot lock "DFID": rc =%d\n",
+                              mdt_obd_name(mdt), PFID(&fid), rc);
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       OBD_FREE_PTR(mll);
+                       GOTO(out, rc);
+               }
+
+               CFS_INIT_LIST_HEAD(&mll->mll_list);
+               mll->mll_obj = mdt_pobj;
+               list_add_tail(&mll->mll_list, lock_list);
+       }
+out:
+       if (rc != 0)
+               mdt_unlock_list(info, lock_list, rc);
+       RETURN(rc);
+}
+
+/* migrate files from one MDT to another MDT */
+static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
+                                     struct mdt_lock_handle *lhc)
+{
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_object       *msrcdir;
+       struct mdt_object       *mold;
+       struct mdt_object       *mnew = NULL;
+       struct mdt_lock_handle  *lh_dirp;
+       struct mdt_lock_handle  *lh_childp;
+       struct mdt_lock_handle  *lh_tgtp = NULL;
+       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
+       struct list_head        lock_list;
+       int                     rc;
+       ENTRY;
+
+       CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+              PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+       /* 1: lock the source dir. */
+       msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(msrcdir)) {
+               CERROR("%s: cannot find source dir "DFID" : rc = %d\n",
+                       mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                       (int)PTR_ERR(msrcdir));
+               RETURN(PTR_ERR(msrcdir));
+       }
+
+       lh_dirp = &info->mti_lh[MDT_LH_PARENT];
+       if (mdt_object_remote(msrcdir)) {
+               mdt_lock_reg_init(lh_dirp, LCK_EX);
+               rc = mdt_remote_object_lock(info, msrcdir,
+                                           &lh_dirp->mlh_rreg_lh,
+                                           lh_dirp->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != ELDLM_OK)
+                       GOTO(out_put_parent, rc);
+       } else {
+               mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
+               rc = mdt_object_lock(info, msrcdir, lh_dirp,
+                                    MDS_INODELOCK_UPDATE,
+                                    MDT_LOCAL_LOCK);
+               if (rc)
+                       GOTO(out_put_parent, rc);
+
+               rc = mdt_version_get_check_save(info, msrcdir, 0);
+               if (rc)
+                       GOTO(out_unlock_parent, rc);
+       }
+
+       /* 2: sanity check and find the object to be migrated. */
+       fid_zero(old_fid);
+       rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
+       if (rc != 0)
+               GOTO(out_unlock_parent, rc);
+
+       if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
+               GOTO(out_unlock_parent, rc = -EINVAL);
+
+       if (!fid_is_md_operative(old_fid))
+               GOTO(out_unlock_parent, rc = -EPERM);
+
+       mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
+       if (IS_ERR(mold))
+               GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+
+       if (mdt_object_remote(mold)) {
+               CERROR("%s: source "DFID" is on the remote MDT\n",
+                      mdt_obd_name(info->mti_mdt), PFID(old_fid));
+               GOTO(out_put_child, rc = -EREMOTE);
+       }
+
+       if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
+           !mdt_object_remote(msrcdir)) {
+               CERROR("%s: parent "DFID" is still on the same"
+                      " MDT, which should be migrated first:"
+                      " rc = %d\n", mdt_obd_name(info->mti_mdt),
+                      PFID(mdt_object_fid(msrcdir)), -EPERM);
+               GOTO(out_put_child, rc = -EPERM);
+       }
+
+       /* 3: iterate the linkea of the object and lock all of the objects */
+       CFS_INIT_LIST_HEAD(&lock_list);
+       rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
+       if (rc != 0)
+               GOTO(out_put_child, rc);
+
+       /* 4: lock of the object migrated object */
+       lh_childp = &info->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lh_childp, LCK_EX);
+       rc = mdt_object_lock(info, mold, lh_childp,
+                            MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                            MDS_INODELOCK_LAYOUT, MDT_CROSS_LOCK);
+       if (rc != 0)
+               GOTO(out_unlock_list, rc);
+
+       ma->ma_need = MA_LMV;
+       ma->ma_valid = 0;
+       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+       rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
+       if (rc != 0)
+               GOTO(out_unlock_list, rc);
+
+       if ((ma->ma_valid & MA_LMV)) {
+               struct lmv_mds_md_v1 *lmm1;
+
+               lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
+               lmm1 = &ma->ma_lmv->lmv_md_v1;
+               if (lmm1->lmv_magic != LMV_MAGIC_MIGRATE) {
+                       CERROR("%s: can not migrate striped dir "DFID
+                              ": rc = %d\n", mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(mold)), -EPERM);
+                       GOTO(out_unlock_child, rc = -EPERM);
+               }
+
+               if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
+                       GOTO(out_unlock_child, rc = -EINVAL);
+
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+                                      &lmm1->lmv_stripe_fids[1]);
+               if (IS_ERR(mnew))
+                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+
+               if (!mdt_object_remote(mnew)) {
+                       CERROR("%s: "DFID" being migrated is on this MDT:"
+                              " rc  = %d\n", mdt_obd_name(info->mti_mdt),
+                              PFID(rr->rr_fid2), -EPERM);
+                       GOTO(out_put_new, rc = -EPERM);
+               }
+
+               lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
+               mdt_lock_reg_init(lh_tgtp, LCK_EX);
+               rc = mdt_remote_object_lock(info, mnew,
+                                           &lh_tgtp->mlh_rreg_lh,
+                                           lh_tgtp->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != 0) {
+                       lh_tgtp = NULL;
+                       GOTO(out_put_new, rc);
+               }
+       } else {
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+                                      rr->rr_fid2);
+               if (IS_ERR(mnew))
+                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+               if (!mdt_object_remote(mnew)) {
+                       CERROR("%s: Migration "DFID" is on this MDT !\n",
+                              mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid2));
+                       GOTO(out_put_new, rc = -EXDEV);
+               }
+       }
+
+       /* 5: migrate it */
+       mdt_reint_init_ma(info, ma);
+
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                      OBD_FAIL_MDS_REINT_RENAME_WRITE);
+
+       rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
+                        old_fid, &rr->rr_name, mdt_object_child(mnew), ma);
+       if (rc != 0)
+               GOTO(out_unlock_new, rc);
+out_unlock_new:
+       if (lh_tgtp != NULL)
+               mdt_object_unlock(info, mnew, lh_tgtp, rc);
+out_put_new:
+       if (mnew)
+               mdt_object_put(info->mti_env, mnew);
+out_unlock_child:
+       mdt_object_unlock(info, mold, lh_childp, rc);
+out_unlock_list:
+       mdt_unlock_list(info, &lock_list, rc);
+out_put_child:
+       mdt_object_put(info->mti_env, mold);
+out_unlock_parent:
+       mdt_object_unlock(info, msrcdir, lh_dirp, rc);
+out_put_parent:
+       mdt_object_put(info->mti_env, msrcdir);
+
+       RETURN(rc);
+}
+
 /*
  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
  * 2 - src child; 3 - tgt child.
@@ -1240,80 +1541,64 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
  *    And tgt_c will be still in the same MDT as the original
  *    src_c.
  */
-static int mdt_reint_rename(struct mdt_thread_info *info,
-                            struct mdt_lock_handle *lhc)
+static int mdt_reint_rename_internal(struct mdt_thread_info *info,
+                                    struct mdt_lock_handle *lhc)
 {
-        struct mdt_reint_record *rr = &info->mti_rr;
-        struct md_attr          *ma = &info->mti_attr;
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct mdt_object       *msrcdir;
-        struct mdt_object       *mtgtdir;
-        struct mdt_object       *mold;
-        struct mdt_object       *mnew = NULL;
-        struct mdt_lock_handle  *lh_srcdirp;
-        struct mdt_lock_handle  *lh_tgtdirp;
-        struct mdt_lock_handle  *lh_oldp;
-        struct mdt_lock_handle  *lh_newp;
-        struct lu_fid           *old_fid = &info->mti_tmp_fid1;
-        struct lu_fid           *new_fid = &info->mti_tmp_fid2;
-        struct lustre_handle     rename_lh = { 0 };
-        int                      rc;
-        ENTRY;
-
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_attr          *ma = &info->mti_attr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct mdt_object       *msrcdir;
+       struct mdt_object       *mtgtdir;
+       struct mdt_object       *mold;
+       struct mdt_object       *mnew = NULL;
+       struct mdt_lock_handle  *lh_srcdirp;
+       struct mdt_lock_handle  *lh_tgtdirp;
+       struct mdt_lock_handle  *lh_oldp = NULL;
+       struct mdt_lock_handle  *lh_newp = NULL;
+       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
+       struct lu_fid           *new_fid = &info->mti_tmp_fid2;
+       int                      rc;
+       ENTRY;
 
        DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
                  PFID(rr->rr_fid1), PNAME(&rr->rr_name),
                  PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
-               RETURN(-EPERM);
-
-       rc = mdt_rename_lock(info, &rename_lh);
-       if (rc) {
-               CERROR("Can't lock FS for rename, rc %d\n", rc);
-               RETURN(rc);
-       }
-
-        lh_newp = &info->mti_lh[MDT_LH_NEW];
-
-        /* step 1: lock the source dir. */
-        lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
+       /* step 1: lock the source dir. */
+       lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
-        msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
-                                       MDS_INODELOCK_UPDATE);
-        if (IS_ERR(msrcdir))
-                GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
+       msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
+                                      MDS_INODELOCK_UPDATE);
+       if (IS_ERR(msrcdir))
+               RETURN(PTR_ERR(msrcdir));
 
-        rc = mdt_version_get_check_save(info, msrcdir, 0);
-        if (rc)
-                GOTO(out_unlock_source, rc);
+       rc = mdt_version_get_check_save(info, msrcdir, 0);
+       if (rc)
+               GOTO(out_unlock_source, rc);
 
-        /* step 2: find & lock the target dir. */
-        lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
+       /* step 2: find & lock the target dir. */
+       lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
        mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
-        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
-                mdt_object_get(info->mti_env, msrcdir);
-                mtgtdir = msrcdir;
-                if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
-                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
-                                                 MDS_INODELOCK_UPDATE);
-                         if (rc)
-                                 GOTO(out_unlock_source, rc);
-                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
-                }
-        } else {
-                mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
-                                          rr->rr_fid2);
-                if (IS_ERR(mtgtdir))
-                        GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
+               mdt_object_get(info->mti_env, msrcdir);
+               mtgtdir = msrcdir;
+               if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
+                       rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+                                        MDS_INODELOCK_UPDATE);
+                       if (rc != 0)
+                               GOTO(out_unlock_source, rc);
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               }
+       } else {
+               mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
+                                         rr->rr_fid2);
+               if (IS_ERR(mtgtdir))
+                       GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
-                /* check early, the real version will be saved after locking */
-                rc = mdt_version_get_check(info, mtgtdir, 1);
-                if (rc)
-                        GOTO(out_put_target, rc);
+               /* check early, the real version will be saved after locking */
+               rc = mdt_version_get_check(info, mtgtdir, 1);
+               if (rc)
+                       GOTO(out_put_target, rc);
 
                if (unlikely(mdt_object_remote(mtgtdir))) {
                        CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
@@ -1345,96 +1630,102 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                GOTO(out_unlock_target, rc = -EINVAL);
 
-       if (fid_is_obf(old_fid) || fid_is_dot_lustre(old_fid))
+       if (!fid_is_md_operative(old_fid))
                GOTO(out_unlock_target, rc = -EPERM);
 
        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
        if (IS_ERR(mold))
                GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
-        lh_oldp = &info->mti_lh[MDT_LH_OLD];
-        mdt_lock_reg_init(lh_oldp, LCK_EX);
-       rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
-                            MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
-        if (rc != 0) {
-                mdt_object_put(info->mti_env, mold);
-                GOTO(out_unlock_target, rc);
-        }
-
        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
-        /* save version after locking */
-        mdt_version_get_save(info, mold, 2);
-        mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
-
-        /* step 4: find & lock the new object. */
-        /* new target object may not exist now */
-        /* lookup with version checking */
-        fid_zero(new_fid);
+       /* save version after locking */
+       mdt_version_get_save(info, mold, 2);
+       mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
+
+       /* step 4: find & lock the new object. */
+       /* new target object may not exist now */
+       /* lookup with version checking */
+       fid_zero(new_fid);
        rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
                                      3);
-        if (rc == 0) {
-                /* the new_fid should have been filled at this moment */
-                if (lu_fid_eq(old_fid, new_fid))
-                       GOTO(out_unlock_old, rc);
+       if (rc == 0) {
+               /* the new_fid should have been filled at this moment */
+               if (lu_fid_eq(old_fid, new_fid))
+                       GOTO(out_put_old, rc);
 
-                if (lu_fid_eq(new_fid, rr->rr_fid1) ||
-                    lu_fid_eq(new_fid, rr->rr_fid2))
-                        GOTO(out_unlock_old, rc = -EINVAL);
+               if (lu_fid_eq(new_fid, rr->rr_fid1) ||
+                   lu_fid_eq(new_fid, rr->rr_fid2))
+                       GOTO(out_put_old, rc = -EINVAL);
 
-               if (fid_is_obf(new_fid) || fid_is_dot_lustre(new_fid))
-                       GOTO(out_unlock_old, rc = -EPERM);
+               if (!fid_is_md_operative(new_fid))
+                       GOTO(out_put_old, rc = -EPERM);
 
                if (mdt_object_remote(mold)) {
                        CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
                               PFID(old_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_old, rc = -EXDEV);
                }
 
-                mdt_lock_reg_init(lh_newp, LCK_EX);
-                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
-                if (IS_ERR(mnew))
-                        GOTO(out_unlock_old, rc = PTR_ERR(mnew));
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
+               if (IS_ERR(mnew))
+                       GOTO(out_put_old, rc = PTR_ERR(mnew));
 
                if (mdt_object_remote(mnew)) {
-                       mdt_object_put(info->mti_env, mnew);
                        CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
                               PFID(new_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_new, rc = -EXDEV);
                }
 
+               lh_oldp = &info->mti_lh[MDT_LH_OLD];
+               mdt_lock_reg_init(lh_oldp, LCK_EX);
+               rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+               if (rc != 0)
+                       GOTO(out_put_new, rc);
+
                /* We used to acquire MDS_INODELOCK_FULL here but we
                 * can't do this now because a running HSM restore on
                 * the rename onto victim will hold the layout
                 * lock. See LU-4002. */
+
+               lh_newp = &info->mti_lh[MDT_LH_NEW];
+               mdt_lock_reg_init(lh_newp, LCK_EX);
                rc = mdt_object_lock(info, mnew, lh_newp,
                                     MDS_INODELOCK_LOOKUP |
                                     MDS_INODELOCK_UPDATE,
                                     MDT_CROSS_LOCK);
-                if (rc != 0) {
-                        mdt_object_put(info->mti_env, mnew);
-                        GOTO(out_unlock_old, rc);
-                }
-                /* get and save version after locking */
-                mdt_version_get_save(info, mnew, 3);
-                mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
-        } else if (rc != -EREMOTE && rc != -ENOENT) {
-                GOTO(out_unlock_old, rc);
-        } else {
+               if (rc != 0)
+                       GOTO(out_unlock_old, rc);
+
+               /* get and save version after locking */
+               mdt_version_get_save(info, mnew, 3);
+               mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
+       } else if (rc != -EREMOTE && rc != -ENOENT) {
+               GOTO(out_put_old, rc);
+       } else {
                /* If mnew does not exist and mold are remote directory,
                 * it only allows rename if they are under same directory */
                if (mtgtdir != msrcdir && mdt_object_remote(mold)) {
                        CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
                               PFID(old_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_old, rc = -EXDEV);
                }
+
+               lh_oldp = &info->mti_lh[MDT_LH_OLD];
+               mdt_lock_reg_init(lh_oldp, LCK_EX);
+               rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+               if (rc != 0)
+                       GOTO(out_put_old, rc);
+
                mdt_enoent_version_save(info, 3);
-        }
+       }
 
-        /* step 5: rename it */
-        mdt_reint_init_ma(info, ma);
+       /* step 5: rename it */
+       mdt_reint_init_ma(info, ma);
 
-        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                       OBD_FAIL_MDS_REINT_RENAME_WRITE);
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                      OBD_FAIL_MDS_REINT_RENAME_WRITE);
 
        /* Check if @dst is subdir of @src. */
        rc = mdt_rename_sanity(info, old_fid);
@@ -1459,25 +1750,74 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                        mdt_handle_last_unlink(info, mnew, ma);
 
                mdt_rename_counter_tally(info, info->mti_mdt, req,
-                                         msrcdir, mtgtdir);
-        }
+                                        msrcdir, mtgtdir);
+       }
 
-        EXIT;
+       EXIT;
 out_unlock_new:
-        if (mnew)
-                mdt_object_unlock_put(info, mnew, lh_newp, rc);
+       if (mnew != NULL)
+               mdt_object_unlock(info, mnew, lh_newp, rc);
 out_unlock_old:
-        mdt_object_unlock_put(info, mold, lh_oldp, rc);
+       mdt_object_unlock(info, mold, lh_oldp, rc);
+out_put_new:
+       if (mnew != NULL)
+               mdt_object_put(info->mti_env, mnew);
+out_put_old:
+       mdt_object_put(info->mti_env, mold);
 out_unlock_target:
-        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
+       mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
 out_put_target:
-        mdt_object_put(info->mti_env, mtgtdir);
+       mdt_object_put(info->mti_env, mtgtdir);
 out_unlock_source:
-        mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
-out_rename_lock:
+       mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
+       return rc;
+}
+
+static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
+                                      struct mdt_lock_handle *lhc,
+                                      bool  rename)
+{
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct lustre_handle    rename_lh = { 0 };
+       int                     rc;
+       ENTRY;
+
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
+       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
+           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+               RETURN(-EPERM);
+
+       rc = mdt_rename_lock(info, &rename_lh);
+       if (rc != 0) {
+               CERROR("%s: can't lock FS for rename: rc  = %d\n",
+                      mdt_obd_name(info->mti_mdt), rc);
+               RETURN(rc);
+       }
+
+       if (rename)
+               rc = mdt_reint_rename_internal(info, lhc);
+       else
+               rc = mdt_reint_migrate_internal(info, lhc);
+
        if (lustre_handle_is_used(&rename_lh))
                mdt_rename_unlock(&rename_lh);
-       return rc;
+
+       RETURN(rc);
+}
+
+static int mdt_reint_rename(struct mdt_thread_info *info,
+                           struct mdt_lock_handle *lhc)
+{
+       return mdt_reint_rename_or_migrate(info, lhc, true);
+}
+
+static int mdt_reint_migrate(struct mdt_thread_info *info,
+                           struct mdt_lock_handle *lhc)
+{
+       return mdt_reint_rename_or_migrate(info, lhc, false);
 }
 
 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
@@ -1491,7 +1831,8 @@ static mdt_reinter reinters[REINT_MAX] = {
        [REINT_RENAME]   = mdt_reint_rename,
        [REINT_OPEN]     = mdt_reint_open,
        [REINT_SETXATTR] = mdt_reint_setxattr,
-       [REINT_RMENTRY]  = mdt_reint_unlink
+       [REINT_RMENTRY]  = mdt_reint_unlink,
+       [REINT_MIGRATE]   = mdt_reint_migrate,
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,