Whamcloud - gitweb
LU-14430 mdd: fix inheritance of big default ACLs
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index b15e8b4..c0e27bf 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_MDS
 
-#include <linux/kthread.h>
-
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lustre_mds.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
+#include <lustre_idmap.h>
 
 #include "mdd_internal.h"
 
@@ -72,12 +72,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
             const struct lu_attr *pattr, const struct lu_name *lname,
             struct lu_fid* fid, int mask)
 {
-       const char *name                = lname->ln_name;
-       const struct dt_key *key        = (const struct dt_key *)name;
-       struct mdd_object *mdd_obj      = md2mdd_obj(pobj);
-       struct mdd_device *m            = mdo2mdd(pobj);
-       struct dt_object *dir           = mdd_object_child(mdd_obj);
-        int rc;
+       const char *name = lname->ln_name;
+       const struct dt_key *key = (const struct dt_key *)name;
+       struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+       struct dt_object *dir = mdd_object_child(mdd_obj);
+       int rc;
+
        ENTRY;
 
        if (unlikely(mdd_is_dead_obj(mdd_obj)))
@@ -87,12 +87,13 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                RETURN(-ESTALE);
 
        if (mdd_object_remote(mdd_obj)) {
-               CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
-                      mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
+               CDEBUG(D_INFO, "%s: Object "DFID" located on remote server\n",
+                      mdd_obj_dev_name(mdd_obj),
+                      PFID(mdd_object_fid(mdd_obj)));
        }
 
        rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
        if (rc)
                RETURN(rc);
 
@@ -123,7 +124,7 @@ int mdd_lookup(const struct lu_env *env,
 }
 
 /** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * Uses the mdd_thread_info::mti_link_buf since it is generally large.
  * A pointer to the buffer is stored in \a ldata::ld_buf.
  *
  * \retval 0 or error
@@ -167,9 +168,9 @@ static int __mdd_links_read(const struct lu_env *env,
        return linkea_init(ldata);
 }
 
-static int mdd_links_read(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct linkea_data *ldata)
+int mdd_links_read(const struct lu_env *env,
+                  struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata)
 {
        int rc;
 
@@ -220,7 +221,10 @@ static inline int mdd_parent_fid(const struct lu_env *env,
 
        ENTRY;
 
-       LASSERT(S_ISDIR(mdd_object_type(obj)));
+       LASSERTF(S_ISDIR(mdd_object_type(obj)),
+                "%s: FID "DFID" is not a directory type = %o\n",
+                mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
+                mdd_object_type(obj));
 
        buf = lu_buf_check_and_alloc(buf, PATH_MAX);
        if (buf->lb_buf == NULL)
@@ -231,6 +235,9 @@ static inline int mdd_parent_fid(const struct lu_env *env,
        if (rc != 0)
                GOTO(lookup, rc);
 
+       /* the obj is not locked, don't cache attributes */
+       mdd_invalidate(env, &obj->mod_obj);
+
        LASSERT(ldata.ld_leh != NULL);
        /* Directory should only have 1 parent */
        if (ldata.ld_leh->leh_reccount > 1)
@@ -257,61 +264,56 @@ int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
 }
 
 /*
- * return 1: if lf is the fid of the ancestor of p1;
+ * return 1: if \a tfid is the fid of an ancestor of \a mo;
  * return 0: if not;
- *
- * return -EREMOTE: if remote object is found, in this
- * case fid of remote object is saved to @pf;
- *
  * otherwise: values < 0, errors.
  */
 static int mdd_is_parent(const struct lu_env *env,
                        struct mdd_device *mdd,
-                       struct mdd_object *p1,
+                       struct mdd_object *mo,
                        const struct lu_attr *attr,
-                       const struct lu_fid *lf,
-                       struct lu_fid *pf)
+                       const struct lu_fid *tfid)
 {
-        struct mdd_object *parent = NULL;
-        struct lu_fid *pfid;
-        int rc;
-        ENTRY;
+       struct mdd_object *mp;
+       struct lu_fid *pfid;
+       int rc;
+
+       LASSERT(!lu_fid_eq(mdd_object_fid(mo), tfid));
+       pfid = &mdd_env_info(env)->mti_fid;
+
+       if (mdd_is_root(mdd, mdd_object_fid(mo)))
+               return 0;
+
+       if (mdd_is_root(mdd, tfid))
+               return 1;
 
-        LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
-        pfid = &mdd_env_info(env)->mti_fid;
+       rc = mdd_parent_fid(env, mo, attr, pfid);
+       if (rc)
+               return rc;
+
+       while (1) {
+               if (lu_fid_eq(pfid, tfid))
+                       return 1;
+
+               if (mdd_is_root(mdd, pfid))
+                       return 0;
+
+               mp = mdd_object_find(env, mdd, pfid);
+               if (IS_ERR(mp))
+                       return PTR_ERR(mp);
 
-        /* Check for root first. */
-        if (mdd_is_root(mdd, mdo2fid(p1)))
-                RETURN(0);
+               if (!mdd_object_exists(mp)) {
+                       mdd_object_put(env, mp);
+                       return -ENOENT;
+               }
 
-        for(;;) {
-               /* this is done recursively */
-               rc = mdd_parent_fid(env, p1, attr, pfid);
+               rc = mdd_parent_fid(env, mp, attr, pfid);
+               mdd_object_put(env, mp);
                if (rc)
-                       GOTO(out, rc);
-                if (mdd_is_root(mdd, pfid))
-                        GOTO(out, rc = 0);
-               if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
-                       GOTO(out, rc = 0);
-                if (lu_fid_eq(pfid, lf))
-                        GOTO(out, rc = 1);
-               if (parent != NULL)
-                       mdd_object_put(env, parent);
-
-               parent = mdd_object_find(env, mdd, pfid);
-               if (IS_ERR(parent))
-                       GOTO(out, rc = PTR_ERR(parent));
-
-               if (!mdd_object_exists(parent))
-                       GOTO(out, rc = -EINVAL);
+                       return rc;
+       }
 
-               p1 = parent;
-        }
-        EXIT;
-out:
-        if (parent && !IS_ERR(parent))
-                mdd_object_put(env, parent);
-        return rc;
+       return 0;
 }
 
 /*
@@ -319,36 +321,27 @@ out:
  *
  * returns 1: if fid is ancestor of @mo;
  * returns 0: if fid is not an ancestor of @mo;
- *
- * returns EREMOTE if remote object is found, fid of remote object is saved to
- * @fid;
- *
  * returns < 0: if error
  */
 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
-                 const struct lu_fid *fid, struct lu_fid *sfid)
+                 const struct lu_fid *fid)
 {
        struct mdd_device *mdd = mdo2mdd(mo);
        struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
        int rc;
        ENTRY;
 
+       if (!mdd_object_exists(md2mdd_obj(mo)))
+               RETURN(-ENOENT);
+
        if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
-               RETURN(0);
+               RETURN(-ENOTDIR);
 
        rc = mdd_la_get(env, md2mdd_obj(mo), attr);
        if (rc != 0)
                RETURN(rc);
 
-       rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
-       if (rc == 0) {
-               /* found root */
-               fid_zero(sfid);
-       } else if (rc == 1) {
-               /* found @fid is parent */
-               *sfid = *fid;
-               rc = 0;
-       }
+       rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
        RETURN(rc);
 }
 
@@ -363,8 +356,7 @@ int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
  *           -ve        other error
  *
  */
-static int mdd_dir_is_empty(const struct lu_env *env,
-                            struct mdd_object *dir)
+int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir)
 {
        struct dt_it     *it;
        struct dt_object *obj;
@@ -396,8 +388,12 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
                iops->put(env, it);
                iops->fini(env, it);
-       } else
+       } else {
                result = PTR_ERR(it);
+               /* -ENODEV means no valid stripe */
+               if (result == -ENODEV)
+                       RETURN(0);
+       }
        RETURN(result);
 }
 
@@ -461,7 +457,7 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
        if (check_perm)
                rc = mdd_permission_internal_locked(env, pobj, pattr,
                                                    MAY_WRITE | MAY_EXEC,
-                                                   MOR_TGT_PARENT);
+                                                   DT_TGT_PARENT);
        RETURN(rc);
 }
 
@@ -482,7 +478,7 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
 
        rc = mdd_permission_internal_locked(env, pobj, pattr,
                                            MAY_WRITE | MAY_EXEC,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
        if (rc != 0)
                RETURN(rc);
 
@@ -536,7 +532,7 @@ static int mdd_may_delete_entry(const struct lu_env *env,
                int rc;
                rc = mdd_permission_internal_locked(env, pobj, pattr,
                                            MAY_WRITE | MAY_EXEC,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
                if (rc)
                        RETURN(rc);
        }
@@ -584,12 +580,11 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *tpobj,
        /* additional check the rename case */
        if (cattr) {
                if (S_ISDIR(cattr->la_mode)) {
-                       struct mdd_device *mdd = mdo2mdd(&tobj->mod_obj);
-
                        if (!S_ISDIR(tattr->la_mode))
                                RETURN(-ENOTDIR);
 
-                       if (lu_fid_eq(mdo2fid(tobj), &mdd->mdd_root_fid))
+                       if (mdd_is_root(mdo2mdd(&tobj->mod_obj),
+                                       mdd_object_fid(tobj)))
                                RETURN(-EBUSY);
                } else if (S_ISDIR(tattr->la_mode))
                        RETURN(-EISDIR);
@@ -681,15 +676,11 @@ static int __mdd_index_insert_only(const struct lu_env *env,
 
        if (dt_try_as_dir(env, next)) {
                struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
-               struct lu_ucred         *uc  = lu_ucred_check(env);
-               int                      ignore_quota;
 
                rec->rec_fid = lf;
                rec->rec_type = type;
-               ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
                rc = dt_insert(env, next, (const struct dt_rec *)rec,
-                              (const struct dt_key *)name, handle,
-                              ignore_quota);
+                              (const struct dt_key *)name, handle);
        } else {
                rc = -ENOTDIR;
        }
@@ -706,7 +697,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
 
        rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
        if (rc == 0 && S_ISDIR(type)) {
-               mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+               mdd_write_lock(env, pobj, DT_TGT_PARENT);
                mdo_ref_add(env, pobj, handle);
                mdd_write_unlock(env, pobj);
        }
@@ -719,17 +710,17 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
                              const char *name, int is_dir,
                              struct thandle *handle)
 {
-        int               rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
        rc = __mdd_index_delete_only(env, pobj, name, handle);
-        if (rc == 0 && is_dir) {
-                mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                mdo_ref_del(env, pobj, handle);
-                mdd_write_unlock(env, pobj);
-        }
+       if (rc == 0 && is_dir) {
+               mdd_write_lock(env, pobj, DT_TGT_PARENT);
+               mdo_ref_del(env, pobj, handle);
+               mdd_write_unlock(env, pobj);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_llog_record_calc_size(const struct lu_env *env,
@@ -737,23 +728,24 @@ static int mdd_llog_record_calc_size(const struct lu_env *env,
                                     const struct lu_name *sname)
 {
        const struct lu_ucred   *uc = lu_ucred(env);
-       enum changelog_rec_flags crf = CLF_EXTRA_FLAGS;
-       enum changelog_rec_extra_flags crfe = CLFE_UIDGID;
+       enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
+       enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
 
        if (sname != NULL)
-               crf |= CLF_RENAME;
+               clf_flags |= CLF_RENAME;
 
        if (uc != NULL && uc->uc_jobid[0] != '\0')
-               crf |= CLF_JOBID;
+               clf_flags |= CLF_JOBID;
 
        return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
-                            changelog_rec_offset(crf, crfe) +
+                            changelog_rec_offset(clf_flags, crfe) +
                             (tname != NULL ? tname->ln_namelen : 0) +
                             (sname != NULL ? 1 + sname->ln_namelen : 0));
 }
 
 int mdd_declare_changelog_store(const struct lu_env *env,
                                struct mdd_device *mdd,
+                               enum changelog_rec_type type,
                                const struct lu_name *tname,
                                const struct lu_name *sname,
                                struct thandle *handle)
@@ -766,8 +758,7 @@ int mdd_declare_changelog_store(const struct lu_env *env,
        int                              reclen;
        int                              rc;
 
-       /* Not recording */
-       if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+       if (!mdd_changelog_enabled(env, mdd, type))
                return 0;
 
        reclen = mdd_llog_record_calc_size(env, tname, sname);
@@ -795,133 +786,44 @@ out_put:
        return rc;
 }
 
-struct mdd_changelog_gc {
-       struct mdd_device *mcgc_mdd;
-       bool mcgc_found;
-       __u32 mcgc_maxtime;
-       __u64 mcgc_maxindexes;
-       __u32 mcgc_id;
-};
-
-/* return first registered ChangeLog user idle since too long
- * use ChangeLog's user plain LLOG mtime for this */
-static int mdd_changelog_gc_cb(const struct lu_env *env,
-                              struct llog_handle *llh,
-                              struct llog_rec_hdr *hdr, void *data)
+int mdd_changelog_write_rec(const struct lu_env *env,
+                           struct llog_handle *loghandle,
+                           struct llog_rec_hdr *r,
+                           struct llog_cookie *cookie,
+                           int idx, struct thandle *th)
 {
-       struct llog_changelog_user_rec  *rec;
-       struct mdd_changelog_gc *mcgc = (struct mdd_changelog_gc *)data;
-       struct mdd_device *mdd = mcgc->mcgc_mdd;
-       ENTRY;
+       int rc;
 
-       if ((llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN) == 0)
-               RETURN(-ENXIO);
+       if (r->lrh_type == CHANGELOG_REC) {
+               struct mdd_device *mdd;
+               struct llog_changelog_rec *rec;
 
-       rec = container_of(hdr, struct llog_changelog_user_rec,
-                          cur_hdr);
+               mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
+               rec = container_of(r, struct llog_changelog_rec, cr_hdr);
 
-       /* find oldest idle user, based on last record update/cancel time (new
-        * behavior), or for old user records, last record index vs current
-        * ChangeLog index. Late users with old record format will be treated
-        * first as we assume they could be idle since longer
-        */
-       if (rec->cur_time != 0) {
-               __u32 time_now = (__u32)get_seconds();
-               __u32 time_out = rec->cur_time +
-                                mdd->mdd_changelog_max_idle_time;
-               __u32 idle_time = time_now - rec->cur_time;
-
-               /* treat oldest idle user first, and if no old format user
-                * has been already selected
-                */
-               if (time_after32(time_now, time_out) &&
-                   idle_time > mcgc->mcgc_maxtime &&
-                   mcgc->mcgc_maxindexes == 0) {
-                       mcgc->mcgc_maxtime = idle_time;
-                       mcgc->mcgc_id = rec->cur_id;
-                       mcgc->mcgc_found = true;
-               }
-       } else {
-               /* old user record with no idle time stamp, so use empirical
-                * method based on its current index/position
-                */
-               __u64 idle_indexes;
+               spin_lock(&mdd->mdd_cl.mc_lock);
+               rec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
+               spin_unlock(&mdd->mdd_cl.mc_lock);
 
-               idle_indexes = mdd->mdd_cl.mc_index - rec->cur_endrec;
+               rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
+                                               cookie, idx, th);
 
-               /* treat user with the oldest/smallest current index first */
-               if (idle_indexes >= mdd->mdd_changelog_max_idle_indexes &&
-                   idle_indexes > mcgc->mcgc_maxindexes) {
-                       mcgc->mcgc_maxindexes = idle_indexes;
-                       mcgc->mcgc_id = rec->cur_id;
-                       mcgc->mcgc_found = true;
+               /*
+                * if the current llog is full, a new llog will be
+                * generated; as this is not actually an error, do not
+                * increase the index, so that userspace apps do not
+                * see a gap in the changelog sequence
+                */
+               if (!(rc == -ENOSPC && llog_is_full(loghandle))) {
+                       spin_lock(&mdd->mdd_cl.mc_lock);
+                       ++mdd->mdd_cl.mc_index;
+                       spin_unlock(&mdd->mdd_cl.mc_lock);
                }
-
-       }
-       RETURN(0);
-}
-
-/* recover space from long-term inactive ChangeLog users */
-static int mdd_chlg_garbage_collect(void *data)
-{
-       struct mdd_device *mdd = (struct mdd_device *)data;
-       struct lu_env             *env = NULL;
-       int                        rc;
-       struct llog_ctxt *ctxt;
-       struct mdd_changelog_gc mcgc = {
-               .mcgc_mdd = mdd,
-               .mcgc_found = false,
-               .mcgc_maxtime = 0,
-               .mcgc_maxindexes = 0,
-       };
-       ENTRY;
-
-       CDEBUG(D_HA, "%s: ChangeLog garbage collect thread start\n",
-              mdd2obd_dev(mdd)->obd_name);
-
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               GOTO(out, rc = -ENOMEM);
-
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc)
-               GOTO(out, rc);
-
-       for (;;) {
-               ctxt = llog_get_context(mdd2obd_dev(mdd),
-                                       LLOG_CHANGELOG_USER_ORIG_CTXT);
-               if (ctxt == NULL ||
-                   (ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT) == 0)
-                       GOTO(out_env, rc = -ENXIO);
-
-               rc = llog_cat_process(env, ctxt->loc_handle,
-                                     mdd_changelog_gc_cb, &mcgc, 0, 0);
-               if (rc != 0 || mcgc.mcgc_found == false)
-                       break;
-               llog_ctxt_put(ctxt);
-
-               CWARN("%s: Force deregister of ChangeLog user cl%d idle more "
-                     "than %us\n", mdd2obd_dev(mdd)->obd_name, mcgc.mcgc_id,
-                     mcgc.mcgc_maxtime);
-
-               mdd_changelog_user_purge(env, mdd, mcgc.mcgc_id);
-
-               /* try again to search for another candidate */
-               mcgc.mcgc_found = false;
-               mcgc.mcgc_maxtime = 0;
-               mcgc.mcgc_maxindexes = 0;
+       } else {
+               rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
+                                               cookie, idx, th);
        }
 
-out_env:
-       if (ctxt != NULL)
-               llog_ctxt_put(ctxt);
-
-       lu_env_fini(env);
-       GOTO(out, rc);
-out:
-       if (env)
-               OBD_FREE_PTR(env);
-       mdd->mdd_cl.mc_gc_task = NULL;
        return rc;
 }
 
@@ -939,7 +841,6 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        struct llog_ctxt        *ctxt;
        struct thandle          *llog_th;
        int                      rc;
-       bool                     run_gc_task;
 
        rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) +
                                            changelog_rec_varsize(&rec->cr));
@@ -948,13 +849,6 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        rec->cr_hdr.lrh_type = CHANGELOG_REC;
        rec->cr.cr_time = cl_time();
 
-       spin_lock(&mdd->mdd_cl.mc_lock);
-       /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
-        * but as long as the MDD transactions are ordered correctly for e.g.
-        * rename conflicts, I don't think this should matter. */
-       rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
-       spin_unlock(&mdd->mdd_cl.mc_lock);
-
        ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
        if (ctxt == NULL)
                return -ENXIO;
@@ -963,44 +857,42 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        if (IS_ERR(llog_th))
                GOTO(out_put, rc = PTR_ERR(llog_th));
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
        /* nested journal transaction */
        rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
 
        /* time to recover some space ?? */
+       if (likely(!mdd->mdd_changelog_gc ||
+                  mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
+                  mdd->mdd_changelog_min_gc_interval >=
+                       ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
+               /* save a spin_lock trip */
+               goto out_put;
        spin_lock(&mdd->mdd_cl.mc_lock);
-       if (unlikely(mdd->mdd_changelog_gc && (ktime_get_real_seconds() -
-           mdd->mdd_cl.mc_gc_time > mdd->mdd_changelog_min_gc_interval) &&
-           mdd->mdd_cl.mc_gc_task == NULL &&
-           llog_cat_free_space(ctxt->loc_handle) <=
-                               mdd->mdd_changelog_min_free_cat_entries)) {
-               CWARN("%s: low on changelog_catalog free entries, starting "
-                     "ChangeLog garbage collection thread\n", obd->obd_name);
-
-               /* indicate further kthread run will occur outside right after
-                * critical section
+       if (likely(mdd->mdd_changelog_gc &&
+                    mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
+                    ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
+                       mdd->mdd_changelog_min_gc_interval)) {
+               if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
+                            mdd->mdd_changelog_min_free_cat_entries ||
+                            OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
+                       CWARN("%s:%s low on changelog_catalog free entries, "
+                             "starting ChangeLog garbage collection thread\n",
+                             obd->obd_name,
+                             OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
+                               " simulate" : "");
+
+                       /* indicate that a GC kthread run is needed; it
+                        * will occur outside, right after the filling of
+                        * the current journal transaction has completed
+                        */
+                       mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
+               }
+               /* next check in mdd_changelog_min_gc_interval anyway
                 */
-               mdd->mdd_cl.mc_gc_task = (struct task_struct *)(-1);
-               run_gc_task = true;
+               mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
        }
        spin_unlock(&mdd->mdd_cl.mc_lock);
-       if (run_gc_task) {
-               struct task_struct *gc_task;
-
-               gc_task = kthread_run(mdd_chlg_garbage_collect, mdd,
-                                     "chlg_gc_thread");
-               if (IS_ERR(gc_task)) {
-                       CERROR("%s: cannot start ChangeLog garbage collection "
-                              "thread: rc = %ld\n", obd->obd_name,
-                              PTR_ERR(gc_task));
-                       mdd->mdd_cl.mc_gc_task = NULL;
-               } else {
-                       CDEBUG(D_HA, "%s: ChangeLog garbage collection thread "
-                              "has started with Pid %d\n", obd->obd_name,
-                              gc_task->pid);
-                       mdd->mdd_cl.mc_gc_task = gc_task;
-                       mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
-               }
-       }
 out_put:
        llog_ctxt_put(ctxt);
        if (rc > 0)
@@ -1014,12 +906,14 @@ static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
                                         const struct lu_name *sname)
 {
        struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
-       size_t extsize = sname->ln_namelen + 1;
+       size_t extsize;
 
        LASSERT(sfid != NULL);
        LASSERT(spfid != NULL);
        LASSERT(sname != NULL);
 
+       extsize = sname->ln_namelen + 1;
+
        rnm->cr_sfid = *sfid;
        rnm->cr_spfid = *spfid;
 
@@ -1054,6 +948,29 @@ void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
        uidgid->cr_gid = gid;
 }
 
+void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
+                                lnet_nid_t nid)
+{
+       struct changelog_ext_nid *clnid = changelog_rec_nid(rec);
+
+       clnid->cr_nid = nid;
+}
+
+void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
+{
+       struct changelog_ext_openmode *omd = changelog_rec_openmode(rec);
+
+       omd->cr_openflags = flags;
+}
+
+void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
+                                  const char *xattr_name)
+{
+       struct changelog_ext_xattr *xattr = changelog_rec_xattr(rec);
+
+       strlcpy(xattr->cr_xattr, xattr_name, sizeof(xattr->cr_xattr));
+}
+
 /** Store a namespace change changelog record
  * If this fails, we must fail the whole transaction; we don't
  * want the change to commit without the log entry.
@@ -1068,7 +985,7 @@ void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
 int mdd_changelog_ns_store(const struct lu_env *env,
                           struct mdd_device *mdd,
                           enum changelog_rec_type type,
-                          enum changelog_rec_flags crf,
+                          enum changelog_rec_flags clf_flags,
                           struct mdd_object *target,
                           const struct lu_fid *tpfid,
                           const struct lu_fid *sfid,
@@ -1085,11 +1002,7 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        int                              rc;
        ENTRY;
 
-       /* Not recording */
-       if (!(mdd->mdd_cl.mc_flags & CLM_ON))
-               RETURN(0);
-
-       if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+       if (!mdd_changelog_enabled(env, mdd, type))
                RETURN(0);
 
        LASSERT(tpfid != NULL);
@@ -1102,26 +1015,30 @@ int mdd_changelog_ns_store(const struct lu_env *env,
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
 
-       crf &= CLF_FLAGMASK;
-       crf |= CLF_EXTRA_FLAGS;
+       clf_flags &= CLF_FLAGMASK;
+       clf_flags |= CLF_EXTRA_FLAGS;
 
-       if (uc != NULL && uc->uc_jobid[0] != '\0')
-               crf |= CLF_JOBID;
+       if (uc) {
+               if (uc->uc_jobid[0] != '\0')
+                       clf_flags |= CLF_JOBID;
+               xflags |= CLFE_UIDGID;
+               xflags |= CLFE_NID;
+       }
 
        if (sname != NULL)
-               crf |= CLF_RENAME;
+               clf_flags |= CLF_RENAME;
        else
-               crf |= CLF_VERSION;
+               clf_flags |= CLF_VERSION;
 
-       xflags |= CLFE_UIDGID;
+       rec->cr.cr_flags = clf_flags;
 
-       rec->cr.cr_flags = crf;
-
-       if (crf & CLF_EXTRA_FLAGS) {
+       if (clf_flags & CLF_EXTRA_FLAGS) {
                mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
                if (xflags & CLFE_UIDGID)
                        mdd_changelog_rec_extra_uidgid(&rec->cr,
                                                       uc->uc_uid, uc->uc_gid);
+               if (xflags & CLFE_NID)
+                       mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
        }
 
        rec->cr.cr_type = (__u32)type;
@@ -1129,14 +1046,14 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        rec->cr.cr_namelen = tname->ln_namelen;
        memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
 
-       if (crf & CLF_RENAME)
+       if (clf_flags & CLF_RENAME)
                mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
 
-       if (crf & CLF_JOBID)
+       if (clf_flags & CLF_JOBID)
                mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
 
        if (likely(target != NULL)) {
-               rec->cr.cr_tfid = *mdo2fid(target);
+               rec->cr.cr_tfid = *mdd_object_fid(target);
                target->mod_cltime = ktime_get();
        } else {
                fid_zero(&rec->cr.cr_tfid);
@@ -1337,7 +1254,7 @@ static inline int mdd_links_del(const struct lu_env *env,
 /** Read the link EA into a temp buffer.
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
- * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ * \retval ptr to \a lu_buf (always \a mti_link_buf)
  */
 struct lu_buf *mdd_links_get(const struct lu_env *env,
                             struct mdd_object *mdd_obj)
@@ -1422,7 +1339,7 @@ static int mdd_declare_link(const struct lu_env *env,
                            struct lu_attr *la,
                            struct linkea_data *data)
 {
-       struct lu_fid tfid = *mdo2fid(c);
+       struct lu_fid tfid = *mdd_object_fid(c);
        int rc;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
@@ -1451,7 +1368,8 @@ static int mdd_declare_link(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
+                                        handle);
 
        return rc;
 }
@@ -1502,8 +1420,8 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
        /* Note: even this function will change ldata, but it comes from
         * thread_info, which is completely temporary and only seen in
         * this function, so we do not need reset ldata once it fails.*/
-       rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
-                               lname, 0, 0, ldata);
+       rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
+                               mdd_object_fid(mdd_tobj), lname, 0, 0, ldata);
        if (rc != 0)
                GOTO(stop, rc);
 
@@ -1516,7 +1434,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(stop, rc);
 
-       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+       mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
        rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
                                   cattr);
        if (rc)
@@ -1528,7 +1446,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                        GOTO(out_unlock, rc);
        }
 
-       *tfid = *mdo2fid(mdd_sobj);
+       *tfid = *mdd_object_fid(mdd_sobj);
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
                tfid->f_oid = cfs_fail_val;
 
@@ -1549,7 +1467,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
        if (rc == 0)
                /* Note: The failure of links_add should not cause the
                 * link failure, so do not check return value. */
-               mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
+               mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_tobj),
                              lname, handle, ldata, 0);
 
        EXIT;
@@ -1557,8 +1475,8 @@ out_unlock:
        mdd_write_unlock(env, mdd_sobj);
        if (rc == 0)
                rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
-                                           mdo2fid(mdd_tobj), NULL, NULL,
-                                           lname, NULL, handle);
+                                           mdd_object_fid(mdd_tobj), NULL,
+                                           NULL, lname, NULL, handle);
 stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
        if (is_vmalloc_addr(ldata->ld_buf))
@@ -1575,9 +1493,6 @@ static int mdd_mark_orphan_object(const struct lu_env *env,
        struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
        int rc;
 
-       if (!S_ISDIR(mdd_object_type(obj)))
-               return 0;
-
        attr->la_valid = LA_FLAGS;
        attr->la_flags = LUSTRE_ORPHAN_FL;
 
@@ -1593,7 +1508,7 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
                                     struct mdd_object *obj,
                                     struct thandle *handle)
 {
-       int     rc;
+       int rc;
 
        /* Sigh, we do not know if the unlink object will become orphan in
         * declare phase, but fortunately the flags here does not matter
@@ -1606,7 +1521,7 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
+       rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
        if (rc != 0)
                return rc;
 
@@ -1616,7 +1531,7 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
 /* caller should take a lock before calling */
 int mdd_finish_unlink(const struct lu_env *env,
                      struct mdd_object *obj, struct md_attr *ma,
-                     const struct mdd_object *pobj,
+                     struct mdd_object *pobj,
                      const struct lu_name *lname,
                      struct thandle *th)
 {
@@ -1631,7 +1546,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                 * will be deleted during mdd_close() */
                obj->mod_flags |= DEAD_OBJ;
                if (obj->mod_count) {
-                       rc = __mdd_orphan_add(env, obj, th);
+                       rc = mdd_orphan_insert(env, obj, th);
                        if (rc == 0)
                                CDEBUG(D_HA, "Object "DFID" is inserted into "
                                        "orphan list, open count = %d\n",
@@ -1645,7 +1560,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                                        obj->mod_count);
 
                        /* mark object as an orphan here, not
-                        * before __mdd_orphan_add() as racing
+                        * before mdd_orphan_insert() as racing
                         * mdd_la_get() may propagate ORPHAN_OBJ
                         * causing the assertion */
                        rc = mdd_mark_orphan_object(env, obj, th, false);
@@ -1654,7 +1569,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                }
        } else if (!is_dir) {
                /* old files may not have link ea; ignore errors */
-               mdd_links_del(env, obj, mdo2fid(pobj), lname, th);
+               mdd_links_del(env, obj, mdd_object_fid(pobj), lname, th);
        }
 
        RETURN(rc);
@@ -1726,7 +1641,8 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                        return rc;
 
                /* FIXME: need changelog for remove entry */
-               rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
+                                                NULL, handle);
        }
 
        return rc;
@@ -1788,6 +1704,9 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        int rc, is_dir = 0, cl_flags = 0;
        ENTRY;
 
+       /* let shutdown start */
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
+
        /* cobj == NULL means only delete name entry */
        if (likely(cobj != NULL)) {
                mdd_cobj = md2mdd_obj(cobj);
@@ -1831,7 +1750,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                GOTO(stop, rc);
 
        if (likely(mdd_cobj != NULL))
-               mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
+               mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
 
        if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
                rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
@@ -1847,7 +1766,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                rc = mdo_ref_del(env, mdd_cobj, handle);
                if (rc != 0) {
                        __mdd_index_insert_only(env, mdd_pobj,
-                                               mdo2fid(mdd_cobj),
+                                               mdd_object_fid(mdd_cobj),
                                                mdd_object_type(mdd_cobj),
                                                name, handle);
                        GOTO(cleanup, rc);
@@ -1918,8 +1837,8 @@ cleanup:
 
                rc = mdd_changelog_ns_store(env, mdd,
                        is_dir ? CL_RMDIR : CL_UNLINK, cl_flags,
-                       mdd_cobj, mdo2fid(mdd_pobj), NULL, NULL, lname, NULL,
-                       handle);
+                       mdd_cobj, mdd_object_fid(mdd_pobj), NULL, NULL,
+                       lname, NULL, handle);
        }
 
 stop:
@@ -2003,7 +1922,8 @@ static int mdd_create_data(const struct lu_env *env,
        if (rc)
                GOTO(stop, rc);
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+                                        handle);
        if (rc)
                GOTO(stop, rc);
 
@@ -2017,7 +1937,8 @@ static int mdd_create_data(const struct lu_env *env,
        if (rc)
                GOTO(stop, rc);
 
-       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle,
+                                     NULL);
 
 stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
@@ -2039,8 +1960,8 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
        if (!S_ISDIR(attr->la_mode))
                RETURN(0);
 
-       rc = mdo_declare_index_insert(env, child, mdo2fid(child), S_IFDIR,
-                                     dot, handle);
+       rc = mdo_declare_index_insert(env, child, mdd_object_fid(child),
+                                     S_IFDIR, dot, handle);
        if (rc != 0)
                RETURN(rc);
 
@@ -2048,8 +1969,8 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
-       rc = mdo_declare_index_insert(env, child, mdo2fid(parent), S_IFDIR,
-                                     dotdot, handle);
+       rc = mdo_declare_index_insert(env, child, mdd_object_fid(parent),
+                                     S_IFDIR, dotdot, handle);
 
        RETURN(rc);
 }
@@ -2057,16 +1978,16 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
 static int mdd_object_initialize(const struct lu_env *env,
                                 const struct lu_fid *pfid,
                                 struct mdd_object *child,
-                                struct lu_attr *attr, struct thandle *handle,
-                                const struct md_op_spec *spec)
+                                struct lu_attr *attr,
+                                struct thandle *handle)
 {
        int rc = 0;
        ENTRY;
 
        if (S_ISDIR(attr->la_mode)) {
-                /* Add "." and ".." for newly created dir */
-                mdo_ref_add(env, child, handle);
-                rc = __mdd_index_insert_only(env, child, mdo2fid(child),
+               /* Add "." and ".." for newly created dir */
+               mdo_ref_add(env, child, handle);
+               rc = __mdd_index_insert_only(env, child, mdd_object_fid(child),
                                             S_IFDIR, dot, handle);
                if (rc == 0)
                        rc = __mdd_index_insert_only(env, child, pfid, S_IFDIR,
@@ -2139,8 +2060,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
            spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
                const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
 
-               if (le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC &&
-                   le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_SPECIFIC &&
+               if (!lmv_user_magic_supported(le32_to_cpu(lum->lum_magic)) &&
                    le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
                        rc = -EINVAL;
                        CERROR("%s: invalid lmv_user_md: magic = %x, "
@@ -2157,12 +2077,24 @@ static int mdd_create_sanity_check(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
-        /* sgid check */
+       /* sgid check */
        if (pattr->la_mode & S_ISGID) {
+               struct lu_ucred *uc = lu_ucred(env);
+
                cattr->la_gid = pattr->la_gid;
+
+               /* Directories are special, and always inherit S_ISGID */
                if (S_ISDIR(cattr->la_mode)) {
                        cattr->la_mode |= S_ISGID;
                        cattr->la_valid |= LA_MODE;
+               } else if ((cattr->la_mode & (S_ISGID | S_IXGRP))
+                               == (S_ISGID | S_IXGRP) &&
+                          !lustre_in_group_p(uc,
+                                             (cattr->la_valid & LA_GID) ?
+                                             cattr->la_gid : pattr->la_gid) &&
+                          !md_capable(uc, CFS_CAP_FSETID)) {
+                       cattr->la_mode &= ~S_ISGID;
+                       cattr->la_valid |= LA_MODE;
                }
        }
 
@@ -2212,6 +2144,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                                     const struct md_op_spec *spec,
                                     struct lu_buf *def_acl_buf,
                                     struct lu_buf *acl_buf,
+                                    struct lu_buf *hsm_buf,
                                     struct dt_allocation_hint *hint)
 {
        const struct lu_buf *buf;
@@ -2222,8 +2155,8 @@ static int mdd_declare_create_object(const struct lu_env *env,
        if (rc)
                GOTO(out, rc);
 
-#ifdef CONFIG_FS_POSIX_ACL
-       if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+       if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
                /* if dir, then it can inherit the default ACL */
                rc = mdo_declare_xattr_set(env, c, def_acl_buf,
                                           XATTR_NAME_ACL_DEFAULT,
@@ -2232,7 +2165,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                        GOTO(out, rc);
        }
 
-       if (acl_buf->lb_len > 0) {
+       if (acl_buf && acl_buf->lb_len > 0) {
                rc = mdo_declare_attr_set(env, c, attr, handle);
                if (rc)
                        GOTO(out, rc);
@@ -2248,14 +2181,24 @@ static int mdd_declare_create_object(const struct lu_env *env,
                GOTO(out, rc);
 
        /* replay case, create LOV EA from client data */
-       if (spec->no_create ||
+       if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
            (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
-                                          handle);
+               rc = mdo_declare_xattr_set(env, c, buf,
+                                          S_ISDIR(attr->la_mode) ?
+                                               XATTR_NAME_LMV : XATTR_NAME_LOV,
+                                          LU_XATTR_CREATE, handle);
                if (rc)
                        GOTO(out, rc);
+
+               if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+                       rc = mdo_declare_xattr_set(env, c, hsm_buf,
+                                                  XATTR_NAME_HSM,
+                                                  0, handle);
+                       if (rc)
+                               GOTO(out, rc);
+               }
        }
 
        if (S_ISLNK(attr->la_mode)) {
@@ -2279,6 +2222,16 @@ static int mdd_declare_create_object(const struct lu_env *env,
                if (rc < 0)
                        GOTO(out, rc);
        }
+
+       if (spec->sp_cr_file_encctx != NULL) {
+               buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
+                                       spec->sp_cr_file_encctx_size);
+               rc = mdo_declare_xattr_set(env, c, buf,
+                                          LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
+                                          handle);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
 out:
        return rc;
 }
@@ -2292,12 +2245,13 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                              struct linkea_data *ldata,
                              struct lu_buf *def_acl_buf,
                              struct lu_buf *acl_buf,
+                             struct lu_buf *hsm_buf,
                              struct dt_allocation_hint *hint)
 {
        int rc;
 
        rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
-                                      def_acl_buf, acl_buf, hint);
+                                      def_acl_buf, acl_buf, hsm_buf, hint);
        if (rc)
                GOTO(out, rc);
 
@@ -2308,14 +2262,16 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
        }
 
        if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
-               rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+               rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
                if (rc)
                        GOTO(out, rc);
        } else {
-               struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
+               struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+               enum changelog_rec_type type;
 
-               rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
-                                             name->ln_name, handle);
+               rc = mdo_declare_index_insert(env, p, mdd_object_fid(c),
+                                             attr->la_mode, name->ln_name,
+                                             handle);
                if (rc != 0)
                        return rc;
 
@@ -2329,7 +2285,12 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                if (rc)
                        return rc;
 
-               rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+               type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
+                      S_ISREG(attr->la_mode) ? CL_CREATE :
+                      S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
+
+               rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
+                                                handle);
                if (rc)
                        return rc;
        }
@@ -2342,6 +2303,7 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                        struct lu_buf *acl_buf)
 {
        int     rc;
+
        ENTRY;
 
        if (S_ISLNK(la->la_mode)) {
@@ -2350,7 +2312,7 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                RETURN(0);
        }
 
-       mdd_read_lock(env, pobj, MOR_TGT_PARENT);
+       mdd_read_lock(env, pobj, DT_TGT_PARENT);
        rc = mdo_xattr_get(env, pobj, def_acl_buf,
                           XATTR_NAME_ACL_DEFAULT);
        mdd_read_unlock(env, pobj);
@@ -2386,13 +2348,14 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                             struct mdd_object *son, struct lu_attr *attr,
                             struct md_op_spec *spec, struct lu_buf *acl_buf,
                             struct lu_buf *def_acl_buf,
+                            struct lu_buf *hsm_buf,
                             struct dt_allocation_hint *hint,
-                            struct thandle *handle)
+                            struct thandle *handle, bool initsecctx)
 {
        const struct lu_buf *buf;
        int rc;
 
-       mdd_write_lock(env, son, MOR_TGT_CHILD);
+       mdd_write_lock(env, son, DT_TGT_CHILD);
        rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
                                        hint);
        if (rc)
@@ -2402,8 +2365,8 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
         * created in declare phase, they also need to be added to master
         * object as sub-directory entry. So it has to initialize the master
         * object, then set dir striped EA.(in mdo_xattr_set) */
-       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
-                                  spec);
+       rc = mdd_object_initialize(env, mdd_object_fid(pobj), son, attr,
+                                  handle);
        if (rc != 0)
                GOTO(err_destroy, rc);
 
@@ -2429,13 +2392,26 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                                        spec->u.sp_ea.eadatalen);
                rc = mdo_xattr_set(env, son, buf,
                                   S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
-                                                           XATTR_NAME_LOV, 0,
-                                  handle);
+                                                           XATTR_NAME_LOV,
+                                  LU_XATTR_CREATE, handle);
+               if (rc != 0)
+                       GOTO(err_destroy, rc);
+       }
+
+       if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
+               struct md_hsm mh;
+
+               memset(&mh, 0, sizeof(mh));
+               mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
+               mh.mh_arch_id = spec->sp_archive_id;
+               lustre_hsm2buf(hsm_buf->lb_buf, &mh);
+               rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
+                                  0, handle);
                if (rc != 0)
                        GOTO(err_destroy, rc);
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
            S_ISDIR(attr->la_mode)) {
                /* set default acl */
@@ -2456,24 +2432,20 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
 #endif
 
        if (S_ISLNK(attr->la_mode)) {
-               struct lu_ucred  *uc = lu_ucred_assert(env);
                struct dt_object *dt = mdd_object_child(son);
                const char *target_name = spec->u.sp_symname;
                int sym_len = strlen(target_name);
                loff_t pos = 0;
 
                buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
-                                               uc->uc_cap &
-                                               CFS_CAP_SYS_RESOURCE_MASK);
-
+               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
                if (rc == sym_len)
                        rc = 0;
                else
                        GOTO(err_initlized, rc = -EFAULT);
        }
 
-       if (spec->sp_cr_file_secctx_name != NULL) {
+       if (initsecctx && spec->sp_cr_file_secctx_name != NULL) {
                buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
                                        spec->sp_cr_file_secctx_size);
                rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
@@ -2482,6 +2454,16 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                        GOTO(err_initlized, rc);
        }
 
+       if (spec->sp_cr_file_encctx != NULL) {
+               buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
+                                       spec->sp_cr_file_encctx_size);
+               rc = mdo_xattr_set(env, son, buf,
+                                  LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
+                                  handle);
+               if (rc < 0)
+                       GOTO(err_initlized, rc);
+       }
+
 err_initlized:
        if (unlikely(rc != 0)) {
                int rc2;
@@ -2587,25 +2569,27 @@ stop:
  * \retval             0 on success
  * \retval             negative errno on failure
  */
-static int mdd_create(const struct lu_env *env, struct md_object *pobj,
+int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
                      struct md_op_spec *spec, struct md_attr *ma)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct lu_attr          *la = &info->mti_la_for_fix;
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_object       *son = md2mdd_obj(child);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct lu_attr          *attr = &ma->ma_attr;
-       struct thandle          *handle;
-       struct lu_attr          *pattr = &info->mti_pattr;
-       struct lu_buf           acl_buf;
-       struct lu_buf           def_acl_buf;
-       struct linkea_data      *ldata = &info->mti_link_data;
-       const char              *name = lname->ln_name;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
+       struct mdd_object *son = md2mdd_obj(child);
+       struct mdd_device *mdd = mdo2mdd(pobj);
+       struct lu_attr *attr = &ma->ma_attr;
+       struct thandle *handle;
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_buf acl_buf;
+       struct lu_buf def_acl_buf;
+       struct lu_buf hsm_buf;
+       struct linkea_data *ldata = &info->mti_link_data;
+       const char *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                      rc;
-       int                      rc2;
+       int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+       int rc, rc2;
+
        ENTRY;
 
        rc = mdd_la_get(env, mdd_pobj, pattr);
@@ -2624,15 +2608,39 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (IS_ERR(handle))
                GOTO(out_free, rc = PTR_ERR(handle));
 
-       lu_buf_check_and_alloc(&info->mti_xattr_buf,
-                              mdd->mdd_dt_conf.ddp_max_ea_size);
-       acl_buf = info->mti_xattr_buf;
-       def_acl_buf.lb_buf = info->mti_key;
-       def_acl_buf.lb_len = sizeof(info->mti_key);
+use_bigger_buffer:
+       acl_buf = *lu_buf_check_and_alloc(&info->mti_xattr_buf, acl_size);
+       if (!acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+       /* mti_big_buf is also used down below in mdd_changelog_ns_store(),
+        * but def_acl_buf is finished with it before then
+        */
+       def_acl_buf = *lu_buf_check_and_alloc(&info->mti_big_buf, acl_size);
+       if (!def_acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+
        rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
+       if (unlikely(rc == -ERANGE &&
+                    acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
+               /* use maximum-sized xattr buffer for too-big default ACL */
+               acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
+                                XATTR_SIZE_MAX);
+               goto use_bigger_buffer;
+       }
        if (rc < 0)
                GOTO(out_stop, rc);
 
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
+
+               /*
+                * migrate may create 1-stripe directory, so lod_ah_init()
+                * doesn't adjust stripe count from lmu.
+                */
+               if (lmu && lmu->lum_stripe_count == cpu_to_le32(1))
+                       lmu->lum_stripe_count = 0;
+       }
+
        mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
 
        memset(ldata, 0, sizeof(*ldata));
@@ -2648,9 +2656,18 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                                        lname, 1, 0, ldata);
        }
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+               LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
+
+               memset(&hsm_buf, 0, sizeof(hsm_buf));
+               lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
+               if (hsm_buf.lb_buf == NULL)
+                       GOTO(out_stop, rc = -ENOMEM);
+       }
+
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
                                handle, spec, ldata, &def_acl_buf, &acl_buf,
-                               hint);
+                               &hsm_buf, hint);
        if (rc)
                GOTO(out_stop, rc);
 
@@ -2659,23 +2676,23 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                GOTO(out_stop, rc);
 
        rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
-                              &def_acl_buf, hint, handle);
+                              &def_acl_buf, &hsm_buf, hint, handle, true);
        if (rc != 0)
                GOTO(out_stop, rc);
 
        if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
-               mdd_write_lock(env, son, MOR_TGT_CHILD);
+               mdd_write_lock(env, son, DT_TGT_CHILD);
                son->mod_flags |= VOLATILE_OBJ;
-               rc = __mdd_orphan_add(env, son, handle);
+               rc = mdd_orphan_insert(env, son, handle);
                GOTO(out_volatile, rc);
        } else {
-               rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
+               rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(son),
                                        attr->la_mode, name, handle);
                if (rc != 0)
                        GOTO(err_created, rc);
 
-               mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
-                             ldata, 1);
+               mdd_links_add(env, son, mdd_object_fid(mdd_pobj), lname,
+                             handle, ldata, 1);
 
                /* update parent directory mtime/ctime */
                *la = *attr;
@@ -2689,7 +2706,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
 err_insert:
        if (rc != 0) {
                if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-                       rc2 = __mdd_orphan_del(env, son, handle);
+                       rc2 = mdd_orphan_delete(env, son, handle);
                else
                        rc2 = __mdd_index_delete(env, mdd_pobj, name,
                                                 S_ISDIR(attr->la_mode),
@@ -2698,7 +2715,7 @@ err_insert:
                        goto out_stop;
 
 err_created:
-               mdd_write_lock(env, son, MOR_TGT_CHILD);
+               mdd_write_lock(env, son, DT_TGT_CHILD);
                if (S_ISDIR(attr->la_mode)) {
                        /* Drop the reference, no need to delete "."/"..",
                         * because the object is to be destroyed directly. */
@@ -2725,14 +2742,14 @@ out_volatile:
                mdd_write_unlock(env, son);
        }
 
-       if (rc == 0 && fid_is_namespace_visible(mdo2fid(son)) &&
+       if (rc == 0 && fid_is_namespace_visible(mdd_object_fid(son)) &&
            likely((spec->sp_cr_flags & MDS_OPEN_VOLATILE) == 0))
                rc = mdd_changelog_ns_store(env, mdd,
                                S_ISDIR(attr->la_mode) ? CL_MKDIR :
                                S_ISREG(attr->la_mode) ? CL_CREATE :
                                S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
-                               0, son, mdo2fid(mdd_pobj), NULL, NULL, lname,
-                               NULL, handle);
+                               0, son, mdd_object_fid(mdd_pobj), NULL, NULL,
+                               lname, NULL, handle);
 out_stop:
        rc2 = mdd_trans_stop(env, mdd, rc, handle);
        if (rc == 0) {
@@ -2750,6 +2767,9 @@ out_free:
                /* if we vmalloced a large buffer drop it */
                lu_buf_free(ldata->ld_buf);
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC)
+               lu_buf_free(&hsm_buf);
+
        /* The child object shouldn't be cached anymore */
        if (rc)
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -2757,49 +2777,6 @@ out_free:
        return rc;
 }
 
-/*
- * Get locks on parents in proper order
- * RETURN: < 0 - error, rename_order if successful
- */
-enum rename_order {
-        MDD_RN_SAME,
-        MDD_RN_SRCTGT,
-        MDD_RN_TGTSRC
-};
-
-static int mdd_rename_order(const struct lu_env *env,
-                            struct mdd_device *mdd,
-                            struct mdd_object *src_pobj,
-                           const struct lu_attr *pattr,
-                            struct mdd_object *tgt_pobj)
-{
-        /* order of locking, 1 - tgt-src, 0 - src-tgt*/
-        int rc;
-        ENTRY;
-
-        if (src_pobj == tgt_pobj)
-                RETURN(MDD_RN_SAME);
-
-        /* compared the parent child relationship of src_p&tgt_p */
-        if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
-                rc = MDD_RN_SRCTGT;
-        } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
-                rc = MDD_RN_TGTSRC;
-        } else {
-               rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
-                                  NULL);
-                if (rc == -EREMOTE)
-                        rc = 0;
-
-                if (rc == 1)
-                        rc = MDD_RN_TGTSRC;
-                else if (rc == 0)
-                        rc = MDD_RN_SRCTGT;
-        }
-
-        RETURN(rc);
-}
-
 /* has not mdd_write{read}_lock on any obj yet. */
 static int mdd_rename_sanity_check(const struct lu_env *env,
                                    struct mdd_object *src_pobj,
@@ -2864,28 +2841,27 @@ static int mdd_declare_rename(const struct lu_env *env,
                              struct linkea_data *ldata,
                              struct thandle *handle)
 {
-       struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
+       struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
        int rc;
 
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
-        LASSERT(mdd_spobj);
-        LASSERT(mdd_tpobj);
-        LASSERT(mdd_sobj);
+       LASSERT(mdd_spobj);
+       LASSERT(mdd_tpobj);
+       LASSERT(mdd_sobj);
 
-        /* name from source dir */
-        rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
-        if (rc)
-                return rc;
+       /* name from source dir */
+       rc = mdo_declare_index_delete(env, mdd_spobj, sname->ln_name, handle);
+       if (rc)
+               return rc;
 
-        /* .. from source child */
-        if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-                /* source child can be directory,
-                 * counted by source dir's nlink */
-                rc = mdo_declare_ref_del(env, mdd_spobj, handle);
-                if (rc)
-                        return rc;
+       /* .. from source child */
+       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+               /* source child can be directory, counted by source dir's nlink */
+               rc = mdo_declare_ref_del(env, mdd_spobj, handle);
+               if (rc)
+                       return rc;
                if (mdd_spobj != mdd_tpobj) {
                        rc = mdo_declare_index_delete(env, mdd_sobj, dotdot,
                                                      handle);
@@ -2893,7 +2869,7 @@ static int mdd_declare_rename(const struct lu_env *env,
                                return rc;
 
                        rc = mdo_declare_index_insert(env, mdd_sobj,
-                                                     mdo2fid(mdd_tpobj),
+                                                     mdd_object_fid(mdd_tpobj),
                                                      S_IFDIR, dotdot, handle);
                        if (rc != 0)
                                return rc;
@@ -2925,7 +2901,7 @@ static int mdd_declare_rename(const struct lu_env *env,
                return rc;
 
        /* new name */
-       rc = mdo_declare_index_insert(env, mdd_tpobj, mdo2fid(mdd_sobj),
+       rc = mdo_declare_index_insert(env, mdd_tpobj, mdd_object_fid(mdd_sobj),
                                      mdd_object_type(mdd_sobj),
                                      tname->ln_name, handle);
        if (rc != 0)
@@ -2965,13 +2941,24 @@ static int mdd_declare_rename(const struct lu_env *env,
                        return rc;
         }
 
-       rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
+                                        handle);
         if (rc)
                 return rc;
 
         return rc;
 }
 
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma);
+
 /* src object can be remote that is why we use only fid and type of object */
 static int mdd_rename(const struct lu_env *env,
                       struct md_object *src_pobj, struct md_object *tgt_pobj,
@@ -2993,8 +2980,8 @@ static int mdd_rename(const struct lu_env *env,
        struct lu_attr *tpattr = MDD_ENV_VAR(env, tpattr);
        struct thandle *handle;
        struct linkea_data  *ldata = &mdd_env_info(env)->mti_link_data;
-       const struct lu_fid *tpobj_fid = mdo2fid(mdd_tpobj);
-       const struct lu_fid *spobj_fid = mdo2fid(mdd_spobj);
+       const struct lu_fid *tpobj_fid = mdd_object_fid(mdd_tpobj);
+       const struct lu_fid *spobj_fid = mdd_object_fid(mdd_spobj);
        bool is_dir;
        bool tobj_ref = 0;
        bool tobj_locked = 0;
@@ -3002,6 +2989,9 @@ static int mdd_rename(const struct lu_env *env,
        int rc, rc2;
        ENTRY;
 
+       /* let unlink complete and commit */
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
+
        if (tobj)
                mdd_tobj = md2mdd_obj(tobj);
 
@@ -3013,6 +3003,31 @@ static int mdd_rename(const struct lu_env *env,
        if (rc)
                GOTO(out_pending, rc);
 
+       /* if rename is cross MDTs, migrate symlink if it doesn't have other
+        * hard links, and target doesn't exist.
+        */
+       if (mdd_object_remote(mdd_sobj) && S_ISLNK(cattr->la_mode) &&
+           cattr->la_nlink == 1 && !tobj) {
+               struct md_op_spec *spec = &mdd_env_info(env)->mti_spec;
+               struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
+               struct lu_fid tfid;
+
+               rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
+                                              NULL);
+               if (rc < 0)
+                       GOTO(out_pending, rc);
+
+               mdd_tobj = mdd_object_find(env, mdd, &tfid);
+               if (IS_ERR(mdd_tobj))
+                       GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
+
+               memset(spec, 0, sizeof(*spec));
+               rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
+                                       mdd_tobj, lsname, ltname, spec, ma);
+               mdd_object_put(env, mdd_tobj);
+               GOTO(out_pending, rc);
+       }
+
        rc = mdd_la_get(env, mdd_spobj, pattr);
        if (rc)
                GOTO(out_pending, rc);
@@ -3041,11 +3056,6 @@ static int mdd_rename(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_pending, rc);
 
-       /* FIXME: Should consider tobj and sobj too in rename_lock. */
-       rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
-       if (rc < 0)
-               GOTO(out_pending, rc);
-
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -3108,13 +3118,13 @@ static int mdd_rename(const struct lu_env *env,
                GOTO(fixup_tpobj, rc);
 
        /* Update the linkEA for the source object */
-       mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
-       rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
-                             mdo2fid(mdd_tpobj), ltname, handle, ldata,
-                             0, 0);
+       mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
+       rc = mdd_links_rename(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+                             lsname, mdd_object_fid(mdd_tpobj), ltname,
+                             handle, ldata, 0, 0);
        if (rc == -ENOENT)
                /* Old files might not have EA entry */
-               mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
+               mdd_links_add(env, mdd_sobj, mdd_object_fid(mdd_spobj),
                              lsname, handle, NULL, 0);
        mdd_write_unlock(env, mdd_sobj);
        /* We don't fail the transaction if the link ea can't be
@@ -3127,7 +3137,7 @@ static int mdd_rename(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+               mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
                tobj_locked = 1;
                 if (mdd_is_dead_obj(mdd_tobj)) {
                         /* shld not be dead, something is wrong */
@@ -3228,9 +3238,9 @@ fixup_tpobj:
                        }
 
                        rc2 = __mdd_index_insert(env, mdd_tpobj,
-                                                 mdo2fid(mdd_tobj),
-                                                 mdd_object_type(mdd_tobj),
-                                                 tname, handle);
+                                                mdd_object_fid(mdd_tobj),
+                                                mdd_object_type(mdd_tobj),
+                                                tname, handle);
                        if (rc2 != 0)
                                CWARN("tp obj fix error: rc = %d\n", rc2);
                }
@@ -3278,270 +3288,93 @@ out_pending:
 }
 
 /**
- * During migration once the parent FID has been changed,
- * we need update the parent FID in linkea.
+ * Check whether we should migrate the file/dir
+ * return val
+ *     < 0  permission check failed or other error.
+ *     = 0  the file can be migrated.
  **/
-static int mdd_linkea_update_child_internal(const struct lu_env *env,
-                                           struct mdd_object *parent,
-                                           struct mdd_object *newparent,
-                                           struct mdd_object *child,
-                                           const char *name, int namelen,
-                                           struct thandle *handle,
-                                           bool declare)
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   struct mdd_object *spobj,
+                                   struct mdd_object *tpobj,
+                                   struct mdd_object *sobj,
+                                   struct mdd_object *tobj,
+                                   const struct lu_attr *spattr,
+                                   const struct lu_attr *tpattr,
+                                   const struct lu_attr *attr)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      ldata = { NULL };
-       struct lu_buf           *buf = &info->mti_link_buf;
-       int                     count;
-       int                     rc = 0;
+       int rc;
 
        ENTRY;
 
-       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
-       if (buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
-
-       ldata.ld_buf = buf;
-       rc = mdd_links_read(env, child, &ldata);
-       if (rc != 0) {
-               if (rc == -ENOENT || rc == -ENODATA)
-                       rc = 0;
-               RETURN(rc);
+       if (!mdd_object_remote(sobj)) {
+               mdd_read_lock(env, sobj, DT_SRC_CHILD);
+               if (sobj->mod_count > 0) {
+                       CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
+                              mdd_obj_dev_name(sobj),
+                              PFID(mdd_object_fid(sobj)),
+                              sobj->mod_count);
+                       mdd_read_unlock(env, sobj);
+                       RETURN(-EBUSY);
+               }
+               mdd_read_unlock(env, sobj);
        }
 
-       LASSERT(ldata.ld_leh != NULL);
-       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
-       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
-               struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
-               struct lu_name lname;
-               struct lu_fid  fid;
-
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
-                                   &lname, &fid);
-
-               if (strncmp(lname.ln_name, name, namelen) != 0 ||
-                   !lu_fid_eq(&fid, mdd_object_fid(parent))) {
-                       ldata.ld_lee = (struct link_ea_entry *)
-                                      ((char *)ldata.ld_lee +
-                                       ldata.ld_reclen);
-                       continue;
-               }
+       if (mdd_object_exists(tobj))
+               RETURN(-EEXIST);
 
-               CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
-                      lname.ln_namelen, lname.ln_name,
-                      PFID(mdd_object_fid(newparent)));
-               /* update to the new parent fid */
-               linkea_entry_pack(ldata.ld_lee, &lname,
-                                 mdd_object_fid(newparent));
-               if (declare)
-                       rc = mdd_declare_links_add(env, child, handle, &ldata);
-               else
-                       rc = mdd_links_write(env, child, &ldata, handle);
-               break;
-       }
+       rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
+                                    attr, NULL, NULL);
        RETURN(rc);
 }
 
-static int mdd_linkea_declare_update_child(const struct lu_env *env,
-                                          struct mdd_object *parent,
-                                          struct mdd_object *newparent,
-                                          struct mdd_object *child,
-                                          const char *name, int namelen,
-                                          struct thandle *handle)
-{
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, true);
-}
-
-static int mdd_linkea_update_child(const struct lu_env *env,
-                                  struct mdd_object *parent,
-                                  struct mdd_object *newparent,
-                                  struct mdd_object *child,
-                                  const char *name, int namelen,
-                                  struct thandle *handle)
+typedef int (*mdd_xattr_cb)(const struct lu_env *env,
+                           struct mdd_object *obj,
+                           const struct lu_buf *buf,
+                           const char *name,
+                           int fl, struct thandle *handle);
+
+/* iterate xattrs; skip LMA and LMV always, LINKEA if 'skip_linkea' is set */
+static int mdd_iterate_xattrs(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             bool skip_linkea,
+                             struct thandle *handle,
+                             mdd_xattr_cb cb)
 {
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, false);
-}
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *xname;
+       struct lu_buf list_xbuf;
+       struct lu_buf cbxbuf;
+       struct lu_buf xbuf = { NULL };
+       int list_xsize;
+       int xlen;
+       int rem;
+       int xsize;
+       int rc;
 
-static int mdd_update_linkea_internal(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     const struct lu_name *child_name,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle,
-                                     int declare)
-{
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       int                     count;
-       int                     rc = 0;
        ENTRY;
 
-       LASSERT(ldata->ld_buf != NULL);
-       LASSERT(ldata->ld_leh != NULL);
+       /* retrieve xattr list from the old object */
+       list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
+       if (list_xsize == -ENODATA)
+               RETURN(0);
 
-       /* If it is mulitple links file, we need update the name entry for
-        * all parent */
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-               struct mdd_object       *pobj;
-               struct lu_name          lname;
-               struct lu_fid           fid;
-
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               pobj = mdd_object_find(env, mdd, &fid);
-               if (IS_ERR(pobj)) {
-                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid),
-                             PTR_ERR(pobj));
-                       continue;
-               }
-
-               if (!mdd_object_exists(pobj)) {
-                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid));
-                       goto next_put;
-               }
-
-               if (pobj == mdd_pobj &&
-                   lname.ln_namelen == child_name->ln_namelen &&
-                   strncmp(lname.ln_name, child_name->ln_name,
-                           lname.ln_namelen) == 0) {
-                       CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
-                             mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
-                             PFID(&fid));
-                       goto next_put;
-               }
-
-               CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
-                      PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
-
-               if (declare) {
-                       /* Remove source name from source directory */
-                       /* Insert new fid with target name into target dir */
-                       rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
-
-                       rc = mdo_declare_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       lname.ln_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
-
-                       rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
-
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
-               } else {
-                       char *tmp_name = info->mti_key;
-
-                       if (lname.ln_namelen >= sizeof(info->mti_key)) {
-                               /* lnamelen is too big(> NAME_MAX + 16),
-                                * something wrong about this linkea, let's
-                                * skip it */
-                               CWARN("%s: the name %.*s is too long under "
-                                     DFID"\n", mdd2obd_dev(mdd)->obd_name,
-                                     lname.ln_namelen, lname.ln_name,
-                                     PFID(&fid));
-                               goto next_put;
-                       }
-
-                       /* Note: lname might be without \0 at the end, see
-                        * linkea_entry_unpack(), let's add extra \0 by
-                        * snprintf */
-                       snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
-                                lname.ln_namelen, lname.ln_name);
-                       lname.ln_name = tmp_name;
-
-                       /* Let's check if this linkEA still valid, before
-                        * it might be packed into the RPC buffer. */
-                       rc = mdd_lookup(env, &pobj->mod_obj, &lname,
-                                       &info->mti_fid, NULL);
-                       if (rc < 0 || !lu_fid_eq(&info->mti_fid,
-                                                mdd_object_fid(mdd_sobj)))
-                               GOTO(next_put, rc == -ENOENT ? 0 : rc);
-
-                       rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
-
-                       rc = __mdd_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       tmp_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
-
-                       mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
-                       rc = mdo_ref_add(env, mdd_tobj, handle);
-                       mdd_write_unlock(env, mdd_tobj);
-                       if (rc)
-                               GOTO(next_put, rc);
-
-                       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
-                       mdo_ref_del(env, mdd_sobj, handle);
-                       mdd_write_unlock(env, mdd_sobj);
-               }
-next_put:
-               mdd_object_put(env, pobj);
-               if (rc != 0)
-                       break;
-
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
-       }
-
-       RETURN(rc);
-}
-
-static int mdd_migrate_xattrs(const struct lu_env *env,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj)
-{
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       char                    *xname;
-       struct thandle          *handle;
-       struct lu_buf           xbuf;
-       int                     xlen;
-       int                     rem;
-       int                     xsize;
-       int                     list_xsize;
-       struct lu_buf           list_xbuf;
-       int                     rc;
-
-       /* retrieve xattr list from the old object */
-       list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
-       if (list_xsize == -ENODATA)
-               return 0;
-
-       if (list_xsize < 0)
-               return list_xsize;
+       if (list_xsize < 0)
+               RETURN(list_xsize);
 
        lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
        if (info->mti_big_buf.lb_buf == NULL)
-               return -ENOMEM;
+               RETURN(-ENOMEM);
 
        list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
        list_xbuf.lb_len = list_xsize;
-       rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
+       rc = mdo_xattr_list(env, sobj, &list_xbuf);
        if (rc < 0)
-               return rc;
+               RETURN(rc);
+
+       rem = rc;
        rc = 0;
-       rem = list_xsize;
        xname = list_xbuf.lb_buf;
        while (rem > 0) {
                xlen = strnlen(xname, rem - 1) + 1;
@@ -3549,981 +3382,1547 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
                    strcmp(XATTR_NAME_LMV, xname) == 0)
                        goto next;
 
-               /* For directory, if there are default layout, migrate here */
-               if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
-                   !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+               if (skip_linkea &&
+                   strcmp(XATTR_NAME_LINK, xname) == 0)
                        goto next;
 
-               xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
+               xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
                if (xsize == -ENODATA)
                        goto next;
                if (xsize < 0)
-                       GOTO(out, rc);
+                       GOTO(out, rc = xsize);
 
-               lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
-               if (info->mti_link_buf.lb_buf == NULL)
+               lu_buf_check_and_alloc(&xbuf, xsize);
+               if (xbuf.lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
 
-               xbuf.lb_len = xsize;
-               xbuf.lb_buf = info->mti_link_buf.lb_buf;
-               rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
+               rc = mdo_xattr_get(env, sobj, &xbuf, xname);
                if (rc == -ENODATA)
                        goto next;
                if (rc < 0)
                        GOTO(out, rc);
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
-                                          handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * update last rcvd for this transaction */
-               handle->th_local = 1;
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-
-again:
-               rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
-               if (rc == -EEXIST)
-                       GOTO(stop_trans, rc = 0);
-
+               cbxbuf = xbuf;
+               cbxbuf.lb_len = xsize;
+repeat:
+               rc = cb(env, tobj, &cbxbuf, xname, 0, handle);
                if (unlikely(rc == -ENOSPC &&
                             strcmp(xname, XATTR_NAME_LINK) == 0)) {
                        rc = linkea_overflow_shrink(
-                                       (struct linkea_data *)(xbuf.lb_buf));
+                                       (struct linkea_data *)(cbxbuf.lb_buf));
                        if (likely(rc > 0)) {
-                               xbuf.lb_len = rc;
-                               goto again;
+                               cbxbuf.lb_len = rc;
+                               goto repeat;
                        }
                }
 
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-stop_trans:
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
+               if (rc)
                        GOTO(out, rc);
 next:
+               xname += xlen;
                rem -= xlen;
-               memmove(xname, xname + xlen, rem);
        }
+
 out:
-       return rc;
+       lu_buf_free(&xbuf);
+       RETURN(rc);
 }
 
-static int mdd_declare_migrate_create(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     struct md_op_spec *spec,
-                                     struct lu_attr *la,
-                                     union lmv_mds_md *mgr_ea,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle)
+typedef int (*mdd_linkea_cb)(const struct lu_env *env,
+                            struct mdd_object *sobj,
+                            struct mdd_object *tobj,
+                            const struct lu_name *sname,
+                            const struct lu_fid *sfid,
+                            const struct lu_name *lname,
+                            const struct lu_fid *fid,
+                            void *opaque,
+                            struct thandle *handle);
+
+static int mdd_declare_update_link(const struct lu_env *env,
+                                  struct mdd_object *sobj,
+                                  struct mdd_object *tobj,
+                                  const struct lu_name *tname,
+                                  const struct lu_fid *tpfid,
+                                  const struct lu_name *lname,
+                                  const struct lu_fid *fid,
+                                  void *unused,
+                                  struct thandle *handle)
 {
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       const struct lu_buf     *buf;
-       int                     rc;
-       int                     mgr_easize;
-
-       rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
-                                               handle, spec, NULL);
-       if (rc != 0)
-               return rc;
-
-       rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
-                                          handle);
-       if (rc != 0)
-               return rc;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
 
-       if (S_ISLNK(la->la_mode)) {
-               const char *target_name = spec->u.sp_symname;
-               int sym_len = strlen(target_name);
-               const struct lu_buf *buf;
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strcmp(tname->ln_name, lname->ln_name))
+               return 0;
 
-               buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
-                                            buf, 0, handle);
-               if (rc != 0)
-                       return rc;
-       } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
-               if (rc != 0)
-                       return rc;
-       }
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj))
+               return PTR_ERR(pobj);
 
-       if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
-               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
-                                       spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
-                                          0, handle);
-               if (rc)
-                       return rc;
-       }
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
-       rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
-                                  0, handle);
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+                                             mdd_object_type(sobj),
+                                             lname->ln_name, handle);
+       mdd_object_put(env, pobj);
        if (rc)
                return rc;
 
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+       rc = mdo_declare_ref_add(env, tobj, handle);
+       if (rc)
+               return rc;
 
+       rc = mdo_declare_ref_del(env, sobj, handle);
        return rc;
 }
 
-static int mdd_migrate_create(const struct lu_env *env,
-                             struct mdd_object *mdd_pobj,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj,
-                             const struct lu_name *lname,
-                             struct lu_attr *la)
+static int mdd_update_link(const struct lu_env *env,
+                          struct mdd_object *sobj,
+                          struct mdd_object *tobj,
+                          const struct lu_name *tname,
+                          const struct lu_fid *tpfid,
+                          const struct lu_name *lname,
+                          const struct lu_fid *fid,
+                          void *unused,
+                          struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct md_op_spec       *spec = &info->mti_spec;
-       struct lu_buf           lmm_buf = { NULL };
-       struct lu_buf           link_buf = { NULL };
-       struct lu_buf            mgr_buf;
-       struct thandle          *handle;
-       struct lmv_mds_md_v1    *mgr_ea;
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                     mgr_easize;
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       int                     rc;
-       ENTRY;
-
-       /* prepare spec for create */
-       memset(spec, 0, sizeof(*spec));
-       spec->sp_cr_lookup = 0;
-       spec->sp_feat = &dt_directory_features;
-       if (S_ISLNK(la->la_mode)) {
-               const struct lu_buf *buf;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
 
-               buf = lu_buf_check_and_alloc(
-                               &mdd_env_info(env)->mti_big_buf,
-                               la->la_size + 1);
-               link_buf = *buf;
-               link_buf.lb_len = la->la_size + 1;
-               memset(link_buf.lb_buf, 0, link_buf.lb_len);
-               rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
-               if (rc <= 0) {
-                       rc = rc != 0 ? rc : -EFAULT;
-                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(mdd_sobj)), rc);
-                       RETURN(rc);
-               }
-               spec->u.sp_symname = link_buf.lb_buf;
-       } else if (S_ISREG(la->la_mode)) {
-               /* retrieve lov of the old object */
-               rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
-               if (rc != 0 && rc != -ENODATA)
-                       RETURN(rc);
-               if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
-                       spec->u.sp_ea.eadata = lmm_buf.lb_buf;
-                       spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
-                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
-               }
-       } else if (S_ISDIR(la->la_mode)) {
-               rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
-               if (rc == -ENODATA) {
-                       /* ignore the non-linkEA error */
-                       ldata = NULL;
-                       rc = 0;
-               }
-               if (rc < 0)
-                       RETURN(rc);
-       }
+       ENTRY;
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
-       mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
-       mgr_buf.lb_len = mgr_easize;
-       mgr_ea = mgr_buf.lb_buf;
-       memset(mgr_ea, 0, sizeof(*mgr_ea));
-       mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
-       mgr_ea->lmv_stripe_count = cpu_to_le32(2);
-       mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
-       mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
-
-       mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
+       LASSERT(lu_name_is_valid(lname));
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               GOTO(out_free, rc = PTR_ERR(handle));
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+               RETURN(0);
 
-       /* Note: this transaction is part of migration, and it is not
-        * the last step of migration, so we set th_local = 1 to avoid
-        * update last rcvd for this transaction */
-       handle->th_local = 1;
-       rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
-                                       la, mgr_buf.lb_buf, ldata, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
+              PFID(fid), PNAME(lname), PFID(mdd_object_fid(tobj)));
 
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj)) {
+               CWARN("%s: cannot find obj "DFID": %ld\n",
+                     mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
+               RETURN(PTR_ERR(pobj));
+       }
 
-       /* don't set nlink from the original object */
-       la->la_valid &= ~LA_NLINK;
+       if (!mdd_object_exists(pobj)) {
+               CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
+               mdd_object_put(env, pobj);
+               RETURN(-ENOENT);
+       }
 
-       /* create the target object */
-       rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
-                              hint, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, pobj, DT_TGT_PARENT);
+       rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(tobj),
+                                            mdd_object_type(sobj),
+                                            lname->ln_name, handle);
+       mdd_write_unlock(env, pobj);
+       mdd_object_put(env, pobj);
+       if (rc)
+               RETURN(rc);
 
-       if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_links_write(env, mdd_tobj, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-       }
+       mdd_write_lock(env, tobj, DT_TGT_CHILD);
+       rc = mdo_ref_add(env, tobj, handle);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
 
-       /* Set MIGRATE EA on the source inode, so once the migration needs
-        * to be re-done during failover, the re-do process can locate the
-        * target object which is already being created. */
-       rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, sobj, DT_SRC_CHILD);
+       rc = mdo_ref_del(env, sobj, handle);
+       mdd_write_unlock(env, sobj);
 
-       /* Set immutable flag, so any modification is disabled until
-        * the migration is done. Once the migration is interrupted,
-        * if the resume process find the migrating object has both
-        * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
-        * flag and approve the migration */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-stop_trans:
-       if (handle != NULL)
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-out_free:
-       if (lmm_buf.lb_buf != NULL)
-               OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
        RETURN(rc);
 }
 
-static int mdd_migrate_entries(const struct lu_env *env,
-                              struct mdd_object *mdd_sobj,
-                              struct mdd_object *mdd_tobj)
+static inline int mdd_fld_lookup(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                const struct lu_fid *fid,
+                                __u32 *mdt_index)
 {
-       struct dt_object        *next = mdd_object_child(mdd_sobj);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
-       struct thandle          *handle;
-       struct dt_it            *it;
-       const struct dt_it_ops  *iops;
-       int                      result;
-       struct lu_dirent        *ent;
-       int                      rc;
-       ENTRY;
-
-       OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
-       if (ent == NULL)
-               RETURN(-ENOMEM);
-
-       if (!dt_try_as_dir(env, next))
-               GOTO(out_ent, rc = -ENOTDIR);
-       /*
-        * iterate directories
-        */
-       iops = &next->do_index_ops->dio_it;
-       it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
-       if (IS_ERR(it))
-               GOTO(out_ent, rc = PTR_ERR(it));
-
-       rc = iops->load(env, it, 0);
-       if (rc == 0)
-               rc = iops->next(env, it);
-       else if (rc > 0)
-               rc = 0;
-       /*
-        * At this point and across for-loop:
-        *
-        *  rc == 0 -> ok, proceed.
-        *  rc >  0 -> end of directory.
-        *  rc <  0 -> error.
-        */
-       do {
-               struct mdd_object       *child;
-               char                    *name = mdd_env_info(env)->mti_key;
-               int                     len;
-               int                     is_dir;
-               bool                    target_exist = false;
-
-               len = iops->key_size(env, it);
-               if (len == 0)
-                       goto next;
+       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
+       struct seq_server_site *ss;
+       int rc;
 
-               result = iops->rec(env, it, (struct dt_rec *)ent,
-                                  LUDA_FID | LUDA_TYPE);
-               if (result == -ESTALE)
-                       goto next;
-               if (result != 0) {
-                       rc = result;
-                       goto out;
-               }
+       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
 
-               fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+       range->lsr_flags = LU_SEQ_RANGE_MDT;
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
+       if (rc)
+               return rc;
 
-               /* Insert new fid with target name into target dir */
-               if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
-                   (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
-                    ent->lde_name[1] == '.'))
-                       goto next;
+       *mdt_index = range->lsr_index;
 
-               child = mdd_object_find(env, mdd, &ent->lde_fid);
-               if (IS_ERR(child))
-                       GOTO(out, rc = PTR_ERR(child));
+       return 0;
+}
 
-               /* child may not exist, but lu_object_attr will assert this,
-                * get type from loh_attr directly */
-               is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
+/**
+ * linkEA iteration callback: report whether one link of \a sobj has its
+ * parent on the source MDT.
+ *
+ * The link that is about to become the target entry (parent \a tpfid, name
+ * \a tname) is skipped. For any other link the MDT index of the parent
+ * \a fid is looked up through FLD and compared with the index passed in
+ * \a opaque.
+ *
+ * \param[in] env      execution environment
+ * \param[in] sobj     source object, used to find the mdd device
+ * \param[in] tobj     unused
+ * \param[in] tname    name of the target entry
+ * \param[in] tpfid    FID of the target parent
+ * \param[in] lname    name of the link being examined
+ * \param[in] fid      parent FID of the link being examined
+ * \param[in] opaque   points to the __u32 index of the source MDT
+ * \param[in] handle   unused
+ *
+ * \retval     0 link is the target entry, or its parent is on another MDT
+ * \retval     1 link parent is on the source MDT
+ * \retval     -errno if the FLD lookup fails
+ */
+static int mdd_is_link_on_source_mdt(const struct lu_env *env,
+                                    struct mdd_object *sobj,
+                                    struct mdd_object *tobj,
+                                    const struct lu_name *tname,
+                                    const struct lu_fid *tpfid,
+                                    const struct lu_name *lname,
+                                    const struct lu_fid *fid,
+                                    void *opaque,
+                                    struct thandle *handle)
+{
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       __u32 source_mdt_index = *(__u32 *)opaque;
+       __u32 link_mdt_index;
+       int rc;
 
-               mdd_write_lock(env, child, MOR_SRC_CHILD);
+       ENTRY;
 
-               snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
+       /*
+        * ignore tobj; compare by length as mdd_update_link() does, so the
+        * check doesn't rely on tname->ln_name being NUL-terminated
+        */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+               RETURN(0);
 
-               /* Check whether the name has been inserted to the target */
-               if (dt_try_as_dir(env, dt_tobj)) {
-                       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+       rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
+       if (rc)
+               RETURN(rc);
 
-                       rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
-                                      (struct dt_key *)name);
-                       if (unlikely(rc == 0))
-                               target_exist = true;
-               }
+       /* non-zero stops the linkEA iteration in mdd_iterate_linkea() */
+       RETURN(link_mdt_index == source_mdt_index);
+}
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out_put, rc = PTR_ERR(handle));
-
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * updating last rcvd for this transaction */
-               handle->th_local = 1;
-               if (likely(!target_exist)) {
-                       rc = mdo_declare_index_insert(env, mdd_tobj,
-                               &ent->lde_fid,
-                               child->mod_obj.mo_lu.lo_header->loh_attr,
-                               name, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+static int mdd_iterate_linkea(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *tname,
+                             const struct lu_fid *tpfid,
+                             struct linkea_data *ldata,
+                             void *opaque,
+                             struct thandle *handle,
+                             mdd_linkea_cb cb)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *filename = info->mti_name;
+       struct lu_name lname;
+       struct lu_fid fid;
+       int rc = 0;
 
-                       if (is_dir) {
-                               rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                               if (rc != 0)
-                                       GOTO(out_put, rc);
-                       }
-               }
+       if (!ldata->ld_buf)
+               return 0;
 
-               rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
+            linkea_next_entry(ldata)) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+                                   &fid);
 
-               if (is_dir) {
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               /* Note: lname might miss \0 at the end */
+               snprintf(filename, sizeof(info->mti_name), "%.*s",
+                        lname.ln_namelen, lname.ln_name);
+               lname.ln_name = filename;
 
-                       /* Update .. for child */
-                       rc = mdo_declare_index_delete(env, child, dotdot,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
 
-                       rc = mdo_declare_index_insert(env, child,
-                                                     mdd_object_fid(mdd_tobj),
-                                                     S_IFDIR, dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+               rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+                       handle);
+       }
 
-               rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
-                                                    child, name,
-                                                    strlen(name),
-                                                    handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       return rc;
+}
 
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0) {
-                       CERROR("%s: transaction start failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out_put, rc);
-               }
+/**
+ * Prepare linkea, and check whether the file needs to be migrated: if the
+ * source still has a link on the source MDT, there is no need to migrate;
+ * just update the namespace on the source and target parents.
+ *
+ * \retval     0 do migrate
+ * \retval     1 don't migrate
+ * \retval     -errno on failure
+ */
+static int mdd_migrate_linkea_prepare(const struct lu_env *env,
+                                     struct mdd_device *mdd,
+                                     struct mdd_object *spobj,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
+                                     const struct lu_attr *attr,
+                                     struct linkea_data *ldata)
+{
+       __u32 source_mdt_index;
+       int rc;
 
-               if (likely(!target_exist)) {
-                       rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
-                               child->mod_obj.mo_lu.lo_header->loh_attr, name,
-                               handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+       ENTRY;
 
-               rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
+                               mdd_object_fid(tpobj), tname, 1, 0, ldata);
+       if (rc)
+               RETURN(rc);
 
-               if (is_dir) {
-                       rc = __mdd_index_delete_only(env, child, dotdot,
-                                                    handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       /*
+        * Then check whether the file should be migrated. If the file has
+        * multiple links, we only need to migrate the file if all of its
+        * entries have been migrated to the remote MDT.
+        */
+       if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
+               RETURN(0);
 
-                       rc = __mdd_index_insert_only(env, child,
-                                        mdd_object_fid(mdd_tobj), S_IFDIR,
-                                        dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+       /* If there are still links locally, don't migrate this file */
+       LASSERT(ldata->ld_leh != NULL);
 
-               rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
-                                            child, name,
-                                            strlen(name), handle);
+       /*
+        * If the linkEA has overflowed, there are some unknown name entries
+        * under unknown parents, which will prevent the migration.
+        */
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(-EOVERFLOW);
 
-out_put:
-               mdd_write_unlock(env, child);
-               mdd_object_put(env, child);
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
-                       GOTO(out, rc);
-next:
-               result = iops->next(env, it);
-               if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
-                       GOTO(out, rc = -EINTR);
+       rc = mdd_fld_lookup(env, mdd, mdd_object_fid(sobj), &source_mdt_index);
+       if (rc)
+               RETURN(rc);
 
-               if (result == -ESTALE)
-                       goto next;
-       } while (result == 0);
-out:
-       iops->put(env, it);
-       iops->fini(env, it);
-out_ent:
-       OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+       rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
+                               ldata, &source_mdt_index, NULL,
+                               mdd_is_link_on_source_mdt);
        RETURN(rc);
 }
 
-static int mdd_declare_update_linkea(const struct lu_env *env,
-                                    struct mdd_object *mdd_pobj,
-                                    struct mdd_object *mdd_sobj,
-                                    struct mdd_object *mdd_tobj,
-                                    const struct lu_name *child_name,
-                                    struct linkea_data *ldata,
-                                    struct thandle *handle)
-{
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 1);
-}
-
-static int mdd_update_linkea(const struct lu_env *env,
-                            struct mdd_object *mdd_pobj,
-                            struct mdd_object *mdd_sobj,
-                            struct mdd_object *mdd_tobj,
-                            const struct lu_name *child_name,
-                            struct linkea_data *ldata,
-                            struct thandle *handle)
-{
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 0);
-}
-
-static int mdd_declare_migrate_update_name(const struct lu_env *env,
-                                          struct mdd_object *mdd_pobj,
-                                          struct mdd_object *mdd_sobj,
-                                          struct mdd_object *mdd_tobj,
-                                          const struct lu_name *lname,
-                                          struct lu_attr *la,
-                                          struct lu_attr *parent_la,
-                                          struct linkea_data *ldata,
-                                          struct thandle *handle)
+static int mdd_declare_migrate_update(const struct lu_env *env,
+                                     struct mdd_object *spobj,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *obj,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
+                                     struct lu_attr *attr,
+                                     struct lu_attr *spattr,
+                                     struct lu_attr *tpattr,
+                                     struct linkea_data *ldata,
+                                     struct md_attr *ma,
+                                     struct thandle *handle)
 {
-       struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *la = &info->mti_la_for_fix;
        int rc;
 
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               return rc;
-
-       /* delete entry from source dir */
-       rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
-       if (rc != 0)
+       rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
+       if (rc)
                return rc;
 
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
-                                              mdd_tobj, lname, ldata, handle);
-               if (rc != 0)
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_del(env, spobj, handle);
+               if (rc)
                        return rc;
        }
 
-       if (S_ISREG(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
-                                          handle);
-               if (rc != 0)
+       rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
+                                     attr->la_mode & S_IFMT,
+                                     tname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_links_add(env, obj, handle, ldata);
+       if (rc)
+               return rc;
+
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_add(env, tpobj, handle);
+               if (rc)
                        return rc;
+       }
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, spobj, la, handle);
+       if (rc)
+               return rc;
+
+       if (tpobj != spobj) {
+               rc = mdo_declare_attr_set(env, tpobj, la, handle);
+               if (rc)
+                       return rc;
+       }
+
+       return rc;
+}
+
+static int mdd_declare_migrate_create(const struct lu_env *env,
+                                     struct mdd_object *spobj,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     struct mdd_object *tobj,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
+                                     struct lu_attr *spattr,
+                                     struct lu_attr *tpattr,
+                                     struct lu_attr *attr,
+                                     struct lu_buf *sbuf,
+                                     struct linkea_data *ldata,
+                                     struct md_attr *ma,
+                                     struct md_op_spec *spec,
+                                     struct dt_allocation_hint *hint,
+                                     struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct md_layout_change *mlc = &info->mti_mlc;
+       struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
+       int rc;
+
+       ENTRY;
+
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+               mlc->mlc_opc = MD_LAYOUT_DETACH;
+               rc = mdo_declare_layout_change(env, sobj, mlc, handle);
+               if (rc)
+                       return rc;
+
+               lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
+       } else if (S_ISLNK(attr->la_mode)) {
+               spec->u.sp_symname = sbuf->lb_buf;
+       } else if (S_ISREG(attr->la_mode)) {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+               spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
+       }
+
+       mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
+
+       rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+                               tname, attr, handle, spec, ldata, NULL, NULL,
+                               NULL, hint);
+       if (rc)
+               return rc;
+
+       /*
+        * tobj mode will be used in mdo_declare_layout_change(), but it's not
+        * created yet, so copy it from sobj.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+               sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
+       if (S_ISDIR(attr->la_mode)) {
+               if (!lmv) {
+                       /* if sobj is not striped, fake a 1-stripe LMV */
+                       LASSERT(sizeof(info->mti_key) >
+                               lmv_mds_md_size(1, LMV_MAGIC_V1));
+                       lmv = (typeof(lmv))info->mti_key;
+                       memset(lmv, 0, sizeof(*lmv));
+                       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+                       lmv->lmv_stripe_count = cpu_to_le32(1);
+                       lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
+                       fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
+                                     mdd_object_fid(sobj));
+                       mlc->mlc_buf.lb_buf = lmv;
+                       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+               } else {
+                       mlc->mlc_buf = *sbuf;
+               }
+               mlc->mlc_opc = MD_LAYOUT_ATTACH;
+               rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+               if (rc)
+                       return rc;
+       }
+
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       if (S_ISREG(attr->la_mode)) {
+               struct lu_buf fid_buf;
 
                handle->th_complex = 1;
-               rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-               if (rc < 0)
+
+               /* target may be remote, update PFID via sobj. */
+               fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
+               fid_buf.lb_len = sizeof(struct lu_fid);
+               rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
+                                          0, handle);
+               if (rc)
+                       return rc;
+
+               rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               if (rc)
                        return rc;
        }
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_del(env, mdd_pobj, handle);
-               if (rc != 0)
+       if (!S_ISDIR(attr->la_mode)) {
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
+                                       mdd_object_fid(tpobj), ldata, NULL,
+                                       handle, mdd_declare_update_link);
+               if (rc)
+                       return rc;
+       }
+
+       if (!S_ISDIR(attr->la_mode) || lmv) {
+               rc = mdo_declare_ref_del(env, sobj, handle);
+               if (rc)
                        return rc;
+
+               if (S_ISDIR(attr->la_mode)) {
+                       rc = mdo_declare_ref_del(env, sobj, handle);
+                       if (rc)
+                               return rc;
+               }
+
+               rc = mdo_declare_destroy(env, sobj, handle);
+               if (rc)
+                       return rc;
+       }
+
+       rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
+       return rc;
+}
+
+/**
+ * Migrate the dirent of \a obj from \a spobj to \a tpobj.
+ *
+ * Removes \a sname from \a spobj, inserts \a tname into \a tpobj pointing at
+ * the FID of \a obj, writes \a ldata as the linkEA of \a obj, and sets the
+ * ctime/mtime of the parent(s) to the ctime carried in \a ma. All updates
+ * are made under \a handle; the first failing step ends the function.
+ *
+ * \param[in] env      execution environment
+ * \param[in] spobj    source parent directory
+ * \param[in] tpobj    target parent directory (may be the same as \a spobj)
+ * \param[in] obj      object whose FID the new entry refers to
+ * \param[in] sname    name to delete from \a spobj
+ * \param[in] tname    name to insert into \a tpobj
+ * \param[in] attr     attributes of the object; only la_mode is read here
+ * \param[in] spattr   attributes of \a spobj, passed to mdd_update_time()
+ * \param[in] tpattr   attributes of \a tpobj, passed to mdd_update_time()
+ * \param[in] ldata    linkEA data written to \a obj
+ * \param[in] ma       supplies ma_attr.la_ctime for the parent timestamps
+ * \param[in] handle   transaction handle
+ *
+ * \retval     0 on success
+ * \retval     non-zero value returned by the first failing step
+ **/
+static int mdd_migrate_update(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *obj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct lu_attr *attr,
+                             struct lu_attr *spattr,
+                             struct lu_attr *tpattr,
+                             struct linkea_data *ldata,
+                             struct md_attr *ma,
+                             struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       int rc;
+
+       ENTRY;
+
+       CDEBUG(D_INFO, "update "DFID" from "DFID"/%s to "DFID"/%s\n",
+              PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
+              sname->ln_name, PFID(mdd_object_fid(tpobj)), tname->ln_name);
+
+       /* drop the old name; the flag says whether the entry is a directory */
+       rc = __mdd_index_delete(env, spobj, sname->ln_name,
+                               S_ISDIR(attr->la_mode), handle);
+       if (rc)
+               RETURN(rc);
+
+       /* add the new name, typed with the file-type bits of the object */
+       rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
+                               attr->la_mode & S_IFMT,
+                               tname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_links_write(env, obj, ldata, handle);
+       if (rc)
+               RETURN(rc);
+
+       /* both ctime and mtime of the parents take the ctime from ma */
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       mdd_write_lock(env, spobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, spobj, spattr, la, handle);
+       mdd_write_unlock(env, spobj);
+       if (rc)
+               RETURN(rc);
+
+       if (tpobj != spobj) {
+               /*
+                * la_valid is set again before the second update.
+                * NOTE(review): presumably mdd_update_time() may modify
+                * la_valid - confirm.
+                */
+               la->la_valid = LA_CTIME | LA_MTIME;
+               mdd_write_lock(env, tpobj, DT_TGT_PARENT);
+               rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+               mdd_write_unlock(env, tpobj);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Migrate file/dir to target MDT.
+ *
+ * Create target according to \a spec, and then migrate xattrs, if it's
+ * directory, migrate source stripes to target.
+ *
+ * \param[in] env      execution environment
+ * \param[in] spobj    source parent object
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] sname    source file name
+ * \param[in] tname    target file name
+ * \param[in] spattr   source parent attributes
+ * \param[in] tpattr   target parent attributes
+ * \param[in] attr     source attributes, LA_NLINK is cleared from la_valid
+ *                     before it is used to create \a tobj
+ * \param[in] sbuf     source LMV buf
+ * \param[in] ldata    linkea data, passed to mdd_iterate_linkea() and
+ *                     mdd_migrate_update()
+ * \param[in] ma       passed to mdd_migrate_update(), which takes the new
+ *                     parent ctime/mtime from it
+ * \param[in] spec     migrate create spec
+ * \param[in] hint     target creation hint
+ * \param[in] handle   transaction handle
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct lu_attr *spattr,
+                             struct lu_attr *tpattr,
+                             struct lu_attr *attr,
+                             const struct lu_buf *sbuf,
+                             struct linkea_data *ldata,
+                             struct md_attr *ma,
+                             struct md_op_spec *spec,
+                             struct dt_allocation_hint *hint,
+                             struct thandle *handle)
+{
+       int rc;
+
+       ENTRY;
+
+       /*
+        * migrate sobj stripes to tobj if it's directory:
+        * 1. detach stripes from sobj.
+        * 2. attach stripes to tobj, see mdd_declare_migrate_mdt().
+        * 3. create stripes for tobj, see lod_xattr_set_lmv().
+        */
+       if (S_ISDIR(attr->la_mode)) {
+               struct mdd_thread_info *info = mdd_env_info(env);
+               struct md_layout_change *mlc = &info->mti_mlc;
+
+               mlc->mlc_opc = MD_LAYOUT_DETACH;
+
+               mdd_write_lock(env, sobj, DT_SRC_PARENT);
+               rc = mdo_layout_change(env, sobj, mlc, handle);
+               mdd_write_unlock(env, sobj);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       /* don't set nlink from sobj */
+       attr->la_valid &= ~LA_NLINK;
+
+       rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
+                              hint, handle, false);
+       if (rc)
+               RETURN(rc);
+
+       mdd_write_lock(env, tobj, DT_TGT_CHILD);
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
+
+       /* for regular file, update OST objects XATTR_NAME_FID */
+       if (S_ISREG(attr->la_mode)) {
+               struct lu_buf fid_buf;
+
+               /* target may be remote, update PFID via sobj. */
+               fid_buf.lb_buf = (void *)mdd_object_fid(tobj);
+               fid_buf.lb_len = sizeof(struct lu_fid);
+               rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
+                                  handle);
+               if (rc)
+                       RETURN(rc);
+
+               /* delete LOV to avoid deleting OST objs when destroying sobj */
+               mdd_write_lock(env, sobj, DT_SRC_CHILD);
+               rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               mdd_write_unlock(env, sobj);
+               /* O_DELAY_CREATE file may not have LOV, ignore -ENODATA */
+               if (rc && rc != -ENODATA)
+                       RETURN(rc);
+               rc = 0;
+       }
+
+       /* update links FID */
+       if (!S_ISDIR(attr->la_mode)) {
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
+                                       mdd_object_fid(tpobj), ldata,
+                                       NULL, handle, mdd_update_link);
+               if (rc)
+                       RETURN(rc);
        }
 
-       /* new name */
-       rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
-                                     mdd_object_type(mdd_tobj),
-                                     lname->ln_name, handle);
-       if (rc != 0)
-               return rc;
+       /* don't destroy sobj if it's plain directory, i.e. a directory whose
+        * \a sbuf holds no LMV; a directory that is destroyed drops two
+        * references first, any other file type drops one
+        */
+       if (!S_ISDIR(attr->la_mode) || sbuf->lb_buf) {
+               mdd_write_lock(env, sobj, DT_SRC_CHILD);
+               rc = mdo_ref_del(env, sobj, handle);
+               if (!rc) {
+                       if (S_ISDIR(attr->la_mode))
+                               rc = mdo_ref_del(env, sobj, handle);
+                       if (!rc)
+                               rc = mdo_destroy(env, sobj, handle);
+               }
+               mdd_write_unlock(env, sobj);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
+                               spattr, tpattr, ldata, ma, handle);
+
+       RETURN(rc);
+}
+
+/* NB: if user issued different migrate command, we can't adjust it silently
+ * here, because this command will decide target MDT in subdir migration in
+ * LMV.
+ *
+ * Compare the migrate command in \a lum against the migration already
+ * recorded in \a lmv of directory \a lname.
+ *
+ * \retval     -EBADF if \a lmv is not sane
+ * \retval     -EPERM if stripe count, MDT index or hash type differs from
+ *             the recorded migration, the command to resume is logged
+ * \retval     -EALREADY if the command matches, this function never
+ *             returns 0
+ */
+static int mdd_migrate_cmd_check(struct mdd_device *mdd,
+                                const struct lmv_mds_md_v1 *lmv,
+                                const struct lmv_user_md_v1 *lum,
+                                const struct lu_name *lname)
+{
+       const char *mdt_hash_name[] = { "none",
+                                       LMV_HASH_NAME_ALL_CHARS,
+                                       LMV_HASH_NAME_FNV_1A_64,
+                                       LMV_HASH_NAME_CRUSH,
+       };
+       const char *hash_name = "unknown";
+       __u32 lum_stripe_count = lum->lum_stripe_count;
+       __u32 lmv_hash_type;
+       __u32 hash_idx;
+
+       /* validate \a lmv before reading any of its fields */
+       if (!lmv_is_sane(lmv))
+               return -EBADF;
+
+       /* if stripe_count unspecified, set to 1 */
+       if (!lum_stripe_count)
+               lum_stripe_count = cpu_to_le32(1);
+
+       lmv_hash_type = lmv->lmv_hash_type &
+                       ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
+
+       /* TODO: check specific MDTs */
+       if (lum_stripe_count != lmv->lmv_migrate_offset ||
+           lum->lum_stripe_offset != lmv->lmv_master_mdt_index ||
+           (lum->lum_hash_type && lum->lum_hash_type != lmv_hash_type)) {
+               /* only the MIGRATION flag was stripped above, so the value
+                * may still exceed the name table: never index it blindly
+                */
+               hash_idx = le32_to_cpu(lmv_hash_type);
+               if (hash_idx < sizeof(mdt_hash_name) / sizeof(mdt_hash_name[0]))
+                       hash_name = mdt_hash_name[hash_idx];
+
+               CERROR("%s: '"DNAME"' migration was interrupted, run 'lfs migrate -m %d -c %d -H %s "DNAME"' to finish migration.\n",
+                       mdd2obd_dev(mdd)->obd_name, PNAME(lname),
+                       le32_to_cpu(lmv->lmv_master_mdt_index),
+                       le32_to_cpu(lmv->lmv_migrate_offset),
+                       hash_name,
+                       PNAME(lname));
+               return -EPERM;
+       }
+
+       return -EALREADY;
+}
+
+/**
+ * Internal function to migrate directory or file between MDTs.
+ *
+ * migrate source to target in following steps:
+ *   1. create target, append source stripes after target's if it's directory,
+ *      migrate xattrs and update fid of source links.
+ *   2. update namespace: migrate dirent from source parent to target parent,
+ *      update file linkea, and destroy source if it's not needed any more.
+ *
+ * If \a spec->sp_migrate_nsonly is set, by the caller or by this function,
+ * only the namespace is updated via mdd_migrate_update() and \a tobj is not
+ * created. A zero lum_stripe_count in the LMV user md of \a spec is changed
+ * to 1 when a directory is migrated.
+ *
+ * \param[in] env      execution environment
+ * \param[in] spobj    source parent object
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] sname    source file name
+ * \param[in] tname    target file name
+ * \param[in] spec     target creation spec
+ * \param[in] ma       used to update \a spobj and \a tpobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -EALREADY if a non-directory already has \a tpobj as
+ *                     its parent, or as returned by mdd_migrate_cmd_check()
+ *                     for a directory that is already migrating
+ * \retval             -errno on failure
+ */
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
+       struct lu_attr *spattr = &info->mti_pattr;
+       struct lu_attr *tpattr = &info->mti_tpattr;
+       struct lu_attr *attr = &info->mti_cattr;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       struct lu_buf sbuf = { NULL };
+       struct lmv_mds_md_v1 *lmv;
+       struct thandle *handle;
+       int rc;
+
+       ENTRY;
+
+       rc = mdd_la_get(env, sobj, attr);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_la_get(env, spobj, spattr);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_la_get(env, tpobj, tpattr);
+       if (rc)
+               RETURN(rc);
+
+       if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
+               struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
+
+               LASSERT(lum);
+
+               /* if user use default value '0' for stripe_count, we need to
+                * adjust it to '1' to create a 1-stripe directory.
+                */
+               if (lum->lum_stripe_count == 0)
+                       lum->lum_stripe_count = cpu_to_le32(1);
+
+               rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+               if (rc && rc != -ENODATA)
+                       GOTO(out, rc);
+
+               lmv = sbuf.lb_buf;
+               if (lmv) {
+                       if (!lmv_is_sane(lmv))
+                               GOTO(out, rc = -EBADF);
+                       if (lmv_is_migrating(lmv)) {
+                               rc = mdd_migrate_cmd_check(mdd, lmv, lum,
+                                                          sname);
+                               GOTO(out, rc);
+                       }
+               }
+       } else if (!S_ISDIR(attr->la_mode)) {
+               if (spobj == tpobj)
+                       GOTO(out, rc = -EALREADY);
+
+               /* update namespace only if @sobj is on MDT where @tpobj is. */
+               if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
+                       spec->sp_migrate_nsonly = true;
+
+               if (S_ISLNK(attr->la_mode)) {
+                       lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+                       if (!sbuf.lb_buf)
+                               GOTO(out, rc = -ENOMEM);
+
+                       rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+                       if (rc <= 0) {
+                               rc = rc ?: -EFAULT;
+                               CERROR("%s: "DFID" readlink failed: rc = %d\n",
+                                      mdd2obd_dev(mdd)->obd_name,
+                                      PFID(mdd_object_fid(sobj)), rc);
+                               GOTO(out, rc);
+                       }
+               }
+       }
+
+       /* linkea needs update upon FID or parent stripe change */
+       rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
+                                       tname, attr, ldata);
+       if (rc > 0)
+               /* update namespace only if @sobj has link on its MDT. */
+               spec->sp_migrate_nsonly = true;
+       else if (rc < 0)
+               GOTO(out, rc);
+
+       rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+                                     spattr, tpattr, attr);
+       if (rc)
+               GOTO(out, rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
+
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
+                                               tname, attr, spattr, tpattr,
+                                               ldata, ma, handle);
+       else
+               rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
+                                               sname, tname, spattr, tpattr,
+                                               attr, &sbuf, ldata, ma, spec,
+                                               hint, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
+                                        handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
+       else
+               rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
+                                       tname, spattr, tpattr, attr, &sbuf,
+                                       ldata, ma, spec, hint, handle);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
+                                   spec->sp_migrate_nsonly ? sobj : tobj,
+                                   mdd_object_fid(spobj), mdd_object_fid(sobj),
+                                   mdd_object_fid(tpobj), tname, sname,
+                                   handle);
+       if (rc)
+               GOTO(stop, rc);
+       EXIT;
+
+stop:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+       lu_buf_free(&sbuf);
+
+       return rc;
+}
+
+/**
+ * Migrate directory or file between MDTs.
+ *
+ * If \a md_pobj has an LMV, the real parents are its stripes: the target
+ * parent stripe is located with lmv_name_to_stripe_index(), and, when the
+ * layout is changing, the source parent stripe is located with
+ * lmv_name_to_stripe_index_old(); otherwise source and target parent are the
+ * same stripe. Without an LMV \a md_pobj itself is both source and target
+ * parent. The references taken on both parents are dropped before return.
+ *
+ * \param[in] env      execution environment
+ * \param[in] md_pobj  parent master object
+ * \param[in] md_sobj  source object
+ * \param[in] lname    file name
+ * \param[in] md_tobj  target object
+ * \param[in] spec     target creation spec
+ * \param[in] ma       used to update \a pobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -EALREADY if the layout is changing, the name maps to
+ *                     the same stripe in old and new layout and the
+ *                     directory is restriping; -EINVAL if it is not
+ *                     restriping
+ * \retval             -EBADF if the parent LMV is not sane
+ * \retval             -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+                      struct md_object *md_sobj, const struct lu_name *lname,
+                      struct md_object *md_tobj, struct md_op_spec *spec,
+                      struct md_attr *ma)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(md_pobj);
+       struct mdd_object *pobj = md2mdd_obj(md_pobj);
+       struct mdd_object *sobj = md2mdd_obj(md_sobj);
+       struct mdd_object *tobj = md2mdd_obj(md_tobj);
+       struct mdd_object *spobj = NULL;
+       struct mdd_object *tpobj = NULL;
+       struct lu_buf pbuf = { NULL };
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       ENTRY;
+
+       /* locate source and target stripe on pobj, which are the real parent */
+       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       lmv = pbuf.lb_buf;
+       if (lmv) {
+               int index;
+
+               if (!lmv_is_sane(lmv))
+                       GOTO(out, rc = -EBADF);
+
+               /* locate target parent stripe */
+               /* fail check here to make sure top dir migration succeed. */
+               if (lmv_is_migrating(lmv) &&
+                   OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                       GOTO(out, rc = -EIO);
+
+               index = lmv_name_to_stripe_index(lmv, lname->ln_name,
+                                                lname->ln_namelen);
+               if (index < 0)
+                       GOTO(out, rc = index);
+
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+               tpobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(tpobj))
+                       GOTO(out, rc = PTR_ERR(tpobj));
+
+               /* locate source parent stripe */
+               if (lmv_is_layout_changing(lmv)) {
+                       index = lmv_name_to_stripe_index_old(lmv,
+                                                            lname->ln_name,
+                                                            lname->ln_namelen);
+                       if (index < 0)
+                               GOTO(out, rc = index);
+
+                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+                       spobj = mdd_object_find(env, mdd, fid);
+                       if (IS_ERR(spobj))
+                               GOTO(out, rc = PTR_ERR(spobj));
+
+                       /* parent stripe unchanged */
+                       if (spobj == tpobj) {
+                               if (!lmv_is_restriping(lmv))
+                                       GOTO(out, rc = -EINVAL);
+                               GOTO(out, rc = -EALREADY);
+                       }
+               } else {
+                       spobj = tpobj;
+                       mdd_object_get(spobj);
+               }
+       } else {
+               tpobj = pobj;
+               spobj = pobj;
+               mdd_object_get(tpobj);
+               mdd_object_get(spobj);
+       }
+
+       rc = mdd_migrate_object(env, spobj, tpobj, sobj, tobj, lname, lname,
+                               spec, ma);
+       GOTO(out, rc);
+
+out:
+       if (!IS_ERR_OR_NULL(spobj))
+               mdd_object_put(env, spobj);
+       if (!IS_ERR_OR_NULL(tpobj))
+               mdd_object_put(env, tpobj);
+       lu_buf_free(&pbuf);
+
+       return rc;
+}
+
+static int mdd_declare_1sd_collapse(const struct lu_env *env,
+                                   struct mdd_object *pobj,
+                                   struct mdd_object *obj,
+                                   struct mdd_object *stripe,
+                                   struct lu_attr *attr,
+                                   struct md_layout_change *mlc,
+                                   struct lu_name *lname,
+                                   struct thandle *handle)
+{
+       int rc; /* every call below declares, in \a handle and in the same
+                * order, the matching step of mdd_1sd_collapse(); the first
+                * failing declaration is returned
+                */
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
+                                     S_IFDIR, dotdot, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, stripe, attr, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(stripe),
+                                     attr->la_mode, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_ref_del(env, obj, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_ref_del(env, obj, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_destroy(env, obj, handle);
+       if (rc)
+               return rc;
+
+       return rc;
+}
+
+/*
+ * transform one-stripe directory to a plain directory
+ *
+ * The layout of \a obj is detached first, under the write lock of \a obj
+ * only. Then, with \a pobj and \a obj both write locked, \a stripe takes
+ * over the place of \a obj: it gets a ".." entry pointing to \a pobj, the
+ * xattrs of \a obj (its LMV is deleted afterwards) and \a attr without
+ * LA_NLINK; the entry \a lname in \a pobj is replaced by one pointing to
+ * \a stripe, and \a obj drops two references and is destroyed.
+ *
+ * Returns 0 on success, or the -errno of the first step that failed; both
+ * locks are released on every path past the layout detach.
+ */
+static int mdd_1sd_collapse(const struct lu_env *env,
+                           struct mdd_object *pobj,
+                           struct mdd_object *obj,
+                           struct mdd_object *stripe,
+                           struct lu_attr *attr,
+                           struct md_layout_change *mlc,
+                           struct lu_name *lname,
+                           struct thandle *handle)
+{
+       int rc;
+
+       ENTRY;
+
+       /* replace 1-stripe directory with its stripe */
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+
+       mdd_write_lock(env, obj, DT_SRC_PARENT);
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               RETURN(rc);
+
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
+
+       /* insert dotdot to stripe which points to parent */
+       rc = __mdd_index_insert_only(env, stripe, mdd_object_fid(pobj),
+                                    S_IFDIR, dotdot, handle);
+       if (rc)
+               GOTO(out, rc);
+
+       /* copy xattrs including linkea */
+       rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
+       if (rc)
+               GOTO(out, rc);
+
+       /* delete LMV */
+       rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
-       if (rc != 0)
-               return rc;
+       /* don't set nlink from parent */
+       attr->la_valid &= ~LA_NLINK;
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_add(env, mdd_pobj, handle);
-               if (rc != 0)
-                       return rc;
-       }
+       rc = mdo_attr_set(env, stripe, attr, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       /* delete old object */
-       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-       if (rc != 0)
-               return rc;
+       /* delete dir name from parent */
+       rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               /* delete old object */
-               rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-               if (rc != 0)
-                       return rc;
-               /* set nlink to 0 */
-               rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
-               if (rc != 0)
-                       return rc;
-       }
+       /* insert stripe to parent with dir name */
+       rc = __mdd_index_insert_only(env, pobj, mdd_object_fid(stripe),
+                                    attr->la_mode, lname->ln_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
+       /* destroy dir obj */
+       rc = mdo_ref_del(env, obj, handle);
        if (rc)
-               return rc;
+               GOTO(out, rc);
 
-       rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
-       if (rc != 0)
-               return rc;
+       rc = mdo_ref_del(env, obj, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
+       rc = mdo_destroy(env, obj, handle);
+       if (rc)
+               GOTO(out, rc);
+
+       EXIT;
+out:
+       mdd_write_unlock(env, obj);
+       mdd_write_unlock(env, pobj);
 
        return rc;
 }
 
-static int mdd_migrate_update_name(const struct lu_env *env,
-                                  struct mdd_object *mdd_pobj,
-                                  struct mdd_object *mdd_sobj,
-                                  struct mdd_object *mdd_tobj,
-                                  const struct lu_name *lname,
-                                  struct md_attr *ma)
+/*
+ * shrink directory stripes after migration/merge
+ */
+int mdd_dir_layout_shrink(const struct lu_env *env,
+                         struct md_object *md_obj,
+                         struct md_layout_change *mlc)
 {
-       struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       struct thandle          *handle;
-       int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
-       const char              *name = lname->ln_name;
-       int                     rc;
-       ENTRY;
+       struct mdd_device *mdd = mdo2mdd(md_obj);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_object *obj = md2mdd_obj(md_obj);
+       struct mdd_object *pobj = NULL;
+       struct mdd_object *stripe = NULL;
+       struct lu_attr *attr = &info->mti_pattr;
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lu_name lname = { NULL };
+       struct lu_buf lmv_buf = { NULL };
+       struct lmv_mds_md_v1 *lmv;
+       struct lmv_user_md *lmu;
+       struct thandle *handle;
+       int rc;
 
-       /* update time for parent */
-       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-       p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
-       p_la->la_valid = LA_CTIME;
+       ENTRY;
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
+       rc = mdd_la_get(env, obj, attr);
+       if (rc)
                RETURN(rc);
 
-       ldata->ld_buf = NULL;
-       rc = mdd_links_read(env, mdd_sobj, ldata);
-       if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
+       if (!S_ISDIR(attr->la_mode))
+               RETURN(-ENOTDIR);
+
+       rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
+       if (rc < 0)
                RETURN(rc);
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
+       lmv = lmv_buf.lb_buf;
+       if (!lmv_is_sane(lmv))
+               RETURN(-EBADF);
 
-       rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                            lname, so_attr, p_la, ldata,
-                                            handle);
-       if (rc != 0) {
-               /* If the migration can not be fit in one transaction, just
-                * leave it in the original MDT */
-               if (rc == -E2BIG)
-                       GOTO(stop_trans, rc = 0);
-               else
-                       GOTO(stop_trans, rc);
-       }
+       lmu = mlc->mlc_buf.lb_buf;
 
-       CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
-              mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
-              PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-              PFID(mdd_object_fid(mdd_tobj)));
+       /* adjust the default value '0' to '1' */
+       if (lmu->lum_stripe_count == 0)
+               lmu->lum_stripe_count = cpu_to_le32(1);
 
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       /* these were checked in MDT */
+       LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
+               le32_to_cpu(lmv->lmv_stripe_count));
+       LASSERT(!lmv_is_splitting(lmv));
+       LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
 
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       /* if dir stripe count will be shrunk to 1, it needs to be transformed
+        * to a plain dir, which will cause FID change and namespace update.
+        */
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+               struct linkea_data *ldata = &info->mti_link_data;
+               char *filename = info->mti_name;
 
-       /* Remove source name from source directory */
-       rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+               rc = mdd_links_read(env, obj, ldata);
+               if (rc)
+                       GOTO(out, rc);
 
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                      lname, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
+               if (ldata->ld_leh->leh_reccount > 1)
+                       GOTO(out, rc = -EINVAL);
 
-               /*  linkea update might decrease the source object
-                *  nlink, let's get the attr again after ref_del */
-               rc = mdd_la_get(env, mdd_sobj, so_attr);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-       }
+               linkea_first_entry(ldata);
+               if (!ldata->ld_lee)
+                       GOTO(out, rc = -ENODATA);
 
-       if (S_ISREG(so_attr->la_mode)) {
-               if (so_attr->la_nlink == 1) {
-                       rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
-                                          handle);
-                       if (rc != 0 && rc != -ENODATA)
-                               GOTO(stop_trans, rc);
-
-                       rc = mdo_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-                       if (rc < 0)
-                               GOTO(stop_trans, rc);
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+                                   fid);
+
+               /* Note: lname might miss \0 at the end */
+               snprintf(filename, sizeof(info->mti_name), "%.*s",
+                        lname.ln_namelen, lname.ln_name);
+               lname.ln_name = filename;
+
+               pobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(pobj)) {
+                       rc = PTR_ERR(pobj);
+                       pobj = NULL;
+                       GOTO(out, rc);
                }
-       }
 
-       /* Insert new fid with target name into target dir */
-       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
-                               mdd_object_type(mdd_tobj), name, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
 
-       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+               stripe = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(stripe)) {
+                       mdd_object_put(env, pobj);
+                       pobj = NULL;
+                       GOTO(out, rc = PTR_ERR(stripe));
+               }
+       }
 
-       mdd_sobj->mod_flags |= DEAD_OBJ;
-       rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
 
-       rc = __mdd_orphan_add(env, mdd_sobj, handle);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       mlc->mlc_opc = MD_LAYOUT_SHRINK;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       mdo_ref_del(env, mdd_sobj, handle);
-       if (is_dir)
-               mdo_ref_del(env, mdd_sobj, handle);
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+               rc = mdd_declare_1sd_collapse(env, pobj, obj, stripe, attr, mlc,
+                                             &lname, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+       }
 
-       /* Get the attr again after ref_del */
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+                                        handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       ma->ma_attr = *so_attr;
-       ma->ma_valid |= MA_INODE;
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       mdd_write_lock(env, obj, DT_SRC_PARENT);
+       mlc->mlc_opc = MD_LAYOUT_SHRINK;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
-                              mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
-                              mdo2fid(mdd_pobj), lname, lname, handle);
-       if (rc != 0) {
-               CWARN("%s: changelog for migrate %s "DFID
-                     "under "DFID" failed: rc = %d\n",
-                     mdd2obd_dev(mdd)->obd_name, lname->ln_name,
-                     PFID(mdd_object_fid(mdd_sobj)),
-                     PFID(mdd_object_fid(mdd_pobj)), rc);
-               /* Sigh, there are no easy way to migrate back the object, so
-                * let's reset the result to 0 for now XXX */
-               rc = 0;
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+               rc = mdd_1sd_collapse(env, pobj, obj, stripe, attr, mlc, &lname,
+                                     handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
        }
-out_unlock:
-       mdd_write_unlock(env, mdd_sobj);
+
+       rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                           XATTR_NAME_LMV, handle);
+       GOTO(stop_trans, rc);
 
 stop_trans:
        rc = mdd_trans_stop(env, mdd, rc, handle);
-
-       RETURN(rc);
+out:
+       if (pobj) {
+               mdd_object_put(env, stripe);
+               mdd_object_put(env, pobj);
+       }
+       lu_buf_free(&lmv_buf);
+       return rc;
 }
 
-static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
-                         const struct lu_fid *fid, __u32 *mdt_index)
+static int mdd_dir_declare_split_plain(const struct lu_env *env,
+                                       struct mdd_device *mdd,
+                                       struct mdd_object *pobj,
+                                       struct mdd_object *obj,
+                                       struct mdd_object *tobj,
+                                       struct md_layout_change *mlc,
+                                       struct dt_allocation_hint *hint,
+                                       struct thandle *handle)
 {
-       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
-       struct seq_server_site *ss;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_name *lname = mlc->mlc_name;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct lmv_mds_md_v1 *lmv;
+       __u32 count;
        int rc;
 
-       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               return rc;
 
-       range->lsr_flags = LU_SEQ_RANGE_MDT;
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
-       if (rc != 0)
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
+                               lname, 1, 0, ldata);
+       if (rc)
                return rc;
 
-       *mdt_index = range->lsr_index;
+       count = lum->lum_stripe_count;
+       lum->lum_stripe_count = 0;
+       mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                            hint);
+       rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
+                               lname, mlc->mlc_attr, handle, mlc->mlc_spec,
+                               ldata, NULL, NULL, NULL, hint);
+       if (rc)
+               return rc;
 
-       return 0;
+       /* lod_declare_xattr_set() checks the mode of tobj, but tobj is not
+        * created yet, so flag it as a directory in advance.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
+
+       lmv = (typeof(lmv))info->mti_key;
+       memset(lmv, 0, sizeof(*lmv));
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+       lmv->lmv_stripe_count = cpu_to_le32(1);
+       lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
+       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
+
+       mlc->mlc_opc = MD_LAYOUT_ATTACH;
+       mlc->mlc_buf.lb_buf = lmv;
+       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       lum->lum_stripe_count = count;
+       mlc->mlc_opc = MD_LAYOUT_SPLIT;
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+                                     S_IFDIR, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, obj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, pobj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       return rc;
 }
+
 /**
- * Check whether we should migrate the file/dir
- * return val
- *     < 0  permission check failed or other error.
- *     = 0  the file can be migrated.
- *     > 0  the file does not need to be migrated, mostly
- *          for multiple link file
- **/
-static int mdd_migrate_sanity_check(const struct lu_env *env,
-                                   struct mdd_object *pobj,
-                                   const struct lu_attr *pattr,
-                                   struct mdd_object *sobj,
-                                   struct lu_attr *sattr)
+ * Split a plain directory:
+ * 1. create \a tobj as a plain directory.
+ * 2. attach \a obj as the first stripe of \a tobj.
+ * 3. migrate xattrs from \a obj to \a tobj.
+ * 4. split \a tobj to the requested stripe count.
+ */
+static int mdd_dir_split_plain(const struct lu_env *env,
+                               struct mdd_device *mdd,
+                               struct mdd_object *pobj,
+                               struct mdd_object *obj,
+                               struct mdd_object *tobj,
+                               struct md_layout_change *mlc,
+                               struct dt_allocation_hint *hint,
+                               struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      *ldata = &info->mti_link_data;
-       struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
-       int                     mgr_easize;
-       struct lu_buf           *mgr_buf;
-       int                     count;
-       int                     rc;
-       __u64 mdt_index;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       const struct lu_name *lname = mlc->mlc_name;
+       struct linkea_data *ldata = &info->mti_link_data;
+       int rc;
+
        ENTRY;
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
-       if (mgr_buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
+       /* copy linkea out and set on target later */
+       rc = mdd_links_read(env, obj, ldata);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
-       if (rc > 0) {
-               union lmv_mds_md *lmm = mgr_buf->lb_buf;
-
-               /* If the object has migrateEA, it means IMMUTE flag
-                * is being set by previous migration process, so it
-                * needs to override the IMMUTE flag, otherwise the
-                * following sanity check will fail */
-               if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
-
-                       sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
-                       CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(sobj)));
-               }
-       }
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
-                                    sobj, sattr, NULL, NULL);
-       if (rc != 0)
+       /* don't set nlink from obj */
+       mlc->mlc_attr->la_valid &= ~LA_NLINK;
+
+       rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                              NULL, NULL, NULL, hint, handle, false);
+       if (rc)
                RETURN(rc);
 
-       /* Then it will check if the file should be migrated. If the file
-        * has mulitple links, we only need migrate the file if all of its
-        * entries has been migrated to the remote MDT */
-       if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
-               RETURN(0);
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle, mdo_xattr_set);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdd_links_read(env, sobj, ldata);
-       if (rc != 0) {
-               /* For multiple links files, if there are no linkEA data at all,
-                * means the file might be created before linkEA is enabled, and
-                * all of its links should not be migrated yet, otherwise it
-                * should have some linkEA there */
-               if (rc == -ENOENT || rc == -ENODATA)
-                       RETURN(1);
+       rc = mdd_links_write(env, tobj, ldata, handle);
+       if (rc)
                RETURN(rc);
-       }
 
-       mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
-       /* If there are still links locally, then the file will not be
-        * migrated. */
-       LASSERT(ldata->ld_leh != NULL);
+       rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
+       if (rc)
+               RETURN(rc);
 
-       /* If the linkEA is overflow, then means there are some unknown name
-        * entries under unknown parents, that will prevent the migration. */
-       if (unlikely(ldata->ld_leh->leh_overflow_time))
-               RETURN(1);
+       rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
+                               lname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
 
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct lu_name          lname;
-               struct lu_fid           fid;
-               __u32                   parent_mdt_index;
+       la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
+       la->la_valid = LA_CTIME | LA_MTIME;
 
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
+       rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               RETURN(rc);
 
-               rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
-               if (rc != 0)
-                       RETURN(rc);
+       rc = mdd_la_get(env, pobj, pattr);
+       if (rc)
+               RETURN(rc);
 
-               /* Migrate the object only if none of its parents are on the
-                * current MDT. */
-               if (parent_mdt_index != mdt_index)
-                       continue;
+       la->la_valid = LA_CTIME | LA_MTIME;
 
-               CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
-                      PFID(mdd_object_fid(sobj)), lname.ln_namelen,
-                      lname.ln_name, PFID(&fid));
-               rc = 1;
-               break;
-       }
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, pobj, pattr, la, handle);
+       mdd_write_unlock(env, pobj);
+       if (rc)
+               RETURN(rc);
 
+       /* The FID changes, so record this as CL_MIGRATE */
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdd_object_fid(pobj), mdd_object_fid(obj),
+                                   mdd_object_fid(pobj), lname, lname, handle);
        RETURN(rc);
 }
 
-static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
-                      struct md_object *sobj, const struct lu_name *lname,
-                      struct md_object *tobj, struct md_attr *ma)
+int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
+                        struct md_layout_change *mlc)
 {
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
-       struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
-       bool                    created = false;
-       int                     rc;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(o);
+       struct mdd_object *obj = md2mdd_obj(o);
+       struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
+       struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       bool is_plain = false;
+       struct thandle *handle;
+       int rc;
 
        ENTRY;
-       /* If the file will being migrated, it will check whether
-        * the file is being opened by someone else right now */
-       mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
-       if (mdd_sobj->mod_count > 0) {
-               CDEBUG(D_OTHER,
-                      "%s: "DFID"%s is already opened count %d: rc = %d\n",
-                      mdd2obd_dev(mdd)->obd_name,
-                      PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-                      mdd_sobj->mod_count, -EBUSY);
-               mdd_read_unlock(env, mdd_sobj);
-               GOTO(put, rc = -EBUSY);
-       }
-       mdd_read_unlock(env, mdd_sobj);
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(put, rc);
+       LASSERT(S_ISDIR(mdd_object_type(obj)));
 
-       rc = mdd_la_get(env, mdd_pobj, pattr);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
+       if (rc == -ENODATA)
+               is_plain = true;
+       else if (rc < 0)
+               RETURN(rc);
 
-       rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
-       if (rc != 0) {
-               if (rc > 0)
-                       rc = 0;
-               GOTO(put, rc);
-       }
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
 
-       /* Sigh, it is impossible to finish all of migration in a single
-        * transaction, for example migrating big directory entries to the
-        * new MDT, it needs insert all of name entries of children in the
-        * new directory.
-        *
-        * So migration will be done in multiple steps and transactions.
-        *
-        * 1. create an orphan object on the remote MDT in one transaction.
-        * 2. migrate extend attributes to the new target file/directory.
-        * 3. For directory, migrate the entries to the new MDT and update
-        * linkEA of each children. Because we can not migrate all entries
-        * in a single transaction, so the migrating directory will become
-        * a striped directory during migration, so once the process is
-        * interrupted, the directory is still accessible. (During lookup,
-        * client will locate the name by searching both original and target
-        * object).
-        * 4. Finally, update the name/FID to point to the new file/directory
-        * in a separate transaction.
-        */
+       if (is_plain) {
+               rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj, mlc,
+                                                hint, handle);
+       } else {
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               rc = mdo_declare_layout_change(env, obj, mlc, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
 
-       /* step 1: Check whether the orphan object has been created, and create
-        * orphan object on the remote MDT if needed */
-       if (!mdd_object_exists(mdd_tobj)) {
-               rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                       lname, so_attr);
-               if (rc != 0)
-                       GOTO(put, rc);
-               created = true;
+               rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
+                                                NULL, handle);
        }
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 2: migrate xattr */
-       rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       /* step 3: migrate name entries to the orphan object */
-       if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
-               rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
-               if (rc != 0)
-                       GOTO(put, rc);
-               if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
-                                                 OBD_FAIL_MDS_REINT_NET_REP)))
-                       GOTO(put, rc = 0);
+       if (is_plain) {
+               rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, mlc, hint,
+                                        handle);
        } else {
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
+               mdd_write_lock(env, obj, DT_TGT_CHILD);
+               rc = mdo_xattr_set(env, obj, NULL, XATTR_NAME_LMV,
+                                  LU_XATTR_CREATE, handle);
+               mdd_write_unlock(env, obj);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                                   XATTR_NAME_LMV, handle);
        }
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 4: update name entry to the new object */
-       rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
-                                    ma);
-       if (rc != 0)
-               GOTO(put, rc);
+       EXIT;
 
-       /* newly created target was not locked, don't cache its attributes */
-       if (created)
-               mdd_invalidate(env, tobj);
-put:
-       RETURN(rc);
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+
+       return rc;
 }
 
 const struct md_dir_operations mdd_dir_ops = {