Whamcloud - gitweb
LU-14430 mdd: fix inheritance of big default ACLs
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 134982d..c0e27bf 100644 (file)
@@ -124,7 +124,7 @@ int mdd_lookup(const struct lu_env *env,
 }
 
 /** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * Uses the mdd_thread_info::mti_link_buf since it is generally large.
  * A pointer to the buffer is stored in \a ldata::ld_buf.
  *
  * \retval 0 or error
@@ -1254,7 +1254,7 @@ static inline int mdd_links_del(const struct lu_env *env,
 /** Read the link EA into a temp buffer.
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
- * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ * \retval ptr to \a lu_buf (always \a mti_link_buf)
  */
 struct lu_buf *mdd_links_get(const struct lu_env *env,
                             struct mdd_object *mdd_obj)
@@ -2303,6 +2303,7 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                        struct lu_buf *acl_buf)
 {
        int     rc;
+
        ENTRY;
 
        if (S_ISLNK(la->la_mode)) {
@@ -2572,22 +2573,23 @@ int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
                      struct md_op_spec *spec, struct md_attr *ma)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct lu_attr          *la = &info->mti_la_for_fix;
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_object       *son = md2mdd_obj(child);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct lu_attr          *attr = &ma->ma_attr;
-       struct thandle          *handle;
-       struct lu_attr          *pattr = &info->mti_pattr;
-       struct lu_buf           acl_buf;
-       struct lu_buf           def_acl_buf;
-       struct lu_buf           hsm_buf;
-       struct linkea_data      *ldata = &info->mti_link_data;
-       const char              *name = lname->ln_name;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
+       struct mdd_object *son = md2mdd_obj(child);
+       struct mdd_device *mdd = mdo2mdd(pobj);
+       struct lu_attr *attr = &ma->ma_attr;
+       struct thandle *handle;
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_buf acl_buf;
+       struct lu_buf def_acl_buf;
+       struct lu_buf hsm_buf;
+       struct linkea_data *ldata = &info->mti_link_data;
+       const char *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                      rc;
-       int                      rc2;
+       int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+       int rc, rc2;
+
        ENTRY;
 
        rc = mdd_la_get(env, mdd_pobj, pattr);
@@ -2606,13 +2608,25 @@ int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (IS_ERR(handle))
                GOTO(out_free, rc = PTR_ERR(handle));
 
-       lu_buf_check_and_alloc(&info->mti_xattr_buf,
-                       min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
-                             XATTR_SIZE_MAX));
-       acl_buf = info->mti_xattr_buf;
-       def_acl_buf.lb_buf = info->mti_key;
-       def_acl_buf.lb_len = sizeof(info->mti_key);
+use_bigger_buffer:
+       acl_buf = *lu_buf_check_and_alloc(&info->mti_xattr_buf, acl_size);
+       if (!acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+       /* mti_big_buf is also used down below in mdd_changelog_ns_store(),
+        * but def_acl_buf is finished with it before then
+        */
+       def_acl_buf = *lu_buf_check_and_alloc(&info->mti_big_buf, acl_size);
+       if (!def_acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+
        rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
+       if (unlikely(rc == -ERANGE &&
+                    acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
+               /* use maximum-sized xattr buffer for too-big default ACL */
+               acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
+                                XATTR_SIZE_MAX);
+               goto use_bigger_buffer;
+       }
        if (rc < 0)
                GOTO(out_stop, rc);
 
@@ -2763,49 +2777,6 @@ out_free:
        return rc;
 }
 
-/*
- * Get locks on parents in proper order
- * RETURN: < 0 - error, rename_order if successful
- */
-enum rename_order {
-        MDD_RN_SAME,
-        MDD_RN_SRCTGT,
-        MDD_RN_TGTSRC
-};
-
-static int mdd_rename_order(const struct lu_env *env,
-                           struct mdd_device *mdd,
-                           struct mdd_object *src_pobj,
-                           const struct lu_attr *pattr,
-                           struct mdd_object *tgt_pobj)
-{
-       /* order of locking, 1 - tgt-src, 0 - src-tgt*/
-       int rc;
-
-       ENTRY;
-       if (src_pobj == tgt_pobj)
-               RETURN(MDD_RN_SAME);
-
-       /* compared the parent child relationship of src_p & tgt_p */
-       if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(src_pobj))) {
-               rc = MDD_RN_SRCTGT;
-       } else if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(tgt_pobj))) {
-               rc = MDD_RN_TGTSRC;
-       } else {
-               rc = mdd_is_parent(env, mdd, src_pobj, pattr,
-                                  mdd_object_fid(tgt_pobj));
-               if (rc == -EREMOTE)
-                       rc = 0;
-
-               if (rc == 1)
-                       rc = MDD_RN_TGTSRC;
-               else if (rc == 0)
-                       rc = MDD_RN_SRCTGT;
-       }
-
-       RETURN(rc);
-}
-
 /* has not mdd_write{read}_lock on any obj yet. */
 static int mdd_rename_sanity_check(const struct lu_env *env,
                                    struct mdd_object *src_pobj,
@@ -2978,6 +2949,16 @@ static int mdd_declare_rename(const struct lu_env *env,
         return rc;
 }
 
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma);
+
 /* src object can be remote that is why we use only fid and type of object */
 static int mdd_rename(const struct lu_env *env,
                       struct md_object *src_pobj, struct md_object *tgt_pobj,
@@ -3022,6 +3003,31 @@ static int mdd_rename(const struct lu_env *env,
        if (rc)
                GOTO(out_pending, rc);
 
+       /* if rename is cross MDTs, migrate symlink if it doesn't have other
+        * hard links, and target doesn't exist.
+        */
+       if (mdd_object_remote(mdd_sobj) && S_ISLNK(cattr->la_mode) &&
+           cattr->la_nlink == 1 && !tobj) {
+               struct md_op_spec *spec = &mdd_env_info(env)->mti_spec;
+               struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
+               struct lu_fid tfid;
+
+               rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
+                                              NULL);
+               if (rc < 0)
+                       GOTO(out_pending, rc);
+
+               mdd_tobj = mdd_object_find(env, mdd, &tfid);
+               if (IS_ERR(mdd_tobj))
+                       GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
+
+               memset(spec, 0, sizeof(*spec));
+               rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
+                                       mdd_tobj, lsname, ltname, spec, ma);
+               mdd_object_put(env, mdd_tobj);
+               GOTO(out_pending, rc);
+       }
+
        rc = mdd_la_get(env, mdd_spobj, pattr);
        if (rc)
                GOTO(out_pending, rc);
@@ -3050,11 +3056,6 @@ static int mdd_rename(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_pending, rc);
 
-       /* FIXME: Should consider tobj and sobj too in rename_lock. */
-       rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
-       if (rc < 0)
-               GOTO(out_pending, rc);
-
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -3643,7 +3644,8 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
                                      struct mdd_object *spobj,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      const struct lu_attr *attr,
                                      struct linkea_data *ldata)
 {
@@ -3653,8 +3655,8 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
        ENTRY;
 
        memset(ldata, 0, sizeof(*ldata));
-       rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), lname,
-                               mdd_object_fid(tpobj), lname, 1, 0, ldata);
+       rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
+                               mdd_object_fid(tpobj), tname, 1, 0, ldata);
        if (rc)
                RETURN(rc);
 
@@ -3680,7 +3682,7 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
-       rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdd_object_fid(tpobj),
+       rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
                                ldata, &source_mdt_index, NULL,
                                mdd_is_link_on_source_mdt);
        RETURN(rc);
@@ -3690,7 +3692,8 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
                                      struct mdd_object *spobj,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *obj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      struct lu_attr *attr,
                                      struct lu_attr *spattr,
                                      struct lu_attr *tpattr,
@@ -3702,7 +3705,7 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
        struct lu_attr *la = &info->mti_la_for_fix;
        int rc;
 
-       rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+       rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
        if (rc)
                return rc;
 
@@ -3714,7 +3717,7 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
 
        rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
                                      attr->la_mode & S_IFMT,
-                                     lname->ln_name, handle);
+                                     tname->ln_name, handle);
        if (rc)
                return rc;
 
@@ -3747,7 +3750,8 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
                                      struct mdd_object *tobj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      struct lu_attr *spattr,
                                      struct lu_attr *tpattr,
                                      struct lu_attr *attr,
@@ -3784,7 +3788,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
 
        rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
-                               lname, attr, handle, spec, ldata, NULL, NULL,
+                               tname, attr, handle, spec, ldata, NULL, NULL,
                                NULL, hint);
        if (rc)
                return rc;
@@ -3844,7 +3848,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        }
 
        if (!S_ISDIR(attr->la_mode)) {
-               rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
                                        mdd_object_fid(tpobj), ldata, NULL,
                                        handle, mdd_declare_update_link);
                if (rc)
@@ -3867,8 +3871,9 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                        return rc;
        }
 
-       rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, lname, attr,
-                                       spattr, tpattr, ldata, ma, handle);
+       rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
        return rc;
 }
 
@@ -3879,7 +3884,8 @@ static int mdd_migrate_update(const struct lu_env *env,
                              struct mdd_object *spobj,
                              struct mdd_object *tpobj,
                              struct mdd_object *obj,
-                             const struct lu_name *lname,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
                              struct lu_attr *attr,
                              struct lu_attr *spattr,
                              struct lu_attr *tpattr,
@@ -3893,18 +3899,18 @@ static int mdd_migrate_update(const struct lu_env *env,
 
        ENTRY;
 
-       CDEBUG(D_INFO, "update "DFID"/%s to "DFID"/"DFID"\n",
-              PFID(mdd_object_fid(spobj)), lname->ln_name,
-              PFID(mdd_object_fid(tpobj)), PFID(mdd_object_fid(obj)));
+       CDEBUG(D_INFO, "update "DFID" from "DFID"/%s to "DFID"/%s\n",
+              PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
+              sname->ln_name, PFID(mdd_object_fid(tpobj)), tname->ln_name);
 
-       rc = __mdd_index_delete(env, spobj, lname->ln_name,
+       rc = __mdd_index_delete(env, spobj, sname->ln_name,
                                S_ISDIR(attr->la_mode), handle);
        if (rc)
                RETURN(rc);
 
        rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
                                attr->la_mode & S_IFMT,
-                               lname->ln_name, handle);
+                               tname->ln_name, handle);
        if (rc)
                RETURN(rc);
 
@@ -3960,7 +3966,8 @@ static int mdd_migrate_create(const struct lu_env *env,
                              struct mdd_object *tpobj,
                              struct mdd_object *sobj,
                              struct mdd_object *tobj,
-                             const struct lu_name *lname,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
                              struct lu_attr *spattr,
                              struct lu_attr *tpattr,
                              struct lu_attr *attr,
@@ -4032,7 +4039,7 @@ static int mdd_migrate_create(const struct lu_env *env,
 
        /* update links FID */
        if (!S_ISDIR(attr->la_mode)) {
-               rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
                                        mdd_object_fid(tpobj), ldata,
                                        NULL, handle, mdd_update_link);
                if (rc)
@@ -4054,7 +4061,7 @@ static int mdd_migrate_create(const struct lu_env *env,
                        RETURN(rc);
        }
 
-       rc = mdd_migrate_update(env, spobj, tpobj, tobj, lname, attr,
+       rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
                                spattr, tpattr, ldata, ma, handle);
 
        RETURN(rc);
@@ -4103,7 +4110,7 @@ static int mdd_migrate_cmd_check(struct mdd_device *mdd,
 }
 
 /**
- * Migrate directory or file.
+ * Internal function to migrate directory or file between MDTs.
  *
  * migrate source to target in following steps:
  *   1. create target, append source stripes after target's if it's directory,
@@ -4112,39 +4119,38 @@ static int mdd_migrate_cmd_check(struct mdd_device *mdd,
  *      update file linkea, and destroy source if it's not needed any more.
  *
  * \param[in] env      execution environment
- * \param[in] md_pobj  parent master object
- * \param[in] md_sobj  source object
- * \param[in] lname    file name
- * \param[in] md_tobj  target object
+ * \param[in] spobj    source parent object
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] sname    source file name
+ * \param[in] tname    target file name
  * \param[in] spec     target creation spec
  * \param[in] ma       used to update \a pobj mtime and ctime
  *
  * \retval             0 on success
  * \retval             -errno on failure
  */
-static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
-                      struct md_object *md_sobj, const struct lu_name *lname,
-                      struct md_object *md_tobj, struct md_op_spec *spec,
-                      struct md_attr *ma)
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma)
 {
-       struct mdd_device *mdd = mdo2mdd(md_pobj);
        struct mdd_thread_info *info = mdd_env_info(env);
-       struct mdd_object *pobj = md2mdd_obj(md_pobj);
-       struct mdd_object *sobj = md2mdd_obj(md_sobj);
-       struct mdd_object *tobj = md2mdd_obj(md_tobj);
-       struct mdd_object *spobj = NULL;
-       struct mdd_object *tpobj = NULL;
+       struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
        struct lu_attr *spattr = &info->mti_pattr;
        struct lu_attr *tpattr = &info->mti_tpattr;
        struct lu_attr *attr = &info->mti_cattr;
        struct linkea_data *ldata = &info->mti_link_data;
        struct dt_allocation_hint *hint = &info->mti_hint;
-       struct lu_fid *fid = &info->mti_fid2;
-       struct lu_buf pbuf = { NULL };
        struct lu_buf sbuf = { NULL };
        struct lmv_mds_md_v1 *lmv;
        struct thandle *handle;
-       bool nsonly = false;
        int rc;
 
        ENTRY;
@@ -4153,74 +4159,15 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (rc)
                RETURN(rc);
 
-       /* locate source and target stripe on pobj, which are the real parent */
-       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
-       if (rc < 0 && rc != -ENODATA)
-               RETURN(rc);
-
-       lmv = pbuf.lb_buf;
-       if (lmv) {
-               int index;
-
-               if (!lmv_is_sane(lmv))
-                       GOTO(out, rc = -EBADF);
-
-               /* locate target parent stripe */
-               /* fail check here to make sure top dir migration succeed. */
-               if (lmv_is_migrating(lmv) &&
-                   OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
-                       GOTO(out, rc = -EIO);
-
-               index = lmv_name_to_stripe_index(lmv, lname->ln_name,
-                                                lname->ln_namelen);
-               if (index < 0)
-                       GOTO(out, rc = index);
-
-               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
-               tpobj = mdd_object_find(env, mdd, fid);
-               if (IS_ERR(tpobj))
-                       GOTO(out, rc = PTR_ERR(tpobj));
-
-               /* locate source parent stripe */
-               if (lmv_is_layout_changing(lmv)) {
-                       index = lmv_name_to_stripe_index_old(lmv,
-                                                            lname->ln_name,
-                                                            lname->ln_namelen);
-                       if (index < 0)
-                               GOTO(out, rc = index);
-
-                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
-                       spobj = mdd_object_find(env, mdd, fid);
-                       if (IS_ERR(spobj))
-                               GOTO(out, rc = PTR_ERR(spobj));
-
-                       /* parent stripe unchanged */
-                       if (spobj == tpobj) {
-                               if (!lmv_is_restriping(lmv))
-                                       GOTO(out, rc = -EINVAL);
-                               GOTO(out, rc = -EALREADY);
-                       }
-                       nsonly = spec->sp_migrate_nsonly;
-               } else {
-                       spobj = tpobj;
-                       mdd_object_get(spobj);
-               }
-       } else {
-               tpobj = pobj;
-               spobj = pobj;
-               mdd_object_get(tpobj);
-               mdd_object_get(spobj);
-       }
-
        rc = mdd_la_get(env, spobj, spattr);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
 
        rc = mdd_la_get(env, tpobj, tpattr);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
 
-       if (S_ISDIR(attr->la_mode) && !nsonly) {
+       if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
                struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
 
                LASSERT(lum);
@@ -4241,7 +4188,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                                GOTO(out, rc = -EBADF);
                        if (lmv_is_migrating(lmv)) {
                                rc = mdd_migrate_cmd_check(mdd, lmv, lum,
-                                                          lname);
+                                                          sname);
                                GOTO(out, rc);
                        }
                }
@@ -4250,8 +4197,8 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc = -EALREADY);
 
                /* update namespace only if @sobj is on MDT where @tpobj is. */
-               if (!mdd_object_remote(tpobj))
-                       nsonly = true;
+               if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
+                       spec->sp_migrate_nsonly = true;
 
                if (S_ISLNK(attr->la_mode)) {
                        lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
@@ -4270,11 +4217,11 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        }
 
        /* linkea needs update upon FID or parent stripe change */
-       rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname,
-                                       attr, ldata);
+       rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
+                                       tname, attr, ldata);
        if (rc > 0)
                /* update namespace only if @sobj has link on its MDT. */
-               nsonly = true;
+               spec->sp_migrate_nsonly = true;
        else if (rc < 0)
                GOTO(out, rc);
 
@@ -4287,53 +4234,155 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (IS_ERR(handle))
                GOTO(out, rc = PTR_ERR(handle));
 
-       if (nsonly)
-               rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, lname,
-                                               attr, spattr, tpattr, ldata, ma,
-                                               handle);
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
+                                               tname, attr, spattr, tpattr,
+                                               ldata, ma, handle);
        else
                rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
-                                               lname, spattr, tpattr, attr,
-                                               &sbuf, ldata, ma, spec, hint,
-                                               handle);
+                                               sname, tname, spattr, tpattr,
+                                               attr, &sbuf, ldata, ma, spec,
+                                               hint, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
-       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
                                         handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
-       if (nsonly)
-               rc = mdd_migrate_update(env, spobj, tpobj, sobj, lname, attr,
-                                       spattr, tpattr, ldata, ma, handle);
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
        else
-               rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, lname,
-                                       spattr, tpattr, attr, &sbuf, ldata, ma,
-                                       spec, hint, handle);
+               rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
+                                       tname, spattr, tpattr, attr, &sbuf,
+                                       ldata, ma, spec, hint, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
        rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
-                                   nsonly ? sobj : tobj, mdd_object_fid(spobj),
-                                   mdd_object_fid(sobj), mdd_object_fid(tpobj),
-                                   lname, lname, handle);
+                                   spec->sp_migrate_nsonly ? sobj : tobj,
+                                   mdd_object_fid(spobj), mdd_object_fid(sobj),
+                                   mdd_object_fid(tpobj), tname, sname,
+                                   handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
        EXIT;
 
-stop_trans:
+stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 out:
+       lu_buf_free(&sbuf);
+
+       return rc;
+}
+
+/**
+ * Migrate directory or file between MDTs.
+ *
+ * \param[in] env      execution environment
+ * \param[in] md_pobj  parent master object
+ * \param[in] md_sobj  source object
+ * \param[in] lname    file name
+ * \param[in] md_tobj  target object
+ * \param[in] spec     target creation spec
+ * \param[in] ma       used to update \a pobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+                      struct md_object *md_sobj, const struct lu_name *lname,
+                      struct md_object *md_tobj, struct md_op_spec *spec,
+                      struct md_attr *ma)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(md_pobj);
+       struct mdd_object *pobj = md2mdd_obj(md_pobj);
+       struct mdd_object *sobj = md2mdd_obj(md_sobj);
+       struct mdd_object *tobj = md2mdd_obj(md_tobj);
+       struct mdd_object *spobj = NULL;
+       struct mdd_object *tpobj = NULL;
+       struct lu_buf pbuf = { NULL };
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       ENTRY;
+
+       /* locate source and target stripe on pobj, which are the real parent */
+       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       lmv = pbuf.lb_buf;
+       if (lmv) {
+               int index;
+
+               if (!lmv_is_sane(lmv))
+                       GOTO(out, rc = -EBADF);
+
+               /* locate target parent stripe */
+               /* fail check here to make sure top dir migration succeed. */
+               if (lmv_is_migrating(lmv) &&
+                   OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                       GOTO(out, rc = -EIO);
+
+               index = lmv_name_to_stripe_index(lmv, lname->ln_name,
+                                                lname->ln_namelen);
+               if (index < 0)
+                       GOTO(out, rc = index);
+
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+               tpobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(tpobj))
+                       GOTO(out, rc = PTR_ERR(tpobj));
+
+               /* locate source parent stripe */
+               if (lmv_is_layout_changing(lmv)) {
+                       index = lmv_name_to_stripe_index_old(lmv,
+                                                            lname->ln_name,
+                                                            lname->ln_namelen);
+                       if (index < 0)
+                               GOTO(out, rc = index);
+
+                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+                       spobj = mdd_object_find(env, mdd, fid);
+                       if (IS_ERR(spobj))
+                               GOTO(out, rc = PTR_ERR(spobj));
+
+                       /* parent stripe unchanged */
+                       if (spobj == tpobj) {
+                               if (!lmv_is_restriping(lmv))
+                                       GOTO(out, rc = -EINVAL);
+                               GOTO(out, rc = -EALREADY);
+                       }
+               } else {
+                       spobj = tpobj;
+                       mdd_object_get(spobj);
+               }
+       } else {
+               tpobj = pobj;
+               spobj = pobj;
+               mdd_object_get(tpobj);
+               mdd_object_get(spobj);
+       }
+
+       rc = mdd_migrate_object(env, spobj, tpobj, sobj, tobj, lname, lname,
+                               spec, ma);
+       GOTO(out, rc);
+
+out:
        if (!IS_ERR_OR_NULL(spobj))
                mdd_object_put(env, spobj);
        if (!IS_ERR_OR_NULL(tpobj))
                mdd_object_put(env, tpobj);
-       lu_buf_free(&sbuf);
        lu_buf_free(&pbuf);
 
        return rc;