Whamcloud - gitweb
LU-9679 modules: convert MIN/MAX to kernel style
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 38eef37..9247abb 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -46,6 +42,8 @@
 #include <obd_support.h>
 #include <lustre_mds.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
+#include <lustre_idmap.h>
 
 #include "mdd_internal.h"
 
@@ -53,8 +51,8 @@ static const char dot[] = ".";
 static const char dotdot[] = "..";
 
 static struct lu_name lname_dotdot = {
-        (char *) dotdot,
-        sizeof(dotdot) - 1
+       .ln_name        = (char *) dotdot,
+       .ln_namelen     = sizeof(dotdot) - 1,
 };
 
 static inline int
@@ -94,7 +92,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
        }
 
        rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
        if (rc)
                RETURN(rc);
 
@@ -124,6 +122,77 @@ int mdd_lookup(const struct lu_env *env,
         RETURN(rc);
 }
 
+/** Read the raw link EA of \a mdd_obj into a per-thread buffer.
+ * Uses mdd_thread_info::mti_link_buf, starting at PAGE_SIZE and growing it
+ * when the xattr does not fit.  A pointer to the buffer is stored in
+ * \a ldata::ld_buf.  The EA is not parsed here: the mdd_links_read() and
+ * mdd_links_read_with_rec() wrappers initialize \a ldata afterwards.
+ *
+ * \param[in] env      execution environment, must not be NULL
+ * \param[in] mdd_obj  object whose XATTR_NAME_LINK xattr is read
+ * \param[out] ldata   link EA descriptor; only ld_buf is set here
+ *
+ * \retval 0 on success
+ * \retval -ENODATA if \a mdd_obj does not exist
+ * \retval -ENOMEM if the buffer cannot be allocated
+ * \retval negative errno returned by mdo_xattr_get()
+ */
+static int __mdd_links_read(const struct lu_env *env,
+                           struct mdd_object *mdd_obj,
+                           struct linkea_data *ldata)
+{
+       int rc;
+
+       if (!mdd_object_exists(mdd_obj))
+               return -ENODATA;
+
+       /* First try a small buf */
+       LASSERT(env != NULL);
+       ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+                                              PAGE_SIZE);
+       if (ldata->ld_buf->lb_buf == NULL)
+               return -ENOMEM;
+
+       rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               /* Buf was too small: release it and query again so that the
+                * return value can be used as the size to allocate. */
+               lu_buf_free(ldata->ld_buf);
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                  XATTR_NAME_LINK);
+               if (rc < 0)
+                       return rc;
+               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               if (ldata->ld_buf->lb_buf == NULL)
+                       return -ENOMEM;
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                 XATTR_NAME_LINK);
+       }
+       if (rc < 0) {
+               lu_buf_free(ldata->ld_buf);
+               ldata->ld_buf = NULL;
+               return rc;
+       }
+
+       /* Parsing is the caller's job; doing it here as well would run
+        * linkea_init() twice for every read. */
+       return 0;
+}
+
+/** Read the link EA of \a mdd_obj through __mdd_links_read() and, when that
+ * succeeds, run linkea_init() on \a ldata.
+ *
+ * \retval 0 on success, otherwise the first error encountered
+ */
+static int mdd_links_read(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         struct linkea_data *ldata)
+{
+       int rc = __mdd_links_read(env, mdd_obj, ldata);
+
+       if (rc != 0)
+               return rc;
+
+       return linkea_init(ldata);
+}
+
+/** Read the link EA of \a mdd_obj through __mdd_links_read() and, when that
+ * succeeds, run linkea_init_with_rec() on \a ldata.
+ *
+ * \retval 0 on success, otherwise the first error encountered
+ */
+static int mdd_links_read_with_rec(const struct lu_env *env,
+                                  struct mdd_object *mdd_obj,
+                                  struct linkea_data *ldata)
+{
+       int rc = __mdd_links_read(env, mdd_obj, ldata);
+
+       if (rc != 0)
+               return rc;
+
+       return linkea_init_with_rec(ldata);
+}
+
 /**
  * Get parent FID of the directory
  *
@@ -151,17 +220,23 @@ static inline int mdd_parent_fid(const struct lu_env *env,
 
        ENTRY;
 
-       LASSERT(S_ISDIR(mdd_object_type(obj)));
+       LASSERTF(S_ISDIR(mdd_object_type(obj)),
+                "%s: FID "DFID" is not a directory type = %o\n",
+                mdd_obj_dev_name(obj), PFID(mdd_object_fid(obj)),
+                mdd_object_type(obj));
 
        buf = lu_buf_check_and_alloc(buf, PATH_MAX);
        if (buf->lb_buf == NULL)
                GOTO(lookup, rc = 0);
 
        ldata.ld_buf = buf;
-       rc = mdd_links_read(env, obj, &ldata);
+       rc = mdd_links_read_with_rec(env, obj, &ldata);
        if (rc != 0)
                GOTO(lookup, rc);
 
+       /* the obj is not locked, don't cache attributes */
+       mdd_invalidate(env, &obj->mod_obj);
+
        LASSERT(ldata.ld_leh != NULL);
        /* Directory should only have 1 parent */
        if (ldata.ld_leh->leh_reccount > 1)
@@ -188,61 +263,56 @@ int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
 }
 
 /*
- * return 1: if lf is the fid of the ancestor of p1;
+ * return 1: if \a tfid is the fid of the ancestor of \a mo;
  * return 0: if not;
- *
- * return -EREMOTE: if remote object is found, in this
- * case fid of remote object is saved to @pf;
- *
  * otherwise: values < 0, errors.
  */
 static int mdd_is_parent(const struct lu_env *env,
                        struct mdd_device *mdd,
-                       struct mdd_object *p1,
+                       struct mdd_object *mo,
                        const struct lu_attr *attr,
-                       const struct lu_fid *lf,
-                       struct lu_fid *pf)
+                       const struct lu_fid *tfid)
 {
-        struct mdd_object *parent = NULL;
-        struct lu_fid *pfid;
-        int rc;
-        ENTRY;
+       struct mdd_object *mp;
+       struct lu_fid *pfid;
+       int rc;
+
+       LASSERT(!lu_fid_eq(mdo2fid(mo), tfid));
+       pfid = &mdd_env_info(env)->mti_fid;
 
-        LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
-        pfid = &mdd_env_info(env)->mti_fid;
+       if (mdd_is_root(mdd, mdo2fid(mo)))
+               return 0;
+
+       if (mdd_is_root(mdd, tfid))
+               return 1;
+
+       rc = mdd_parent_fid(env, mo, attr, pfid);
+       if (rc)
+               return rc;
+
+       while (1) {
+               if (lu_fid_eq(pfid, tfid))
+                       return 1;
+
+               if (mdd_is_root(mdd, pfid))
+                       return 0;
 
-        /* Check for root first. */
-        if (mdd_is_root(mdd, mdo2fid(p1)))
-                RETURN(0);
+               mp = mdd_object_find(env, mdd, pfid);
+               if (IS_ERR(mp))
+                       return PTR_ERR(mp);
 
-        for(;;) {
-               /* this is done recursively */
-               rc = mdd_parent_fid(env, p1, attr, pfid);
+               if (!mdd_object_exists(mp)) {
+                       mdd_object_put(env, mp);
+                       return -ENOENT;
+               }
+
+               rc = mdd_parent_fid(env, mp, attr, pfid);
+               mdd_object_put(env, mp);
                if (rc)
-                       GOTO(out, rc);
-                if (mdd_is_root(mdd, pfid))
-                        GOTO(out, rc = 0);
-               if (lu_fid_eq(pfid, &mdd->mdd_local_root_fid))
-                       GOTO(out, rc = 0);
-                if (lu_fid_eq(pfid, lf))
-                        GOTO(out, rc = 1);
-               if (parent != NULL)
-                       mdd_object_put(env, parent);
-
-               parent = mdd_object_find(env, mdd, pfid);
-               if (IS_ERR(parent))
-                       GOTO(out, rc = PTR_ERR(parent));
-
-               if (!mdd_object_exists(parent))
-                       GOTO(out, rc = -EINVAL);
+                       return rc;
+       }
 
-               p1 = parent;
-        }
-        EXIT;
-out:
-        if (parent && !IS_ERR(parent))
-                mdd_object_put(env, parent);
-        return rc;
+       return 0;
 }
 
 /*
@@ -250,36 +320,27 @@ out:
  *
  * returns 1: if fid is ancestor of @mo;
  * returns 0: if fid is not an ancestor of @mo;
- *
- * returns EREMOTE if remote object is found, fid of remote object is saved to
- * @fid;
- *
  * returns < 0: if error
  */
 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
-                 const struct lu_fid *fid, struct lu_fid *sfid)
+                 const struct lu_fid *fid)
 {
        struct mdd_device *mdd = mdo2mdd(mo);
        struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
        int rc;
        ENTRY;
 
+       if (!mdd_object_exists(md2mdd_obj(mo)))
+               RETURN(-ENOENT);
+
        if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
-               RETURN(0);
+               RETURN(-ENOTDIR);
 
        rc = mdd_la_get(env, md2mdd_obj(mo), attr);
        if (rc != 0)
                RETURN(rc);
 
-       rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid, sfid);
-       if (rc == 0) {
-               /* found root */
-               fid_zero(sfid);
-       } else if (rc == 1) {
-               /* found @fid is parent */
-               *sfid = *fid;
-               rc = 0;
-       }
+       rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), attr, fid);
        RETURN(rc);
 }
 
@@ -327,8 +388,12 @@ static int mdd_dir_is_empty(const struct lu_env *env,
 
                iops->put(env, it);
                iops->fini(env, it);
-       } else
+       } else {
                result = PTR_ERR(it);
+               /* -ENODEV means no valid stripe */
+               if (result == -ENODEV)
+                       RETURN(0);
+       }
        RETURN(result);
 }
 
@@ -392,7 +457,7 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
        if (check_perm)
                rc = mdd_permission_internal_locked(env, pobj, pattr,
                                                    MAY_WRITE | MAY_EXEC,
-                                                   MOR_TGT_PARENT);
+                                                   DT_TGT_PARENT);
        RETURN(rc);
 }
 
@@ -413,7 +478,7 @@ int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
 
        rc = mdd_permission_internal_locked(env, pobj, pattr,
                                            MAY_WRITE | MAY_EXEC,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
        if (rc != 0)
                RETURN(rc);
 
@@ -467,7 +532,7 @@ static int mdd_may_delete_entry(const struct lu_env *env,
                int rc;
                rc = mdd_permission_internal_locked(env, pobj, pattr,
                                            MAY_WRITE | MAY_EXEC,
-                                           MOR_TGT_PARENT);
+                                           DT_TGT_PARENT);
                if (rc)
                        RETURN(rc);
        }
@@ -612,15 +677,11 @@ static int __mdd_index_insert_only(const struct lu_env *env,
 
        if (dt_try_as_dir(env, next)) {
                struct dt_insert_rec    *rec = &mdd_env_info(env)->mti_dt_rec;
-               struct lu_ucred         *uc  = lu_ucred_check(env);
-               int                      ignore_quota;
 
                rec->rec_fid = lf;
                rec->rec_type = type;
-               ignore_quota = uc ? uc->uc_cap & CFS_CAP_SYS_RESOURCE_MASK : 1;
                rc = dt_insert(env, next, (const struct dt_rec *)rec,
-                              (const struct dt_key *)name, handle,
-                              ignore_quota);
+                              (const struct dt_key *)name, handle);
        } else {
                rc = -ENOTDIR;
        }
@@ -637,7 +698,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
 
        rc = __mdd_index_insert_only(env, pobj, lf, type, name, handle);
        if (rc == 0 && S_ISDIR(type)) {
-               mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+               mdd_write_lock(env, pobj, DT_TGT_PARENT);
                mdo_ref_add(env, pobj, handle);
                mdd_write_unlock(env, pobj);
        }
@@ -650,17 +711,17 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
                              const char *name, int is_dir,
                              struct thandle *handle)
 {
-        int               rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
        rc = __mdd_index_delete_only(env, pobj, name, handle);
-        if (rc == 0 && is_dir) {
-                mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                mdo_ref_del(env, pobj, handle);
-                mdd_write_unlock(env, pobj);
-        }
+       if (rc == 0 && is_dir) {
+               mdd_write_lock(env, pobj, DT_TGT_PARENT);
+               mdo_ref_del(env, pobj, handle);
+               mdd_write_unlock(env, pobj);
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_llog_record_calc_size(const struct lu_env *env,
@@ -668,23 +729,24 @@ static int mdd_llog_record_calc_size(const struct lu_env *env,
                                     const struct lu_name *sname)
 {
        const struct lu_ucred   *uc = lu_ucred(env);
-       enum changelog_rec_flags crf = 0;
-       size_t                   hdr_size = sizeof(struct llog_changelog_rec) -
-                                           sizeof(struct changelog_rec);
+       enum changelog_rec_flags clf_flags = CLF_EXTRA_FLAGS;
+       enum changelog_rec_extra_flags crfe = CLFE_UIDGID | CLFE_NID;
 
        if (sname != NULL)
-               crf |= CLF_RENAME;
+               clf_flags |= CLF_RENAME;
 
        if (uc != NULL && uc->uc_jobid[0] != '\0')
-               crf |= CLF_JOBID;
+               clf_flags |= CLF_JOBID;
 
-       return llog_data_len(hdr_size + changelog_rec_offset(crf) +
+       return llog_data_len(LLOG_CHANGELOG_HDR_SZ +
+                            changelog_rec_offset(clf_flags, crfe) +
                             (tname != NULL ? tname->ln_namelen : 0) +
                             (sname != NULL ? 1 + sname->ln_namelen : 0));
 }
 
 int mdd_declare_changelog_store(const struct lu_env *env,
                                struct mdd_device *mdd,
+                               enum changelog_rec_type type,
                                const struct lu_name *tname,
                                const struct lu_name *sname,
                                struct thandle *handle)
@@ -697,8 +759,7 @@ int mdd_declare_changelog_store(const struct lu_env *env,
        int                              reclen;
        int                              rc;
 
-       /* Not recording */
-       if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+       if (!mdd_changelog_enabled(env, mdd, type))
                return 0;
 
        reclen = mdd_llog_record_calc_size(env, tname, sname);
@@ -726,6 +787,47 @@ out_put:
        return rc;
 }
 
+/** Write llog record \a r through llog_osd_ops.lop_write_rec().
+ * For CHANGELOG_REC records the changelog index is assigned just before
+ * the write and mdd_cl.mc_index is advanced afterwards, both under
+ * mdd_cl.mc_lock.  Any other record type is written unchanged.
+ *
+ * \retval value returned by llog_osd_ops.lop_write_rec()
+ */
+int mdd_changelog_write_rec(const struct lu_env *env,
+                           struct llog_handle *loghandle,
+                           struct llog_rec_hdr *r,
+                           struct llog_cookie *cookie,
+                           int idx, struct thandle *th)
+{
+       struct llog_changelog_rec *clrec;
+       struct mdd_device *dev;
+       int rc;
+
+       /* anything but a changelog record is passed straight through */
+       if (r->lrh_type != CHANGELOG_REC)
+               return llog_osd_ops.lop_write_rec(env, loghandle, r,
+                                                 cookie, idx, th);
+
+       dev = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
+       clrec = container_of0(r, struct llog_changelog_rec, cr_hdr);
+
+       /* stamp the record with the index following the current one */
+       spin_lock(&dev->mdd_cl.mc_lock);
+       clrec->cr.cr_index = dev->mdd_cl.mc_index + 1;
+       spin_unlock(&dev->mdd_cl.mc_lock);
+
+       rc = llog_osd_ops.lop_write_rec(env, loghandle, r, cookie, idx, th);
+
+       /*
+        * if current llog is full, we will generate a new llog, and
+        * since it's actually not an error, leave the index alone so
+        * that userspace apps do not see a gap in the changelog sequence
+        */
+       if (rc == -ENOSPC && llog_is_full(loghandle))
+               return rc;
+
+       spin_lock(&dev->mdd_cl.mc_lock);
+       dev->mdd_cl.mc_index++;
+       spin_unlock(&dev->mdd_cl.mc_lock);
+
+       return rc;
+}
+
 /** Add a changelog entry \a rec to the changelog llog
  * \param mdd
  * \param rec
@@ -748,13 +850,6 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        rec->cr_hdr.lrh_type = CHANGELOG_REC;
        rec->cr.cr_time = cl_time();
 
-       spin_lock(&mdd->mdd_cl.mc_lock);
-       /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
-        * but as long as the MDD transactions are ordered correctly for e.g.
-        * rename conflicts, I don't think this should matter. */
-       rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
-       spin_unlock(&mdd->mdd_cl.mc_lock);
-
        ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
        if (ctxt == NULL)
                return -ENXIO;
@@ -763,9 +858,42 @@ int mdd_changelog_store(const struct lu_env *env, struct mdd_device *mdd,
        if (IS_ERR(llog_th))
                GOTO(out_put, rc = PTR_ERR(llog_th));
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_CHANGELOG_REORDER, cfs_fail_val);
        /* nested journal transaction */
        rc = llog_add(env, ctxt->loc_handle, &rec->cr_hdr, NULL, llog_th);
 
+       /* time to recover some space ?? */
+       if (likely(!mdd->mdd_changelog_gc ||
+                  mdd->mdd_cl.mc_gc_task != MDD_CHLG_GC_NONE ||
+                  mdd->mdd_changelog_min_gc_interval >=
+                       ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time))
+               /* save a spin_lock trip */
+               goto out_put;
+       spin_lock(&mdd->mdd_cl.mc_lock);
+       if (likely(mdd->mdd_changelog_gc &&
+                    mdd->mdd_cl.mc_gc_task == MDD_CHLG_GC_NONE &&
+                    ktime_get_real_seconds() - mdd->mdd_cl.mc_gc_time >
+                       mdd->mdd_changelog_min_gc_interval)) {
+               if (unlikely(llog_cat_free_space(ctxt->loc_handle) <=
+                            mdd->mdd_changelog_min_free_cat_entries ||
+                            OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD))) {
+                       CWARN("%s:%s low on changelog_catalog free entries, "
+                             "starting ChangeLog garbage collection thread\n",
+                             obd->obd_name,
+                             OBD_FAIL_CHECK(OBD_FAIL_FORCE_GC_THREAD) ?
+                               " simulate" : "");
+
+                       /* indicate further kthread run will occur outside
+                        * right after current journal transaction filling has
+                        * completed
+                        */
+                       mdd->mdd_cl.mc_gc_task = MDD_CHLG_GC_NEED;
+               }
+               /* next check in mdd_changelog_min_gc_interval anyway
+                */
+               mdd->mdd_cl.mc_gc_time = ktime_get_real_seconds();
+       }
+       spin_unlock(&mdd->mdd_cl.mc_lock);
 out_put:
        llog_ctxt_put(ctxt);
        if (rc > 0)
@@ -778,13 +906,15 @@ static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
                                         const struct lu_fid *spfid,
                                         const struct lu_name *sname)
 {
-       struct changelog_ext_rename     *rnm = changelog_rec_rename(rec);
-       size_t                           extsize = sname->ln_namelen + 1;
+       struct changelog_ext_rename *rnm = changelog_rec_rename(rec);
+       size_t extsize;
 
        LASSERT(sfid != NULL);
        LASSERT(spfid != NULL);
        LASSERT(sname != NULL);
 
+       extsize = sname->ln_namelen + 1;
+
        rnm->cr_sfid = *sfid;
        rnm->cr_spfid = *spfid;
 
@@ -795,7 +925,7 @@ static void mdd_changelog_rec_ext_rename(struct changelog_rec *rec,
 
 void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
 {
-       struct changelog_ext_jobid      *jid = changelog_rec_jobid(rec);
+       struct changelog_ext_jobid *jid = changelog_rec_jobid(rec);
 
        if (jobid == NULL || jobid[0] == '\0')
                return;
@@ -803,6 +933,45 @@ void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
        strlcpy(jid->cr_jobid, jobid, sizeof(jid->cr_jobid));
 }
 
+/** Store \a eflags in the cr_extra_flags field of the area that
+ * changelog_rec_extra_flags() returns for \a rec.
+ */
+void mdd_changelog_rec_ext_extra_flags(struct changelog_rec *rec, __u64 eflags)
+{
+       changelog_rec_extra_flags(rec)->cr_extra_flags = eflags;
+}
+
+/** Store \a uid and \a gid in the struct changelog_ext_uidgid area that
+ * changelog_rec_uidgid() returns for \a rec.
+ */
+void mdd_changelog_rec_extra_uidgid(struct changelog_rec *rec,
+                                   __u64 uid, __u64 gid)
+{
+       struct changelog_ext_uidgid *ids = changelog_rec_uidgid(rec);
+
+       ids->cr_gid = gid;
+       ids->cr_uid = uid;
+}
+
+/** Store \a nid in the cr_nid field of the area that changelog_rec_nid()
+ * returns for \a rec.
+ */
+void mdd_changelog_rec_extra_nid(struct changelog_rec *rec,
+                                lnet_nid_t nid)
+{
+       changelog_rec_nid(rec)->cr_nid = nid;
+}
+
+/** Store \a flags in the cr_openflags field of the area that
+ * changelog_rec_openmode() returns for \a rec.
+ */
+void mdd_changelog_rec_extra_omode(struct changelog_rec *rec, u32 flags)
+{
+       changelog_rec_openmode(rec)->cr_openflags = flags;
+}
+
+/** Copy \a xattr_name into the cr_xattr field of the area that
+ * changelog_rec_xattr() returns for \a rec.  The copy is done with
+ * strlcpy() bounded by sizeof(cr_xattr).
+ */
+void mdd_changelog_rec_extra_xattr(struct changelog_rec *rec,
+                                  const char *xattr_name)
+{
+       struct changelog_ext_xattr *ext = changelog_rec_xattr(rec);
+
+       strlcpy(ext->cr_xattr, xattr_name, sizeof(ext->cr_xattr));
+}
+
 /** Store a namespace change changelog record
  * If this fails, we must fail the whole transaction; we don't
  * want the change to commit without the log entry.
@@ -817,7 +986,7 @@ void mdd_changelog_rec_ext_jobid(struct changelog_rec *rec, const char *jobid)
 int mdd_changelog_ns_store(const struct lu_env *env,
                           struct mdd_device *mdd,
                           enum changelog_rec_type type,
-                          enum changelog_rec_flags crf,
+                          enum changelog_rec_flags clf_flags,
                           struct mdd_object *target,
                           const struct lu_fid *tpfid,
                           const struct lu_fid *sfid,
@@ -830,14 +999,11 @@ int mdd_changelog_ns_store(const struct lu_env *env,
        struct llog_changelog_rec       *rec;
        struct lu_buf                   *buf;
        int                              reclen;
+       __u64                            xflags = CLFE_INVALID;
        int                              rc;
        ENTRY;
 
-       /* Not recording */
-       if (!(mdd->mdd_cl.mc_flags & CLM_ON))
-               RETURN(0);
-
-       if ((mdd->mdd_cl.mc_mask & (1 << type)) == 0)
+       if (!mdd_changelog_enabled(env, mdd, type))
                RETURN(0);
 
        LASSERT(tpfid != NULL);
@@ -850,31 +1016,46 @@ int mdd_changelog_ns_store(const struct lu_env *env,
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
 
-       crf &= CLF_FLAGMASK;
+       clf_flags &= CLF_FLAGMASK;
+       clf_flags |= CLF_EXTRA_FLAGS;
 
-       if (uc != NULL && uc->uc_jobid[0] != '\0')
-               crf |= CLF_JOBID;
+       if (uc) {
+               if (uc->uc_jobid[0] != '\0')
+                       clf_flags |= CLF_JOBID;
+               xflags |= CLFE_UIDGID;
+               xflags |= CLFE_NID;
+       }
 
        if (sname != NULL)
-               crf |= CLF_RENAME;
+               clf_flags |= CLF_RENAME;
        else
-               crf |= CLF_VERSION;
+               clf_flags |= CLF_VERSION;
+
+       rec->cr.cr_flags = clf_flags;
+
+       if (clf_flags & CLF_EXTRA_FLAGS) {
+               mdd_changelog_rec_ext_extra_flags(&rec->cr, xflags);
+               if (xflags & CLFE_UIDGID)
+                       mdd_changelog_rec_extra_uidgid(&rec->cr,
+                                                      uc->uc_uid, uc->uc_gid);
+               if (xflags & CLFE_NID)
+                       mdd_changelog_rec_extra_nid(&rec->cr, uc->uc_nid);
+       }
 
-       rec->cr.cr_flags = crf;
        rec->cr.cr_type = (__u32)type;
        rec->cr.cr_pfid = *tpfid;
        rec->cr.cr_namelen = tname->ln_namelen;
        memcpy(changelog_rec_name(&rec->cr), tname->ln_name, tname->ln_namelen);
 
-       if (crf & CLF_RENAME)
+       if (clf_flags & CLF_RENAME)
                mdd_changelog_rec_ext_rename(&rec->cr, sfid, spfid, sname);
 
-       if (crf & CLF_JOBID)
+       if (clf_flags & CLF_JOBID)
                mdd_changelog_rec_ext_jobid(&rec->cr, uc->uc_jobid);
 
        if (likely(target != NULL)) {
                rec->cr.cr_tfid = *mdo2fid(target);
-               target->mod_cltime = cfs_time_current_64();
+               target->mod_cltime = ktime_get();
        } else {
                fid_zero(&rec->cr.cr_tfid);
        }
@@ -966,41 +1147,32 @@ static int mdd_linkea_prepare(const struct lu_env *env,
                              struct linkea_data *ldata)
 {
        int rc = 0;
-       int rc2 = 0;
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
-               return 0;
+               RETURN(0);
 
        LASSERT(oldpfid != NULL || newpfid != NULL);
 
-       if (mdd_obj->mod_flags & DEAD_OBJ) {
-               /* Prevent linkea to be updated which is NOT necessary. */
-               ldata->ld_reclen = 0;
-               /* No more links, don't bother */
+       if (mdd_obj->mod_flags & DEAD_OBJ)
+               /* Unnecessary to update linkEA for dead object.  */
                RETURN(0);
-       }
 
        if (oldpfid != NULL) {
                rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
                if (rc) {
-                       if ((check == 1) ||
-                           (rc != -ENODATA && rc != -ENOENT))
+                       if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
                                RETURN(rc);
+
                        /* No changes done. */
                        rc = 0;
                }
        }
 
        /* If renaming, add the new record */
-       if (newpfid != NULL) {
-               /* even if the add fails, we still delete the out-of-date
-                * old link */
-               rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
-                                     first, check);
-       }
-
-       rc = rc != 0 ? rc : rc2;
+       if (newpfid != NULL)
+               rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
+                                    first, check);
 
        RETURN(rc);
 }
@@ -1022,41 +1194,34 @@ int mdd_links_rename(const struct lu_env *env,
                ldata = &mdd_env_info(env)->mti_link_data;
                memset(ldata, 0, sizeof(*ldata));
                rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
-                                       newpfid, newlname, first, check,
-                                       ldata);
-               if (rc != 0)
+                                       newpfid, newlname, first, check, ldata);
+               if (rc)
                        GOTO(out, rc);
        }
 
-       if (ldata->ld_reclen != 0)
+       if (!(mdd_obj->mod_flags & DEAD_OBJ))
                rc = mdd_links_write(env, mdd_obj, ldata, handle);
-       EXIT;
+
+       GOTO(out, rc);
+
 out:
        if (rc != 0) {
-               int error = 1;
-               if (rc == -EOVERFLOW || rc == -ENOSPC)
-                       error = 0;
                if (newlname == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea add failed %d "DFID"\n",
+                       CERROR("link_ea add failed %d "DFID"\n",
                               rc, PFID(mdd_object_fid(mdd_obj)));
                else if (oldpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea add '%.*s' failed %d "DFID"\n",
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea add '%.*s' failed %d "DFID"\n",
+                              newlname->ln_namelen, newlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
                else if (newpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea del '%.*s' failed %d "DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea del '%.*s' failed %d "DFID"\n",
+                              oldlname->ln_namelen, oldlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
                else
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea rename '%.*s'->'%.*s' failed %d "
-                              DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
+                              "\n", oldlname->ln_namelen, oldlname->ln_name,
+                              newlname->ln_namelen, newlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
        }
 
        if (is_vmalloc_addr(ldata->ld_buf))
@@ -1088,50 +1253,6 @@ static inline int mdd_links_del(const struct lu_env *env,
 }
 
 /** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
- * A pointer to the buffer is stored in \a ldata::ld_buf.
- *
- * \retval 0 or error
- */
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct linkea_data *ldata)
-{
-       int rc;
-
-       if (!mdd_object_exists(mdd_obj))
-               return -ENODATA;
-
-       /* First try a small buf */
-       LASSERT(env != NULL);
-       ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
-                                              PAGE_SIZE);
-       if (ldata->ld_buf->lb_buf == NULL)
-               return -ENOMEM;
-
-       rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
-       if (rc == -ERANGE) {
-               /* Buf was too small, figure out what we need. */
-               lu_buf_free(ldata->ld_buf);
-               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
-                                  XATTR_NAME_LINK);
-               if (rc < 0)
-                       return rc;
-               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
-               if (ldata->ld_buf->lb_buf == NULL)
-                       return -ENOMEM;
-               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
-                                 XATTR_NAME_LINK);
-       }
-       if (rc < 0) {
-               lu_buf_free(ldata->ld_buf);
-               ldata->ld_buf = NULL;
-               return rc;
-       }
-
-       return linkea_init(ldata);
-}
-
-/** Read the link EA into a temp buffer.
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
  * \retval ptr to \a lu_buf (always \a mti_big_buf)
@@ -1156,38 +1277,26 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
            ldata->ld_leh == NULL)
                return 0;
 
-       buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
-                               ldata->ld_leh->leh_len);
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
                return 0;
 
+again:
+       buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+                               ldata->ld_leh->leh_len);
        rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
-       if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
-           mdd_object_remote(mdd_obj) == 0) {
-               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
-               struct thandle  *sub_th;
-
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
-                              LFSCK_TYPE_NAMESPACE);
-
-               sub_th = thandle_get_sub_by_dt(env, handle,
-                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
-               lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                               lr, sub_th);
+       if (unlikely(rc == -ENOSPC)) {
+               rc = linkea_overflow_shrink(ldata);
+               if (likely(rc > 0))
+                       goto again;
        }
 
        return rc;
 }
 
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct thandle *handle, struct linkea_data *ldata,
-                         enum mdd_links_add_overflow overflow)
+static int mdd_declare_links_add(const struct lu_env *env,
+                                struct mdd_object *mdd_obj,
+                                struct thandle *handle,
+                                struct linkea_data *ldata)
 {
        int     rc;
        int     ea_len;
@@ -1197,36 +1306,13 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
                ea_len = ldata->ld_leh->leh_len;
                linkea = ldata->ld_buf->lb_buf;
        } else {
-               ea_len = DEFAULT_LINKEA_SIZE;
+               ea_len = MAX_LINKEA_SIZE;
                linkea = NULL;
        }
 
-       /* XXX: max size? */
        rc = mdo_declare_xattr_set(env, mdd_obj,
                                   mdd_buf_get_const(env, linkea, ea_len),
                                   XATTR_NAME_LINK, 0, handle);
-       if (rc != 0)
-               return rc;
-
-       if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
-               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
-               struct thandle  *sub_th;
-
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
-                              LFSCK_TYPE_NAMESPACE);
-
-               sub_th = thandle_get_sub_by_dt(env, handle,
-                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
-               rc = lfsck_in_notify(env,
-                                    mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                                    lr, sub_th);
-       }
 
        return rc;
 }
@@ -1240,7 +1326,7 @@ static inline int mdd_declare_links_del(const struct lu_env *env,
        /* For directory, the linkEA will be removed together
         * with the object. */
        if (!S_ISDIR(mdd_object_type(c)))
-               rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
+               rc = mdd_declare_links_add(env, c, handle, NULL);
 
        return rc;
 }
@@ -1269,12 +1355,6 @@ static int mdd_declare_link(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
-               rc = mdo_declare_ref_add(env, c, handle);
-               if (rc != 0)
-                       return rc;
-       }
-
        la->la_valid = LA_CTIME | LA_MTIME;
        rc = mdo_declare_attr_set(env, p, la, handle);
        if (rc != 0)
@@ -1285,12 +1365,12 @@ static int mdd_declare_link(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_links_add(env, c, handle, data,
-                       S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+       rc = mdd_declare_links_add(env, c, handle, data);
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_HARDLINK, name, NULL,
+                                        handle);
 
        return rc;
 }
@@ -1320,6 +1400,15 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
        if (rc != 0)
                RETURN(rc);
 
+       /*
+        * If we are using project inheritance, we only allow hard link
+        * creation in our tree when the project IDs are the same;
+        * otherwise the tree quota mechanism could be circumvented.
+        */
+       if ((tattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+           (tattr->la_projid != cattr->la_projid))
+               RETURN(-EXDEV);
+
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -1329,6 +1418,14 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
        la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
+       /* Note: although mdd_linkea_prepare() changes ldata, ldata comes from
+        * thread_info, which is completely temporary and only seen in
+        * this function, so we do not need to reset ldata if it fails. */
+       rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL, mdo2fid(mdd_tobj),
+                               lname, 0, 0, ldata);
+       if (rc != 0)
+               GOTO(stop, rc);
+
        rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
                              la, ldata);
         if (rc)
@@ -1338,7 +1435,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(stop, rc);
 
-       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+       mdd_write_lock(env, mdd_sobj, DT_TGT_CHILD);
        rc = mdd_link_sanity_check(env, mdd_tobj, tattr, lname, mdd_sobj,
                                   cattr);
        if (rc)
@@ -1350,12 +1447,6 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                        GOTO(out_unlock, rc);
        }
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
-               rc = mdo_ref_add(env, mdd_sobj, handle);
-               if (rc != 0)
-                       GOTO(out_unlock, rc);
-       }
-
        *tfid = *mdo2fid(mdd_sobj);
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING3))
                tfid->f_oid = cfs_fail_val;
@@ -1374,17 +1465,12 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
        la->la_valid = LA_CTIME;
        rc = mdd_update_time(env, mdd_sobj, cattr, la, handle);
-       if (rc == 0) {
-               rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
-                                       mdo2fid(mdd_tobj), lname, 0, 0,
-                                       ldata);
-               if (rc == 0)
-                       mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
-                                     lname, handle, ldata, 0);
-               /* The failure of links_add should not cause the link
-                * failure, reset rc here */
-               rc = 0;
-       }
+       if (rc == 0)
+               /* Note: The failure of links_add should not cause the
+                * link failure, so do not check return value. */
+               mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
+                             lname, handle, ldata, 0);
+
        EXIT;
 out_unlock:
        mdd_write_unlock(env, mdd_sobj);
@@ -1408,9 +1494,6 @@ static int mdd_mark_orphan_object(const struct lu_env *env,
        struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
        int rc;
 
-       if (!S_ISDIR(mdd_object_type(obj)))
-               return 0;
-
        attr->la_valid = LA_FLAGS;
        attr->la_flags = LUSTRE_ORPHAN_FL;
 
@@ -1426,7 +1509,7 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
                                     struct mdd_object *obj,
                                     struct thandle *handle)
 {
-       int     rc;
+       int rc;
 
        /* Sigh, we do not know if the unlink object will become an orphan in
         * declare phase, but fortunately the flags here do not matter
@@ -1439,7 +1522,7 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
-       rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
+       rc = mdd_orphan_declare_insert(env, obj, mdd_object_type(obj), handle);
        if (rc != 0)
                return rc;
 
@@ -1464,7 +1547,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                 * will be deleted during mdd_close() */
                obj->mod_flags |= DEAD_OBJ;
                if (obj->mod_count) {
-                       rc = __mdd_orphan_add(env, obj, th);
+                       rc = mdd_orphan_insert(env, obj, th);
                        if (rc == 0)
                                CDEBUG(D_HA, "Object "DFID" is inserted into "
                                        "orphan list, open count = %d\n",
@@ -1478,7 +1561,7 @@ int mdd_finish_unlink(const struct lu_env *env,
                                        obj->mod_count);
 
                        /* mark object as an orphan here, not
-                        * before __mdd_orphan_add() as racing
+                        * before mdd_orphan_insert() as racing
                         * mdd_la_get() may propagate ORPHAN_OBJ
                         * causing the assertion */
                        rc = mdd_mark_orphan_object(env, obj, th, false);
@@ -1559,7 +1642,8 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                        return rc;
 
                /* FIXME: need changelog for remove entry */
-               rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+               rc = mdd_declare_changelog_store(env, mdd, CL_UNLINK, name,
+                                                NULL, handle);
        }
 
        return rc;
@@ -1621,6 +1705,9 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        int rc, is_dir = 0, cl_flags = 0;
        ENTRY;
 
+       /* allow shutdown to start */
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 1);
+
        /* cobj == NULL means only delete name entry */
        if (likely(cobj != NULL)) {
                mdd_cobj = md2mdd_obj(cobj);
@@ -1664,7 +1751,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                GOTO(stop, rc);
 
        if (likely(mdd_cobj != NULL))
-               mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
+               mdd_write_lock(env, mdd_cobj, DT_TGT_CHILD);
 
        if (likely(no_name == 0) && !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING2)) {
                rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
@@ -1836,7 +1923,8 @@ static int mdd_create_data(const struct lu_env *env,
        if (rc)
                GOTO(stop, rc);
 
-       rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+                                        handle);
        if (rc)
                GOTO(stop, rc);
 
@@ -1890,8 +1978,8 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
 static int mdd_object_initialize(const struct lu_env *env,
                                 const struct lu_fid *pfid,
                                 struct mdd_object *child,
-                                struct lu_attr *attr, struct thandle *handle,
-                                const struct md_op_spec *spec)
+                                struct lu_attr *attr,
+                                struct thandle *handle)
 {
        int rc = 0;
        ENTRY;
@@ -1967,17 +2055,57 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                check_perm = false;
        }
 
+       if (S_ISDIR(cattr->la_mode) &&
+           unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA) &&
+           spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen > 0) {
+               const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+
+               if (!lmv_magic_supported(le32_to_cpu(lum->lum_magic)) &&
+                   le32_to_cpu(lum->lum_magic) != LMV_USER_MAGIC_V0) {
+                       rc = -EINVAL;
+                       CERROR("%s: invalid lmv_user_md: magic = %x, "
+                              "stripe_offset = %d, stripe_count = %u: "
+                              "rc = %d\n", mdd2obd_dev(m)->obd_name,
+                               le32_to_cpu(lum->lum_magic),
+                              (int)le32_to_cpu(lum->lum_stripe_offset),
+                              le32_to_cpu(lum->lum_stripe_count), rc);
+                       return rc;
+               }
+       }
+
        rc = mdd_may_create(env, obj, pattr, NULL, check_perm);
        if (rc != 0)
                RETURN(rc);
 
-        /* sgid check */
+       /* sgid check */
        if (pattr->la_mode & S_ISGID) {
+               struct lu_ucred *uc = lu_ucred(env);
+
                cattr->la_gid = pattr->la_gid;
+
+               /* Directories are special, and always inherit S_ISGID */
                if (S_ISDIR(cattr->la_mode)) {
                        cattr->la_mode |= S_ISGID;
                        cattr->la_valid |= LA_MODE;
+               } else if ((cattr->la_mode & (S_ISGID | S_IXGRP))
+                               == (S_ISGID | S_IXGRP) &&
+                          !lustre_in_group_p(uc,
+                                             (cattr->la_valid & LA_GID) ?
+                                             cattr->la_gid : pattr->la_gid) &&
+                          !md_capable(uc, CFS_CAP_FSETID)) {
+                       cattr->la_mode &= ~S_ISGID;
+                       cattr->la_valid |= LA_MODE;
+               }
+       }
+
+       /* Inherit project ID from parent directory */
+       if (pattr->la_flags & LUSTRE_PROJINHERIT_FL) {
+               cattr->la_projid = pattr->la_projid;
+               if (S_ISDIR(cattr->la_mode)) {
+                       cattr->la_flags |= LUSTRE_PROJINHERIT_FL;
+                       cattr->la_valid |= LA_FLAGS;
                }
+               cattr->la_valid |= LA_PROJID;
        }
 
        rc = mdd_name_check(m, lname);
@@ -2008,7 +2136,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_declare_object_create(const struct lu_env *env,
+static int mdd_declare_create_object(const struct lu_env *env,
                                     struct mdd_device *mdd,
                                     struct mdd_object *p, struct mdd_object *c,
                                     struct lu_attr *attr,
@@ -2016,18 +2144,19 @@ static int mdd_declare_object_create(const struct lu_env *env,
                                     const struct md_op_spec *spec,
                                     struct lu_buf *def_acl_buf,
                                     struct lu_buf *acl_buf,
+                                    struct lu_buf *hsm_buf,
                                     struct dt_allocation_hint *hint)
 {
        const struct lu_buf *buf;
        int rc;
 
-       rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
+       rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
                                                hint);
-        if (rc)
-                GOTO(out, rc);
+       if (rc)
+               GOTO(out, rc);
 
-#ifdef CONFIG_FS_POSIX_ACL
-       if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+       if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
                /* if dir, then it can inherit the default ACL */
                rc = mdo_declare_xattr_set(env, c, def_acl_buf,
                                           XATTR_NAME_ACL_DEFAULT,
@@ -2036,7 +2165,7 @@ static int mdd_declare_object_create(const struct lu_env *env,
                        GOTO(out, rc);
        }
 
-       if (acl_buf->lb_len > 0) {
+       if (acl_buf && acl_buf->lb_len > 0) {
                rc = mdo_declare_attr_set(env, c, attr, handle);
                if (rc)
                        GOTO(out, rc);
@@ -2052,14 +2181,24 @@ static int mdd_declare_object_create(const struct lu_env *env,
                GOTO(out, rc);
 
        /* replay case, create LOV EA from client data */
-       if (spec->no_create ||
+       if ((!(spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE) && spec->no_create) ||
            (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
-                                          handle);
+               rc = mdo_declare_xattr_set(env, c, buf,
+                                          S_ISDIR(attr->la_mode) ?
+                                               XATTR_NAME_LMV : XATTR_NAME_LOV,
+                                          0, handle);
                if (rc)
                        GOTO(out, rc);
+
+               if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+                       rc = mdo_declare_xattr_set(env, c, hsm_buf,
+                                                  XATTR_NAME_HSM,
+                                                  0, handle);
+                       if (rc)
+                               GOTO(out, rc);
+               }
        }
 
        if (S_ISLNK(attr->la_mode)) {
@@ -2096,12 +2235,13 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                              struct linkea_data *ldata,
                              struct lu_buf *def_acl_buf,
                              struct lu_buf *acl_buf,
+                             struct lu_buf *hsm_buf,
                              struct dt_allocation_hint *hint)
 {
        int rc;
 
-       rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
-                                      def_acl_buf, acl_buf, hint);
+       rc = mdd_declare_create_object(env, mdd, p, c, attr, handle, spec,
+                                      def_acl_buf, acl_buf, hsm_buf, hint);
        if (rc)
                GOTO(out, rc);
 
@@ -2112,18 +2252,19 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
        }
 
        if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
-               rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+               rc = mdd_orphan_declare_insert(env, c, attr->la_mode, handle);
                if (rc)
                        GOTO(out, rc);
        } else {
-               struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
+               struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+               enum changelog_rec_type type;
 
                rc = mdo_declare_index_insert(env, p, mdo2fid(c), attr->la_mode,
                                              name->ln_name, handle);
                if (rc != 0)
                        return rc;
 
-               rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
+               rc = mdd_declare_links_add(env, c, handle, ldata);
                if (rc)
                        return rc;
 
@@ -2133,7 +2274,12 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                if (rc)
                        return rc;
 
-               rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+               type = S_ISDIR(attr->la_mode) ? CL_MKDIR :
+                      S_ISREG(attr->la_mode) ? CL_CREATE :
+                      S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD;
+
+               rc = mdd_declare_changelog_store(env, mdd, type, name, NULL,
+                                                handle);
                if (rc)
                        return rc;
        }
@@ -2154,7 +2300,7 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                RETURN(0);
        }
 
-       mdd_read_lock(env, pobj, MOR_TGT_PARENT);
+       mdd_read_lock(env, pobj, DT_TGT_PARENT);
        rc = mdo_xattr_get(env, pobj, def_acl_buf,
                           XATTR_NAME_ACL_DEFAULT);
        mdd_read_unlock(env, pobj);
@@ -2186,18 +2332,19 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
 /**
  * Create a metadata object and initialize it, set acl, xattr.
  **/
-static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
+static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                             struct mdd_object *son, struct lu_attr *attr,
                             struct md_op_spec *spec, struct lu_buf *acl_buf,
                             struct lu_buf *def_acl_buf,
+                            struct lu_buf *hsm_buf,
                             struct dt_allocation_hint *hint,
-                            struct thandle *handle)
+                            struct thandle *handle, bool initsecctx)
 {
-       const struct lu_buf    *buf;
-       int                     rc;
+       const struct lu_buf *buf;
+       int rc;
 
-       mdd_write_lock(env, son, MOR_TGT_CHILD);
-       rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
+       mdd_write_lock(env, son, DT_TGT_CHILD);
+       rc = mdd_create_object_internal(env, NULL, son, attr, handle, spec,
                                        hint);
        if (rc)
                GOTO(unlock, rc);
@@ -2206,8 +2353,7 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
         * created in declare phase, they also need to be added to master
         * object as sub-directory entry. So it has to initialize the master
         * object, then set dir striped EA (in mdo_xattr_set). */
-       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
-                                  spec);
+       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
        if (rc != 0)
                GOTO(err_destroy, rc);
 
@@ -2233,13 +2379,26 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
                                        spec->u.sp_ea.eadatalen);
                rc = mdo_xattr_set(env, son, buf,
                                   S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
-                                                           XATTR_NAME_LOV, 0,
-                                  handle);
+                                                           XATTR_NAME_LOV,
+                                  0, handle);
+               if (rc != 0)
+                       GOTO(err_destroy, rc);
+       }
+
+       if (S_ISREG(attr->la_mode) && spec->sp_cr_flags & MDS_OPEN_PCC) {
+               struct md_hsm mh;
+
+               memset(&mh, 0, sizeof(mh));
+               mh.mh_flags = HS_EXISTS | HS_ARCHIVED | HS_RELEASED;
+               mh.mh_arch_id = spec->sp_archive_id;
+               lustre_hsm2buf(hsm_buf->lb_buf, &mh);
+               rc = mdo_xattr_set(env, son, hsm_buf, XATTR_NAME_HSM,
+                                  0, handle);
                if (rc != 0)
                        GOTO(err_destroy, rc);
        }
 
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
        if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
            S_ISDIR(attr->la_mode)) {
                /* set default acl */
@@ -2260,25 +2419,20 @@ static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
 #endif
 
        if (S_ISLNK(attr->la_mode)) {
-               struct lu_ucred  *uc = lu_ucred_assert(env);
                struct dt_object *dt = mdd_object_child(son);
                const char *target_name = spec->u.sp_symname;
                int sym_len = strlen(target_name);
-               const struct lu_buf *buf;
                loff_t pos = 0;
 
                buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
-                                               uc->uc_cap &
-                                               CFS_CAP_SYS_RESOURCE_MASK);
-
+               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle);
                if (rc == sym_len)
                        rc = 0;
                else
                        GOTO(err_initlized, rc = -EFAULT);
        }
 
-       if (spec->sp_cr_file_secctx_name != NULL) {
+       if (initsecctx && spec->sp_cr_file_secctx_name != NULL) {
                buf = mdd_buf_get_const(env, spec->sp_cr_file_secctx,
                                        spec->sp_cr_file_secctx_size);
                rc = mdo_xattr_set(env, son, buf, spec->sp_cr_file_secctx_name,
@@ -2350,12 +2504,51 @@ stop:
        RETURN(rc);
 }
 
-/*
+/**
  * Create object and insert it into namespace.
+ *
+ * Two operations have to be performed:
+ *
+ *  - an allocation of a new object (->do_create()), and
+ *  - an insertion into a parent index (->dio_insert()).
+ *
+ * Due to locking, operation order is not important, when both are
+ * successful, *but* error handling cases are quite different:
+ *
+ *  - if insertion is done first, and following object creation fails,
+ *  insertion has to be rolled back, but this operation might fail
+ *  also leaving us with dangling index entry.
+ *
+ *  - if creation is done first, it has to be undone if insertion fails,
+ *  leaving us with leaked space, which is not good but not fatal.
+ *
+ * It seems that creation-first is the simplest solution, but it is sub-optimal
+ * in the frequent
+ *
+ * $ mkdir foo
+ * $ mkdir foo
+ *
+ * case, because second mkdir is bound to create object, only to
+ * destroy it immediately.
+ *
+ * To avoid this, follow local file systems that do a double lookup:
+ *
+ * 0. lookup -> -EEXIST (mdd_create_sanity_check())
+ * 1. create            (mdd_create_object_internal())
+ * 2. insert            (__mdd_index_insert(), lookup again)
+ *
+ * \param[in] pobj     parent object
+ * \param[in] lname    name of child being created
+ * \param[in,out] child        child object being created
+ * \param[in] spec     additional create parameters
+ * \param[in] ma       attributes for new child object
+ *
+ * \retval             0 on success
+ * \retval             negative errno on failure
  */
-static int mdd_create(const struct lu_env *env, struct md_object *pobj,
+int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
-                     struct md_op_spec *spec, struct md_attr *ma)
+                     struct md_op_spec *spec, struct md_attr *ma)
 {
        struct mdd_thread_info  *info = mdd_env_info(env);
        struct lu_attr          *la = &info->mti_la_for_fix;
@@ -2367,6 +2560,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        struct lu_attr          *pattr = &info->mti_pattr;
        struct lu_buf           acl_buf;
        struct lu_buf           def_acl_buf;
+       struct lu_buf           hsm_buf;
        struct linkea_data      *ldata = &info->mti_link_data;
        const char              *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
@@ -2374,42 +2568,6 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        int                      rc2;
        ENTRY;
 
-        /*
-         * Two operations have to be performed:
-         *
-         *  - an allocation of a new object (->do_create()), and
-         *
-         *  - an insertion into a parent index (->dio_insert()).
-         *
-         * Due to locking, operation order is not important, when both are
-         * successful, *but* error handling cases are quite different:
-         *
-         *  - if insertion is done first, and following object creation fails,
-         *  insertion has to be rolled back, but this operation might fail
-         *  also leaving us with dangling index entry.
-         *
-         *  - if creation is done first, is has to be undone if insertion
-         *  fails, leaving us with leaked space, which is neither good, nor
-         *  fatal.
-         *
-         * It seems that creation-first is simplest solution, but it is
-         * sub-optimal in the frequent
-         *
-         *         $ mkdir foo
-         *         $ mkdir foo
-         *
-         * case, because second mkdir is bound to create object, only to
-         * destroy it immediately.
-         *
-         * To avoid this follow local file systems that do double lookup:
-         *
-         *     0. lookup -> -EEXIST (mdd_create_sanity_check())
-         *
-         *     1. create            (mdd_object_create_internal())
-         *
-         *     2. insert            (__mdd_index_insert(), lookup again)
-         */
-
        rc = mdd_la_get(env, mdd_pobj, pattr);
        if (rc != 0)
                RETURN(rc);
@@ -2419,21 +2577,37 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (rc)
                RETURN(rc);
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
                GOTO(out_free, rc = -EINPROGRESS);
 
        handle = mdd_trans_create(env, mdd);
        if (IS_ERR(handle))
                GOTO(out_free, rc = PTR_ERR(handle));
 
-       acl_buf.lb_buf = info->mti_xattr_buf;
-       acl_buf.lb_len = sizeof(info->mti_xattr_buf);
+       lu_buf_check_and_alloc(&info->mti_xattr_buf,
+                       min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
+                             XATTR_SIZE_MAX));
+       acl_buf = info->mti_xattr_buf;
        def_acl_buf.lb_buf = info->mti_key;
        def_acl_buf.lb_len = sizeof(info->mti_key);
        rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
        if (rc < 0)
                GOTO(out_stop, rc);
 
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
+
+               /*
+                * migrate may create 1-stripe directory, so lod_ah_init()
+                * doesn't adjust stripe count from lmu.
+                */
+               if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
+                       info->mti_lmu = *lmu;
+                       info->mti_lmu.lum_stripe_count = 0;
+                       spec->u.sp_ea.eadata = &info->mti_lmu;
+               }
+       }
+
        mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
 
        memset(ldata, 0, sizeof(*ldata));
@@ -2449,28 +2623,38 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
                                        lname, 1, 0, ldata);
        }
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC) {
+               LASSERT(spec->sp_cr_flags & MDS_OPEN_HAS_EA);
+
+               memset(&hsm_buf, 0, sizeof(hsm_buf));
+               lu_buf_alloc(&hsm_buf, sizeof(struct hsm_attrs));
+               if (hsm_buf.lb_buf == NULL)
+                       GOTO(out_stop, rc = -ENOMEM);
+       }
+
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
                                handle, spec, ldata, &def_acl_buf, &acl_buf,
-                               hint);
-        if (rc)
-                GOTO(out_stop, rc);
+                               &hsm_buf, hint);
+       if (rc)
+               GOTO(out_stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(out_stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(out_stop, rc);
 
-       rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
-                              &def_acl_buf, hint, handle);
+       rc = mdd_create_object(env, mdd_pobj, son, attr, spec, &acl_buf,
+                              &def_acl_buf, &hsm_buf, hint, handle, true);
        if (rc != 0)
                GOTO(out_stop, rc);
 
        if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
-               mdd_write_lock(env, son, MOR_TGT_CHILD);
-               rc = __mdd_orphan_add(env, son, handle);
+               mdd_write_lock(env, son, DT_TGT_CHILD);
+               son->mod_flags |= VOLATILE_OBJ;
+               rc = mdd_orphan_insert(env, son, handle);
                GOTO(out_volatile, rc);
        } else {
                rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
-                                       attr->la_mode, name, handle);
+                                     attr->la_mode, name, handle);
                if (rc != 0)
                        GOTO(err_created, rc);
 
@@ -2489,7 +2673,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
 err_insert:
        if (rc != 0) {
                if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-                       rc2 = __mdd_orphan_del(env, son, handle);
+                       rc2 = mdd_orphan_delete(env, son, handle);
                else
                        rc2 = __mdd_index_delete(env, mdd_pobj, name,
                                                 S_ISDIR(attr->la_mode),
@@ -2498,7 +2682,7 @@ err_insert:
                        goto out_stop;
 
 err_created:
-               mdd_write_lock(env, son, MOR_TGT_CHILD);
+               mdd_write_lock(env, son, DT_TGT_CHILD);
                if (S_ISDIR(attr->la_mode)) {
                        /* Drop the reference, no need to delete "."/"..",
                         * because the object is to be destroyed directly. */
@@ -2550,6 +2734,9 @@ out_free:
                /* if we vmalloced a large buffer drop it */
                lu_buf_free(ldata->ld_buf);
 
+       if (spec->sp_cr_flags & MDS_OPEN_PCC)
+               lu_buf_free(&hsm_buf);
+
        /* The child object shouldn't be cached anymore */
        if (rc)
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -2586,8 +2773,8 @@ static int mdd_rename_order(const struct lu_env *env,
         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
                 rc = MDD_RN_TGTSRC;
         } else {
-               rc = mdd_is_parent(env, mdd, src_pobj, pattr, mdo2fid(tgt_pobj),
-                                  NULL);
+               rc = mdd_is_parent(env, mdd, src_pobj, pattr,
+                                  mdo2fid(tgt_pobj));
                 if (rc == -EREMOTE)
                         rc = 0;
 
@@ -2619,6 +2806,17 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
         * before mdd_rename and enable MDS_PERM_BYPASS. */
        LASSERT(sobj);
 
+       /*
+        * If we are using project inheritance, we only allow renames
+        * into our tree when the project IDs are the same; otherwise
+        * the tree quota mechanism would be circumvented.
+        */
+       if (((tpattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+           tpattr->la_projid != cattr->la_projid) ||
+           ((pattr->la_flags & LUSTRE_PROJINHERIT_FL) &&
+           (pattr->la_projid != tpattr->la_projid)))
+               RETURN(-EXDEV);
+
        rc = mdd_may_delete(env, src_pobj, pattr, sobj, cattr, NULL, 1, 0);
        if (rc)
                RETURN(rc);
@@ -2709,8 +2907,7 @@ static int mdd_declare_rename(const struct lu_env *env,
        if (rc)
                return rc;
 
-       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
-               S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
+       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
        if (rc)
                return rc;
 
@@ -2755,7 +2952,8 @@ static int mdd_declare_rename(const struct lu_env *env,
                        return rc;
         }
 
-       rc = mdd_declare_changelog_store(env, mdd, tname, sname, handle);
+       rc = mdd_declare_changelog_store(env, mdd, CL_RENAME, tname, sname,
+                                        handle);
         if (rc)
                 return rc;
 
@@ -2792,6 +2990,9 @@ static int mdd_rename(const struct lu_env *env,
        int rc, rc2;
        ENTRY;
 
+       /* let unlink complete and commit */
+       CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLY_DATA_RACE, 2 + cfs_fail_val);
+
        if (tobj)
                mdd_tobj = md2mdd_obj(tobj);
 
@@ -2841,8 +3042,12 @@ static int mdd_rename(const struct lu_env *env,
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
        memset(ldata, 0, sizeof(*ldata));
-       mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
-                          mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
+       rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+                               lsname, mdd_object_fid(mdd_tpobj), ltname,
+                               1, 0, ldata);
+       if (rc)
+               GOTO(stop, rc);
+
        rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
                                mdd_tobj, lsname, ltname, ma, ldata, handle);
        if (rc)
@@ -2894,7 +3099,7 @@ static int mdd_rename(const struct lu_env *env,
                GOTO(fixup_tpobj, rc);
 
        /* Update the linkEA for the source object */
-       mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
+       mdd_write_lock(env, mdd_sobj, DT_SRC_CHILD);
        rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
                              mdo2fid(mdd_tpobj), ltname, handle, ldata,
                              0, 0);
@@ -2913,7 +3118,7 @@ static int mdd_rename(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdd_write_lock(env, mdd_tobj, MOR_TGT_CHILD);
+               mdd_write_lock(env, mdd_tobj, DT_TGT_CHILD);
                tobj_locked = 1;
                 if (mdd_is_dead_obj(mdd_tobj)) {
                         /* should not be dead, something is wrong */
@@ -3064,1250 +3269,1727 @@ out_pending:
 }
 
 /**
- * During migration once the parent FID has been changed,
- * we need update the parent FID in linkea.
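+ * Check whether the file/dir can be migrated.
+ *
+ * Return value:
+ *     < 0  a local source is still open (-EBUSY), the target already
+ *          exists (-EEXIST), or mdd_rename_sanity_check() failed
+ *          (permission check failed or other error).
+ *     = 0  the file can be migrated.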
  **/
-static int mdd_linkea_update_child_internal(const struct lu_env *env,
-                                           struct mdd_object *parent,
-                                           struct mdd_object *newparent,
-                                           struct mdd_object *child,
-                                           const char *name, int namelen,
-                                           struct thandle *handle,
-                                           bool declare)
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   struct mdd_object *spobj,
+                                   struct mdd_object *tpobj,
+                                   struct mdd_object *sobj,
+                                   struct mdd_object *tobj,
+                                   const struct lu_attr *spattr,
+                                   const struct lu_attr *tpattr,
+                                   const struct lu_attr *attr)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      ldata = { NULL };
-       struct lu_buf           *buf = &info->mti_link_buf;
-       int                     count;
-       int                     rc = 0;
+       int rc;
 
        ENTRY;
 
-       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
-       if (buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
-
-       ldata.ld_buf = buf;
-       rc = mdd_links_read(env, child, &ldata);
-       if (rc != 0) {
-               if (rc == -ENOENT || rc == -ENODATA)
-                       rc = 0;
-               RETURN(rc);
+       if (!mdd_object_remote(sobj)) {
+               mdd_read_lock(env, sobj, DT_SRC_CHILD);
+               if (sobj->mod_count > 0) {
+                       CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
+                              mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
+                              sobj->mod_count);
+                       mdd_read_unlock(env, sobj);
+                       RETURN(-EBUSY);
+               }
+               mdd_read_unlock(env, sobj);
        }
 
-       LASSERT(ldata.ld_leh != NULL);
-       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
-       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
-               struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
-               struct lu_name lname;
-               struct lu_fid  fid;
-
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
-                                   &lname, &fid);
-
-               if (strncmp(lname.ln_name, name, namelen) != 0 ||
-                   !lu_fid_eq(&fid, mdd_object_fid(parent))) {
-                       ldata.ld_lee = (struct link_ea_entry *)
-                                      ((char *)ldata.ld_lee +
-                                       ldata.ld_reclen);
-                       continue;
-               }
+       if (mdd_object_exists(tobj))
+               RETURN(-EEXIST);
 
-               CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
-                      lname.ln_namelen, lname.ln_name,
-                      PFID(mdd_object_fid(newparent)));
-               /* update to the new parent fid */
-               linkea_entry_pack(ldata.ld_lee, &lname,
-                                 mdd_object_fid(newparent));
-               if (declare)
-                       rc = mdd_declare_links_add(env, child, handle, &ldata,
-                                                  MLAO_IGNORE);
-               else
-                       rc = mdd_links_write(env, child, &ldata, handle);
-               break;
-       }
+       rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
+                                    attr, NULL, NULL);
        RETURN(rc);
 }
 
-static int mdd_linkea_declare_update_child(const struct lu_env *env,
-                                          struct mdd_object *parent,
-                                          struct mdd_object *newparent,
-                                          struct mdd_object *child,
-                                          const char *name, int namelen,
-                                          struct thandle *handle)
+typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                struct mdd_object *stripe,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                int index,
+                                struct thandle *handle);
+
+static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        struct mdd_object *stripe,
+                                        const struct lu_buf *lmv_buf,
+                                        const struct lu_buf *lmu_buf,
+                                        int index,
+                                        struct thandle *handle)
 {
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, true);
-}
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *stripe_name = info->mti_name;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       int rc;
 
-static int mdd_linkea_update_child(const struct lu_env *env,
-                                  struct mdd_object *parent,
-                                  struct mdd_object *newparent,
-                                  struct mdd_object *child,
-                                  const char *name, int namelen,
-                                  struct thandle *handle)
-{
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, false);
+       if (index < le32_to_cpu(lmu->lum_stripe_count))
+               return 0;
+
+       rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
+       if (rc)
+               return rc;
+
+       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+                PFID(mdd_object_fid(stripe)), index);
+
+       rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_ref_del(env, obj, handle);
+
+       return rc;
 }
 
-static int mdd_update_linkea_internal(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     const struct lu_name *child_name,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle,
-                                     int declare)
+/* delete stripe from its master object namespace */
+static int mdd_dir_delete_stripe(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                struct mdd_object *stripe,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                int index,
+                                struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       int                     count;
-       int                     rc = 0;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *stripe_name = info->mti_name;
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
+       int rc;
+
        ENTRY;
 
-       LASSERT(ldata->ld_buf != NULL);
+       /* local dir will be deleted via LOD */
+       LASSERT(mdd_object_remote(obj));
+       LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
 
-again:
-       /* If it is mulitple links file, we need update the name entry for
-        * all parent */
-       LASSERT(ldata->ld_leh != NULL);
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-               struct mdd_object       *pobj;
-               struct lu_name          lname;
-               struct lu_fid           fid;
-
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               pobj = mdd_object_find(env, mdd, &fid);
-               if (IS_ERR(pobj)) {
-                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid),
-                             PTR_ERR(pobj));
-                       linkea_del_buf(ldata, &lname);
-                       goto again;
-               }
+       if (index < del_offset)
+               RETURN(0);
 
-               if (!mdd_object_exists(pobj)) {
-                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid));
-                       linkea_del_buf(ldata, &lname);
-                       mdd_object_put(env, pobj);
-                       goto again;
-               }
+       mdd_write_lock(env, stripe, DT_SRC_CHILD);
+       rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
+       if (rc)
+               GOTO(out, rc);
 
-               if (pobj == mdd_pobj &&
-                   lname.ln_namelen == child_name->ln_namelen &&
-                   strncmp(lname.ln_name, child_name->ln_name,
-                           lname.ln_namelen) == 0) {
-                       CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
-                             mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
-                             PFID(&fid));
-                       linkea_del_buf(ldata, &lname);
-                       mdd_object_put(env, pobj);
-                       goto again;
-               }
+       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+                PFID(mdd_object_fid(stripe)), index);
 
-               CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
-                      PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+       rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-               if (declare) {
-                       /* Remove source name from source directory */
-                       /* Insert new fid with target name into target dir */
-                       rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       rc = mdo_ref_del(env, obj, handle);
+       GOTO(out, rc);
+out:
+       mdd_write_unlock(env, stripe);
 
-                       rc = mdo_declare_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       lname.ln_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       return rc;
+}
 
-                       rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
+static int mdd_dir_declare_destroy_stripe(const struct lu_env *env,
+                                         struct mdd_object *obj,
+                                         struct mdd_object *stripe,
+                                         const struct lu_buf *lmv_buf,
+                                         const struct lu_buf *lmu_buf,
+                                         int index,
+                                         struct thandle *handle)
+{
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
+       int rc;
 
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
-               } else {
-                       char *tmp_name = info->mti_key;
-
-                       if (lname.ln_namelen >= sizeof(info->mti_key)) {
-                               /* lnamelen is too big(> NAME_MAX + 16),
-                                * something wrong about this linkea, let's
-                                * skip it */
-                               linkea_del_buf(ldata, &lname);
-                               mdd_object_put(env, pobj);
-                               goto again;
-                       }
+       if (index < shrink_offset) {
+               if (shrink_offset < 2)
+                       return 0;
+               return mdo_declare_xattr_set(env, stripe, lmv_buf,
+                                            XATTR_NAME_LMV".set", 0, handle);
+       }
 
-                       /* Note: lname might be without \0 at the end, see
-                        * linkea_entry_unpack(), let's add extra \0 by
-                        * snprintf */
-                       snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
-                                lname.ln_namelen, lname.ln_name);
-                       lname.ln_name = tmp_name;
-
-                       /* Let's check if this linkEA still valid, before
-                        * it might be packed into the RPC buffer. */
-                       rc = mdd_lookup(env, &pobj->mod_obj, &lname,
-                                       &info->mti_fid, NULL);
-                       if (rc < 0 ||
-                           !lu_fid_eq(&info->mti_fid,
-                                       mdd_object_fid(mdd_sobj))) {
-                               /* skip invalid linkea entry */
-                               linkea_del_buf(ldata, &lname);
-                               mdd_object_put(env, pobj);
-                               goto again;
-                       }
+       rc = mdo_declare_ref_del(env, stripe, handle);
+       if (rc)
+               return rc;
 
-                       rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       rc = mdo_declare_destroy(env, stripe, handle);
 
-                       rc = __mdd_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       tmp_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       return rc;
+}
 
-                       mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
-                       rc = mdo_ref_add(env, mdd_tobj, handle);
-                       mdd_write_unlock(env, mdd_tobj);
-                       if (rc)
-                               GOTO(next_put, rc);
+static int mdd_dir_destroy_stripe(const struct lu_env *env,
+                                 struct mdd_object *obj,
+                                 struct mdd_object *stripe,
+                                 const struct lu_buf *lmv_buf,
+                                 const struct lu_buf *lmu_buf,
+                                 int index,
+                                 struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
+       int rc;
 
-                       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
-                       mdo_ref_del(env, mdd_sobj, handle);
-                       mdd_write_unlock(env, mdd_sobj);
-               }
-next_put:
-               mdd_object_put(env, pobj);
-               if (rc != 0)
-                       break;
+       ENTRY;
 
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
+       /* update remaining stripes' LMV */
+       if (index < shrink_offset) {
+               struct lmv_mds_md_v1 *slave_lmv;
+               struct lu_buf slave_buf = {
+                               .lb_buf = &info->mti_lmv.lmv_md_v1,
+                               .lb_len = sizeof(*slave_lmv)
+               };
+               __u32 version = le32_to_cpu(lmv->lmv_layout_version);
+
+               /* if dir will be shrunk to 1-stripe, don't update */
+               if (shrink_offset < 2)
+                       RETURN(0);
+
+               slave_lmv = slave_buf.lb_buf;
+               memset(slave_lmv, 0, sizeof(*slave_lmv));
+               slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
+               slave_lmv->lmv_stripe_count = lmu->lum_stripe_count;
+               slave_lmv->lmv_master_mdt_index = cpu_to_le32(index);
+               slave_lmv->lmv_hash_type = lmv->lmv_hash_type &
+                                          cpu_to_le32(LMV_HASH_TYPE_MASK);
+               slave_lmv->lmv_layout_version = cpu_to_le32(++version);
+
+               rc = mdo_xattr_set(env, stripe, &slave_buf,
+                                  XATTR_NAME_LMV".set", 0, handle);
+               RETURN(rc);
        }
 
+       mdd_write_lock(env, stripe, DT_SRC_CHILD);
+       rc = mdo_ref_del(env, stripe, handle);
+       if (!rc)
+               rc = mdo_destroy(env, stripe, handle);
+       mdd_write_unlock(env, stripe);
+
        RETURN(rc);
 }
 
-static int mdd_migrate_xattrs(const struct lu_env *env,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj)
+static int mdd_shrink_stripe_is_empty(const struct lu_env *env,
+                                      struct mdd_object *obj,
+                                      struct mdd_object *stripe,
+                                      const struct lu_buf *lmv_buf,
+                                      const struct lu_buf *lmu_buf,
+                                      int index,
+                                      struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       char                    *xname;
-       struct thandle          *handle;
-       struct lu_buf           xbuf;
-       int                     xlen;
-       int                     rem;
-       int                     xsize;
-       int                     list_xsize;
-       struct lu_buf           list_xbuf;
-       int                     rc;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
+
+       /* the default value is 0, but it means 1 */
+       if (!shrink_offset)
+               shrink_offset = 1;
+
+       if (index < shrink_offset)
+               return 0;
+
+       return mdd_dir_is_empty(env, stripe);
+}
+
+/*
+ * Iterate over the stripes of a striped directory on a remote MDT and call
+ * \a cb on each of them; a local striped directory is accessed via LOD.
+ *
+ * The stripe FIDs are taken from the LMV stored in \a lmv_buf, whose buffer
+ * must not be NULL (asserted). Stripe slots whose FID is not sane are
+ * skipped. For every other slot the stripe object is looked up, \a cb is
+ * called with the slot index, and the object reference is dropped again.
+ * \a lmu_buf and \a handle are not interpreted here; they are passed to
+ * \a cb unchanged.
+ *
+ * Iteration stops at the first stripe that cannot be found or for which
+ * \a cb returns non-zero.
+ *
+ * \retval 0 if \a cb returned 0 for all visited stripes
+ * \retval the error from mdd_object_find(), or the non-zero value returned
+ *         by \a cb
+ */
+static int mdd_dir_iterate_stripes(const struct lu_env *env,
+                                  struct mdd_object *obj,
+                                  const struct lu_buf *lmv_buf,
+                                  const struct lu_buf *lmu_buf,
+                                  struct thandle *handle,
+                                  mdd_dir_stripe_cb cb)
+{
+       struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct mdd_object *stripe;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(lmv);
+
+       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
+               if (!fid_is_sane(fid))
+                       continue;
+
+               stripe = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(stripe))
+                       RETURN(PTR_ERR(stripe));
+
+               rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
+               mdd_object_put(env, stripe);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       RETURN(0);
+}
+
+typedef int (*mdd_xattr_cb)(const struct lu_env *env,
+                           struct mdd_object *obj,
+                           const struct lu_buf *buf,
+                           const char *name,
+                           int fl, struct thandle *handle);
+
+/* iterate xattrs, always ignoring LMA and LMV, and LINKEA too if 'skip_linkea' is set. */
+static int mdd_iterate_xattrs(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             bool skip_linkea,
+                             struct thandle *handle,
+                             mdd_xattr_cb cb)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *xname;
+       struct lu_buf list_xbuf;
+       struct lu_buf cbxbuf;
+       struct lu_buf xbuf = { NULL };
+       int list_xsize;
+       int xlen;
+       int rem;
+       int xsize;
+       int rc;
+
+       ENTRY;
 
        /* retrieve xattr list from the old object */
-       list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
+       list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
        if (list_xsize == -ENODATA)
-               return 0;
+               RETURN(0);
 
        if (list_xsize < 0)
-               return list_xsize;
+               RETURN(list_xsize);
 
        lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
        if (info->mti_big_buf.lb_buf == NULL)
-               return -ENOMEM;
+               RETURN(-ENOMEM);
 
        list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
        list_xbuf.lb_len = list_xsize;
-       rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
+       rc = mdo_xattr_list(env, sobj, &list_xbuf);
        if (rc < 0)
-               return rc;
+               RETURN(rc);
+
+       rem = rc;
        rc = 0;
-       rem = list_xsize;
        xname = list_xbuf.lb_buf;
        while (rem > 0) {
                xlen = strnlen(xname, rem - 1) + 1;
-               if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
-                   strcmp(XATTR_NAME_LMA, xname) == 0 ||
+               if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
                    strcmp(XATTR_NAME_LMV, xname) == 0)
                        goto next;
 
-               /* For directory, if there are default layout, migrate here */
-               if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
-                   !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+               if (skip_linkea &&
+                   strcmp(XATTR_NAME_LINK, xname) == 0)
                        goto next;
 
-               xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
+               xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
                if (xsize == -ENODATA)
                        goto next;
                if (xsize < 0)
-                       GOTO(out, rc);
+                       GOTO(out, rc = xsize);
 
-               lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
-               if (info->mti_link_buf.lb_buf == NULL)
+               lu_buf_check_and_alloc(&xbuf, xsize);
+               if (xbuf.lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
 
-               xbuf.lb_len = xsize;
-               xbuf.lb_buf = info->mti_link_buf.lb_buf;
-               rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
+               rc = mdo_xattr_get(env, sobj, &xbuf, xname);
                if (rc == -ENODATA)
                        goto next;
                if (rc < 0)
                        GOTO(out, rc);
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
-                                          handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * update last rcvd for this transaction */
-               handle->th_local = 1;
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-
-               rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
-               if (rc == -EEXIST)
-                       GOTO(stop_trans, rc = 0);
+               cbxbuf = xbuf;
+               cbxbuf.lb_len = xsize;
+repeat:
+               rc = cb(env, tobj, &cbxbuf, xname, 0, handle);
+               if (unlikely(rc == -ENOSPC &&
+                            strcmp(xname, XATTR_NAME_LINK) == 0)) {
+                       rc = linkea_overflow_shrink(
+                                       (struct linkea_data *)(cbxbuf.lb_buf));
+                       if (likely(rc > 0)) {
+                               cbxbuf.lb_len = rc;
+                               goto repeat;
+                       }
+               }
 
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-stop_trans:
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
+               if (rc)
                        GOTO(out, rc);
 next:
+               xname += xlen;
                rem -= xlen;
-               memmove(xname, xname + xlen, rem);
        }
+
 out:
-       return rc;
+       lu_buf_free(&xbuf);
+       RETURN(rc);
 }
 
-static int mdd_declare_migrate_create(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     struct md_op_spec *spec,
-                                     struct lu_attr *la,
-                                     union lmv_mds_md *mgr_ea,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle)
+typedef int (*mdd_linkea_cb)(const struct lu_env *env,
+                            struct mdd_object *sobj,
+                            struct mdd_object *tobj,
+                            const struct lu_name *sname,
+                            const struct lu_fid *sfid,
+                            const struct lu_name *lname,
+                            const struct lu_fid *fid,
+                            void *opaque,
+                            struct thandle *handle);
+
+static int mdd_declare_update_link(const struct lu_env *env,
+                                  struct mdd_object *sobj,
+                                  struct mdd_object *tobj,
+                                  const struct lu_name *tname,
+                                  const struct lu_fid *tpfid,
+                                  const struct lu_name *lname,
+                                  const struct lu_fid *fid,
+                                  void *unused,
+                                  struct thandle *handle)
 {
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       const struct lu_buf     *buf;
-       int                     rc;
-       int                     mgr_easize;
-
-       rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
-                                               handle, spec, NULL);
-       if (rc != 0)
-               return rc;
-
-       rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
-                                          handle);
-       if (rc != 0)
-               return rc;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
 
-       if (S_ISLNK(la->la_mode)) {
-               const char *target_name = spec->u.sp_symname;
-               int sym_len = strlen(target_name);
-               const struct lu_buf *buf;
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strcmp(tname->ln_name, lname->ln_name))
+               return 0;
 
-               buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
-                                            buf, 0, handle);
-               if (rc != 0)
-                       return rc;
-       } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
-                                          MLAO_IGNORE);
-               if (rc != 0)
-                       return rc;
-       }
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj))
+               return PTR_ERR(pobj);
 
-       if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
-               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
-                                       spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
-                                          0, handle);
-               if (rc)
-                       return rc;
-       }
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
-       rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
-                                  0, handle);
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
+                                             mdd_object_type(sobj),
+                                             lname->ln_name, handle);
+       mdd_object_put(env, pobj);
        if (rc)
                return rc;
 
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+       rc = mdo_declare_ref_add(env, tobj, handle);
+       if (rc)
+               return rc;
 
+       rc = mdo_declare_ref_del(env, sobj, handle);
        return rc;
 }
 
-static int mdd_migrate_create(const struct lu_env *env,
-                             struct mdd_object *mdd_pobj,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj,
-                             const struct lu_name *lname,
-                             struct lu_attr *la)
+static int mdd_update_link(const struct lu_env *env,
+                          struct mdd_object *sobj,
+                          struct mdd_object *tobj,
+                          const struct lu_name *tname,
+                          const struct lu_fid *tpfid,
+                          const struct lu_name *lname,
+                          const struct lu_fid *fid,
+                          void *unused,
+                          struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct md_op_spec       *spec = &info->mti_spec;
-       struct lu_buf           lmm_buf = { NULL };
-       struct lu_buf           link_buf = { NULL };
-       const struct lu_buf     *buf;
-       struct thandle          *handle;
-       struct lmv_mds_md_v1    *mgr_ea;
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                     mgr_easize;
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       int                     rc;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
+
        ENTRY;
 
-       /* prepare spec for create */
-       memset(spec, 0, sizeof(*spec));
-       spec->sp_cr_lookup = 0;
-       spec->sp_feat = &dt_directory_features;
-       if (S_ISLNK(la->la_mode)) {
-               buf = lu_buf_check_and_alloc(
-                               &mdd_env_info(env)->mti_big_buf,
-                               la->la_size + 1);
-               link_buf = *buf;
-               link_buf.lb_len = la->la_size + 1;
-               memset(link_buf.lb_buf, 0, link_buf.lb_len);
-               rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
-               if (rc <= 0) {
-                       rc = rc != 0 ? rc : -EFAULT;
-                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(mdd_sobj)), rc);
-                       RETURN(rc);
-               }
-               spec->u.sp_symname = link_buf.lb_buf;
-       } else if (S_ISREG(la->la_mode)) {
-               /* retrieve lov of the old object */
-               rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
-               if (rc != 0 && rc != -ENODATA)
-                       RETURN(rc);
-               if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
-                       spec->u.sp_ea.eadata = lmm_buf.lb_buf;
-                       spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
-                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
-               }
-       } else if (S_ISDIR(la->la_mode)) {
-               rc = mdd_links_read(env, mdd_sobj, ldata);
-               if (rc == -ENODATA) {
-                       /* ignore the non-linkEA error */
-                       ldata = NULL;
-                       rc = 0;
-               }
-               if (rc < 0)
-                       RETURN(rc);
+       LASSERT(lu_name_is_valid(lname));
+
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+               RETURN(0);
+
+       CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
+              PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
+
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj)) {
+               CWARN("%s: cannot find obj "DFID": %ld\n",
+                     mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
+               RETURN(PTR_ERR(pobj));
        }
 
-       mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
-       memset(mgr_ea, 0, sizeof(*mgr_ea));
-       mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
-       mgr_ea->lmv_stripe_count = cpu_to_le32(2);
-       mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
-       mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
+       if (!mdd_object_exists(pobj)) {
+               CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
+               mdd_object_put(env, pobj);
+               RETURN(-ENOENT);
+       }
 
-       mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
+       mdd_write_lock(env, pobj, DT_TGT_PARENT);
+       rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
+                                            mdd_object_type(sobj),
+                                            lname->ln_name, handle);
+       mdd_write_unlock(env, pobj);
+       mdd_object_put(env, pobj);
+       if (rc)
+               RETURN(rc);
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               GOTO(out_free, rc = PTR_ERR(handle));
+       mdd_write_lock(env, tobj, DT_TGT_CHILD);
+       rc = mdo_ref_add(env, tobj, handle);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
 
-       /* Note: this transaction is part of migration, and it is not
-        * the last step of migration, so we set th_local = 1 to avoid
-        * update last rcvd for this transaction */
-       handle->th_local = 1;
-       rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                       spec, la,
-                                       (union lmv_mds_md *)info->mti_xattr_buf,
-                                       ldata, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, sobj, DT_SRC_CHILD);
+       rc = mdo_ref_del(env, sobj, handle);
+       mdd_write_unlock(env, sobj);
 
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       RETURN(rc);
+}
 
-       /* don't set nlink from the original object */
-       la->la_valid &= ~LA_NLINK;
+static inline int mdd_fld_lookup(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                const struct lu_fid *fid,
+                                __u32 *mdt_index)
+{
+       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
+       struct seq_server_site *ss;
+       int rc;
 
-       /* create the target object */
-       rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
-                              hint, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
 
-       if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_links_write(env, mdd_tobj, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-       }
+       range->lsr_flags = LU_SEQ_RANGE_MDT;
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
+       if (rc)
+               return rc;
 
-       /* Set MIGRATE EA on the source inode, so once the migration needs
-        * to be re-done during failover, the re-do process can locate the
-        * target object which is already being created. */
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
-       rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       *mdt_index = range->lsr_index;
 
-       /* Set immutable flag, so any modification is disabled until
-        * the migration is done. Once the migration is interrupted,
-        * if the resume process find the migrating object has both
-        * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
-        * flag and approve the migration */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-stop_trans:
-       if (handle != NULL)
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-out_free:
-       if (lmm_buf.lb_buf != NULL)
-               OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
-       RETURN(rc);
+       return 0;
 }
 
-static int mdd_migrate_entries(const struct lu_env *env,
-                              struct mdd_object *mdd_sobj,
-                              struct mdd_object *mdd_tobj)
+static int mdd_is_link_on_source_mdt(const struct lu_env *env,
+                                    struct mdd_object *sobj,
+                                    struct mdd_object *tobj,
+                                    const struct lu_name *tname,
+                                    const struct lu_fid *tpfid,
+                                    const struct lu_name *lname,
+                                    const struct lu_fid *fid,
+                                    void *opaque,
+                                    struct thandle *handle)
 {
-       struct dt_object        *next = mdd_object_child(mdd_sobj);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
-       struct thandle          *handle;
-       struct dt_it            *it;
-       const struct dt_it_ops  *iops;
-       int                      result;
-       struct lu_dirent        *ent;
-       int                      rc;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       __u32 source_mdt_index = *(__u32 *)opaque;
+       __u32 link_mdt_index;
+       int rc;
+
        ENTRY;
 
-       OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
-       if (ent == NULL)
-               RETURN(-ENOMEM);
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strcmp(tname->ln_name, lname->ln_name))
+               return 0;
+
+       rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
+       if (rc)
+               RETURN(rc);
+
+       RETURN(link_mdt_index == source_mdt_index);
+}
+
+static int mdd_iterate_linkea(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *tname,
+                             const struct lu_fid *tpfid,
+                             struct linkea_data *ldata,
+                             void *opaque,
+                             struct thandle *handle,
+                             mdd_linkea_cb cb)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *filename = info->mti_name;
+       struct lu_name lname;
+       struct lu_fid fid;
+       int rc = 0;
+
+       if (!ldata->ld_buf)
+               return 0;
+
+       for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
+            linkea_next_entry(ldata)) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+                                   &fid);
+
+               /* Note: lname may lack a terminating \0 */
+               snprintf(filename, sizeof(info->mti_name), "%.*s",
+                        lname.ln_namelen, lname.ln_name);
+               lname.ln_name = filename;
+
+               CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
+
+               rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+                       handle);
+       }
+
+       return rc;
+}
+
+/**
+ * Prepare linkea, and check whether the file needs to be migrated: if the
+ * source still has a link on the source MDT, there is no need to migrate;
+ * just update the namespace on the source and target parents.
+ *
+ * \retval     0 do migrate
+ * \retval     1 don't migrate
+ * \retval     -errno on failure
+ */
+static int migrate_linkea_prepare(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *spobj,
+                                 struct mdd_object *tpobj,
+                                 struct mdd_object *sobj,
+                                 const struct lu_name *lname,
+                                 const struct lu_attr *attr,
+                                 struct linkea_data *ldata)
+{
+       __u32 source_mdt_index;
+       int rc;
+
+       ENTRY;
+
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
+                               mdo2fid(tpobj), lname, 1, 0, ldata);
+       if (rc)
+               RETURN(rc);
 
-       if (!dt_try_as_dir(env, next))
-               GOTO(out_ent, rc = -ENOTDIR);
        /*
-        * iterate directories
+        * Then check whether the file should be migrated. If the file has
+        * multiple links, we only need to migrate the file if all of its
+        * entries have been migrated to the remote MDT.
         */
-       iops = &next->do_index_ops->dio_it;
-       it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
-       if (IS_ERR(it))
-               GOTO(out_ent, rc = PTR_ERR(it));
+       if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
+               RETURN(0);
+
+       /* If there are still links locally, don't migrate this file */
+       LASSERT(ldata->ld_leh != NULL);
 
-       rc = iops->load(env, it, 0);
-       if (rc == 0)
-               rc = iops->next(env, it);
-       else if (rc > 0)
-               rc = 0;
        /*
-        * At this point and across for-loop:
-        *
-        *  rc == 0 -> ok, proceed.
-        *  rc >  0 -> end of directory.
-        *  rc <  0 -> error.
+        * If the linkEA has overflowed, there are some unknown name entries
+        * under unknown parents, which prevents the migration.
         */
-       do {
-               struct mdd_object       *child;
-               char                    *name = mdd_env_info(env)->mti_key;
-               int                     len;
-               int                     recsize;
-               int                     is_dir;
-               bool                    target_exist = false;
-
-               len = iops->key_size(env, it);
-               if (len == 0)
-                       goto next;
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(-EOVERFLOW);
 
-               result = iops->rec(env, it, (struct dt_rec *)ent,
-                                  LUDA_FID | LUDA_TYPE);
-               if (result == -ESTALE)
-                       goto next;
-               if (result != 0) {
-                       rc = result;
-                       goto out;
-               }
+       rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
+       if (rc)
+               RETURN(rc);
 
-               fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
-               recsize = le16_to_cpu(ent->lde_reclen);
+       rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
+                               &source_mdt_index, NULL,
+                               mdd_is_link_on_source_mdt);
+       RETURN(rc);
+}
 
-               /* Insert new fid with target name into target dir */
-               if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
-                   (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
-                    ent->lde_name[1] == '.'))
-                       goto next;
+static int mdd_dir_declare_layout_delete(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        const struct lu_buf *lmv_buf,
+                                        const struct lu_buf *lmu_buf,
+                                        struct thandle *handle)
+{
+       int rc;
 
-               child = mdd_object_find(env, mdd, &ent->lde_fid);
-               if (IS_ERR(child))
-                       GOTO(out, rc = PTR_ERR(child));
+       if (!lmv_buf->lb_buf)
+               rc = mdo_declare_index_delete(env, obj, dotdot, handle);
+       else if (mdd_object_remote(obj))
+               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+                                            mdd_dir_declare_delete_stripe);
+       else
+               rc = mdo_declare_xattr_set(env, obj, lmu_buf,
+                                          XATTR_NAME_LMV".del", 0, handle);
 
-               mdd_write_lock(env, child, MOR_SRC_CHILD);
-               is_dir = S_ISDIR(mdd_object_type(child));
+       return rc;
+}
 
-               snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
+static int mdd_dir_layout_delete(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                struct thandle *handle)
+{
+       int rc;
 
-               /* Check whether the name has been inserted to the target */
-               if (dt_try_as_dir(env, dt_tobj)) {
-                       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+       ENTRY;
 
-                       rc = dt_lookup(env, dt_tobj, (struct dt_rec *)fid,
-                                      (struct dt_key *)name);
-                       if (unlikely(rc == 0))
-                               target_exist = true;
+       mdd_write_lock(env, obj, DT_SRC_PARENT);
+       if (!lmv_buf->lb_buf)
+               /* normal dir */
+               rc = __mdd_index_delete_only(env, obj, dotdot, handle);
+       else if (mdd_object_remote(obj))
+               /* striped, but remote */
+               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+                                            mdd_dir_delete_stripe);
+       else
+               rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+                                  handle);
+       mdd_write_unlock(env, obj);
+
+       RETURN(rc);
+}
+
+static int mdd_declare_migrate_create(const struct lu_env *env,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     struct mdd_object *tobj,
+                                     const struct lu_name *lname,
+                                     struct lu_attr *attr,
+                                     struct lu_buf *sbuf,
+                                     struct linkea_data *ldata,
+                                     struct md_op_spec *spec,
+                                     struct dt_allocation_hint *hint,
+                                     struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
+       int rc;
+
+       if (S_ISDIR(attr->la_mode)) {
+               struct lu_buf lmu_buf = { NULL };
+
+               if (lmv) {
+                       struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
+
+                       lmu->lum_stripe_count = 0;
+                       lmu_buf.lb_buf = lmu;
+                       lmu_buf.lb_len = sizeof(*lmu);
                }
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out_put, rc = PTR_ERR(handle));
-
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * updating last rcvd for this transaction */
-               handle->th_local = 1;
-               if (likely(!target_exist)) {
-                       rc = mdo_declare_index_insert(env, mdd_tobj,
-                                                     &ent->lde_fid,
-                                                     mdd_object_type(child),
-                                                     name, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
+                                                  handle);
+               if (rc)
+                       return rc;
 
-                       if (is_dir) {
-                               rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                               if (rc != 0)
-                                       GOTO(out_put, rc);
-                       }
+               if (lmv) {
+                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+                                                  handle);
+                       if (rc)
+                               return rc;
                }
+       }
 
-               rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+                               lname, attr, handle, spec, ldata, NULL, NULL,
+                               NULL, hint);
+       if (rc)
+               return rc;
 
-               if (is_dir) {
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
+               if (!lmv) {
+                       /*
+                        * if sobj is not striped, fake a 1-stripe LMV, which
+                        * will be used to generate a compound LMV for tobj.
+                        */
+                       LASSERT(sizeof(info->mti_key) >
+                               lmv_mds_md_size(1, LMV_MAGIC_V1));
+                       lmv = (typeof(lmv))info->mti_key;
+                       memset(lmv, 0, sizeof(*lmv));
+                       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+                       lmv->lmv_stripe_count = cpu_to_le32(1);
+                       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
+                       sbuf->lb_buf = lmv;
+                       sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+
+                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
+                                                  XATTR_NAME_LMV".add", 0,
+                                                  handle);
+                       sbuf->lb_buf = NULL;
+                       sbuf->lb_len = 0;
+               } else {
+                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
+                                                  XATTR_NAME_LMV".add", 0,
+                                                  handle);
+               }
+               if (rc)
+                       return rc;
+       }
 
-                       /* Update .. for child */
-                       rc = mdo_declare_index_delete(env, child, dotdot,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       /*
+        * tobj mode will be used in lod_declare_xattr_set(), but tobj is not
+        * created yet, so copy the mode from sobj.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+               sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
 
-                       rc = mdo_declare_index_insert(env, child,
-                                                     mdd_object_fid(mdd_tobj),
-                                                     S_IFDIR, dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       if (S_ISREG(attr->la_mode)) {
+               struct lu_buf fid_buf;
+
+               handle->th_complex = 1;
+
+               /* target may be remote, update PFID via sobj. */
+               fid_buf.lb_buf = (void *)mdo2fid(tobj);
+               fid_buf.lb_len = sizeof(struct lu_fid);
+               rc = mdo_declare_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID,
+                                          0, handle);
+               if (rc)
+                       return rc;
+
+               rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               if (rc)
+                       return rc;
+       }
+
+       if (!S_ISDIR(attr->la_mode)) {
+               rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+                                       ldata, NULL, handle,
+                                       mdd_declare_update_link);
+               if (rc)
+                       return rc;
+
+               if (lmv) {
+                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+                                                  handle);
+                       if (rc)
+                               return rc;
                }
+       }
 
-               rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
-                                                    child, name,
-                                                    strlen(name),
-                                                    handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       return rc;
+}
 
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0) {
-                       CERROR("%s: transaction start failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out_put, rc);
+/**
+ * Create target, migrate xattrs and update links.
+ *
+ * Create target according to \a spec, and then migrate xattrs, if it's
+ * directory, migrate source stripes to target, else update fid to target
+ * for links.
+ *
+ * For a regular file XATTR_NAME_FID is set and XATTR_NAME_LOV is deleted on
+ * \a sobj; for a striped directory (\a sbuf has a buffer) XATTR_NAME_LMV is
+ * deleted from \a sobj after its stripes are detached.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] lname    file name
+ * \param[in,out] attr source attributes, LA_NLINK is cleared from la_valid
+ * \param[in] sbuf     source LMV buf
+ * \param[in] ldata    source linkea
+ * \param[in] spec     migrate create spec
+ * \param[in] hint     target creation hint
+ * \param[in] handle   transaction handle
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *lname,
+                             struct lu_attr *attr,
+                             const struct lu_buf *sbuf,
+                             struct linkea_data *ldata,
+                             struct md_op_spec *spec,
+                             struct dt_allocation_hint *hint,
+                             struct thandle *handle)
+{
+       int rc;
+
+       ENTRY;
+
+       /*
+        * directory will migrate sobj stripes to tobj:
+        * 1. delete stripes from sobj.
+        * 2. add stripes to tobj, see lod_dir_declare_layout_add().
+        * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
+        */
+       if (S_ISDIR(attr->la_mode)) {
+               struct lu_buf lmu_buf = { NULL };
+
+               if (sbuf->lb_buf) {
+                       struct mdd_thread_info *info = mdd_env_info(env);
+                       struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
+
+                       lmu->lum_stripe_count = 0;
+                       lmu_buf.lb_buf = lmu;
+                       lmu_buf.lb_len = sizeof(*lmu);
                }
 
-               if (likely(!target_exist)) {
-                       rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
-                                               mdd_object_type(child),
-                                               name, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
+               if (rc)
+                       RETURN(rc);
+
+               /*
+                * delete LMV so that later when destroying sobj it won't delete
+                * stripes again.
+                */
+               if (sbuf->lb_buf) {
+                       mdd_write_lock(env, sobj, DT_SRC_CHILD);
+                       rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
+                       mdd_write_unlock(env, sobj);
+                       if (rc)
+                               RETURN(rc);
                }
+       }
 
-               rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       /* don't set nlink from sobj */
+       attr->la_valid &= ~LA_NLINK;
 
-               if (is_dir) {
-                       rc = __mdd_index_delete_only(env, child, dotdot,
-                                                    handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, NULL,
+                              hint, handle, false);
+       if (rc)
+               RETURN(rc);
 
-                       rc = __mdd_index_insert_only(env, child,
-                                        mdd_object_fid(mdd_tobj), S_IFDIR,
-                                        dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+       mdd_write_lock(env, tobj, DT_TGT_CHILD);
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
 
-               rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
-                                            child, name,
-                                            strlen(name), handle);
+       /* for regular file, update OST objects XATTR_NAME_FID */
+       if (S_ISREG(attr->la_mode)) {
+               struct lu_buf fid_buf;
 
-out_put:
-               mdd_write_unlock(env, child);
-               mdd_object_put(env, child);
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
-                       GOTO(out, rc);
-next:
-               result = iops->next(env, it);
-               if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
-                       GOTO(out, rc = -EINTR);
+               /* target may be remote, update PFID via sobj. */
+               fid_buf.lb_buf = (void *)mdo2fid(tobj);
+               fid_buf.lb_len = sizeof(struct lu_fid);
+               rc = mdo_xattr_set(env, sobj, &fid_buf, XATTR_NAME_FID, 0,
+                                  handle);
+               if (rc)
+                       RETURN(rc);
+
+               /* delete LOV to avoid deleting OST objs when destroying sobj */
+               mdd_write_lock(env, sobj, DT_SRC_CHILD);
+               rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               mdd_write_unlock(env, sobj);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       if (!S_ISDIR(attr->la_mode))
+               rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+                                       ldata, NULL, handle, mdd_update_link);
 
-               if (result == -ESTALE)
-                       goto next;
-       } while (result == 0);
-out:
-       iops->put(env, it);
-       iops->fini(env, it);
-out_ent:
-       OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
        RETURN(rc);
 }
 
-static int mdd_declare_update_linkea(const struct lu_env *env,
-                                    struct mdd_object *mdd_pobj,
-                                    struct mdd_object *mdd_sobj,
-                                    struct mdd_object *mdd_tobj,
-                                    const struct lu_name *child_name,
-                                    struct linkea_data *ldata,
-                                    struct thandle *handle)
+/**
+ * Declare the namespace updates of a migration in transaction \a handle.
+ *
+ * The following operations are declared, in this order:
+ *   - delete \a lname from \a spobj (plus a ref_del on \a spobj for a dir);
+ *   - insert \a lname into \a tpobj, pointing at \a tobj if \a do_create is
+ *     set, otherwise at \a sobj;
+ *   - add links from \a ldata on that same object (plus a ref_add on
+ *     \a tpobj for a dir);
+ *   - set ctime/mtime on \a spobj, and on \a tpobj if it's a different object;
+ *   - ref_del and destroy \a sobj, only if both \a do_create and
+ *     \a do_destroy are set.
+ *
+ * \a spattr, \a tpattr and \a ma are not used in this function.
+ *
+ * \retval     0 on success
+ * \retval     -errno returned by the first declaration that fails
+ **/
+static int mdd_declare_migrate_update(const struct lu_env *env,
+                                     struct mdd_object *spobj,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     struct mdd_object *tobj,
+                                     const struct lu_name *lname,
+                                     struct lu_attr *attr,
+                                     struct lu_attr *spattr,
+                                     struct lu_attr *tpattr,
+                                     struct linkea_data *ldata,
+                                     bool do_create,
+                                     bool do_destroy,
+                                     struct md_attr *ma,
+                                     struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       int rc;
+
+       rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_del(env, spobj, handle);
+               if (rc)
+                       return rc;
+       }
+
+       rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+                                     lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
+       if (rc)
+               return rc;
+
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_add(env, tpobj, handle);
+               if (rc)
+                       return rc;
+       }
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, spobj, la, handle);
+       if (rc)
+               return rc;
+
+       if (tpobj != spobj) {
+               rc = mdo_declare_attr_set(env, tpobj, la, handle);
+               if (rc)
+                       return rc;
+       }
+
+       if (do_create && do_destroy) {
+               rc = mdo_declare_ref_del(env, sobj, handle);
+               if (rc)
+                       return rc;
+
+               rc = mdo_declare_destroy(env, sobj, handle);
+               if (rc)
+                       return rc;
+       }
+
+       return rc;
+}
+
+/**
+ * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
+ *
+ * The dirent \a lname is deleted from \a spobj and inserted into \a tpobj,
+ * pointing at \a tobj if \a do_create is set, otherwise at \a sobj; the
+ * linkea in \a ldata is written to that same object. ctime and mtime of
+ * \a spobj (and of \a tpobj if it's a different object) are updated from
+ * \a ma->ma_attr.la_ctime, each under the parent's write lock.
+ *
+ * \a sobj is destroyed only if both \a do_create and \a do_destroy are set.
+ *
+ * NOTE(review): the return value of mdo_ref_del() on \a sobj is ignored,
+ * only the result of mdo_destroy() is returned -- confirm this is intended.
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ **/
+static int mdd_migrate_update(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *lname,
+                             struct lu_attr *attr,
+                             struct lu_attr *spattr,
+                             struct lu_attr *tpattr,
+                             struct linkea_data *ldata,
+                             bool do_create,
+                             bool do_destroy,
+                             struct md_attr *ma,
+                             struct thandle *handle)
 {
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 1);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       int rc;
+
+       ENTRY;
+
+       CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
+              lname->ln_name, PFID(mdo2fid(spobj)),
+              PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
+              PFID(fid));
+
+       rc = __mdd_index_delete(env, spobj, lname->ln_name,
+                               S_ISDIR(attr->la_mode), handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+                               lname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
+       if (rc)
+               RETURN(rc);
+
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       mdd_write_lock(env, spobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, spobj, spattr, la, handle);
+       mdd_write_unlock(env, spobj);
+       if (rc)
+               RETURN(rc);
+
+       if (tpobj != spobj) {
+               la->la_valid = LA_CTIME | LA_MTIME;
+               mdd_write_lock(env, tpobj, DT_TGT_PARENT);
+               rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+               mdd_write_unlock(env, tpobj);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       /*
+        * there are three situations we shouldn't destroy source:
+        * 1. if source is not dir, and it happens to be located on the same MDT
+        *    as target parent.
+        * 2. if source is not dir, and has link on the same MDT where source is
+        *    located.
+        * 3. if source is dir, and it's a normal, non-empty dir.
+        *
+        * the first two situations equals to !do_create, and the 3rd equals to
+        * !do_destroy, so the below condition is actually
+        * !(!do_create || !do_destroy).
+        *
+        * NB, if user has opened source dir before migration, he will get
+        * -ENOENT error when close it later, because source is likely to be
+        *  remote, which can't be moved to orphan list, but except this error
+        *  message, this won't cause any inconsistency or trouble.
+        */
+       if (do_create && do_destroy) {
+               mdd_write_lock(env, sobj, DT_SRC_CHILD);
+               mdo_ref_del(env, sobj, handle);
+               rc = mdo_destroy(env, sobj, handle);
+               mdd_write_unlock(env, sobj);
+       }
+
+       RETURN(rc);
 }
 
-static int mdd_update_linkea(const struct lu_env *env,
-                            struct mdd_object *mdd_pobj,
-                            struct mdd_object *mdd_sobj,
-                            struct mdd_object *mdd_tobj,
-                            const struct lu_name *child_name,
-                            struct linkea_data *ldata,
-                            struct thandle *handle)
-{
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 0);
+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ *   1. create target, append source stripes after target's if it's directory,
+ *      migrate xattrs and update fid of source links.
+ *   2. update namespace: migrate dirent from source parent to target parent,
+ *      update file linkea, and destroy source if it's not needed any more.
+ *
+ * References taken on the source and target parent stripes are dropped in
+ * one place only, at the "out" label, so error paths must jump there without
+ * putting them first.
+ *
+ * \param[in] env      execution environment
+ * \param[in] md_pobj  parent master object
+ * \param[in] md_sobj  source object
+ * \param[in] lname    file name
+ * \param[in] md_tobj  target object
+ * \param[in,out] spec target creation spec, its eadata, symname and create
+ *                     flags may be adjusted here
+ * \param[in] ma       used to update \a pobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -EALREADY if source directory is already being migrated
+ *                     with the same stripe count, offset and hash type
+ * \retval             -EPERM if source directory is already being migrated
+ *                     with different parameters
+ * \retval             -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+                      struct md_object *md_sobj, const struct lu_name *lname,
+                      struct md_object *md_tobj, struct md_op_spec *spec,
+                      struct md_attr *ma)
+{
+       struct mdd_device *mdd = mdo2mdd(md_pobj);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_object *pobj = md2mdd_obj(md_pobj);
+       struct mdd_object *sobj = md2mdd_obj(md_sobj);
+       struct mdd_object *tobj = md2mdd_obj(md_tobj);
+       struct mdd_object *spobj = NULL;
+       struct mdd_object *tpobj = NULL;
+       struct lu_attr *spattr = &info->mti_pattr;
+       struct lu_attr *tpattr = &info->mti_tpattr;
+       struct lu_attr *attr = &info->mti_cattr;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lu_buf pbuf = { NULL };
+       struct lu_buf sbuf = { NULL };
+       struct lmv_mds_md_v1 *plmv;
+       struct thandle *handle;
+       bool do_create = true;
+       bool do_destroy = true;
+       int rc;
+       ENTRY;
+
+       rc = mdd_la_get(env, sobj, attr);
+       if (rc)
+               RETURN(rc);
+
+       /* locate source and target stripe on pobj, which are the real parent */
+       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       plmv = pbuf.lb_buf;
+       if (plmv) {
+               __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+               __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+               int index;
+
+               /* locate target parent stripe */
+               if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       /*
+                        * fail check here to make sure top dir migration
+                        * succeed.
+                        */
+                       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                               GOTO(out, rc = -EIO);
+                       hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+                       count = le32_to_cpu(plmv->lmv_migrate_offset);
+               }
+               index = lmv_name_to_stripe_index(hash_type, count,
+                                                lname->ln_name,
+                                                lname->ln_namelen);
+               if (index < 0)
+                       GOTO(out, rc = index);
+
+               fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+               tpobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(tpobj))
+                       GOTO(out, rc = PTR_ERR(tpobj));
+
+               /* locate source parent stripe */
+               if (le32_to_cpu(plmv->lmv_hash_type) &
+                   LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+                       count = le32_to_cpu(plmv->lmv_stripe_count) -
+                               le32_to_cpu(plmv->lmv_migrate_offset);
+
+                       index = lmv_name_to_stripe_index(hash_type, count,
+                                                        lname->ln_name,
+                                                        lname->ln_namelen);
+                       /*
+                        * tpobj is put at "out", putting it here as well
+                        * would drop its reference twice.
+                        */
+                       if (index < 0)
+                               GOTO(out, rc = index);
+
+                       index += le32_to_cpu(plmv->lmv_migrate_offset);
+                       fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+                       spobj = mdd_object_find(env, mdd, fid);
+                       /* same as above: "out" puts tpobj, and skips spobj */
+                       if (IS_ERR(spobj))
+                               GOTO(out, rc = PTR_ERR(spobj));
+               } else {
+                       spobj = tpobj;
+                       mdd_object_get(spobj);
+               }
+       } else {
+               tpobj = pobj;
+               spobj = pobj;
+               mdd_object_get(tpobj);
+               mdd_object_get(spobj);
+       }
+
+       rc = mdd_la_get(env, spobj, spattr);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mdd_la_get(env, tpobj, tpattr);
+       if (rc)
+               GOTO(out, rc);
+
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
+
+               LASSERT(lmu);
+
+               /*
+                * if user use default value '0' for stripe_count, we need to
+                * adjust it to '1' to create a 1-stripe directory.
+                */
+               if (lmu->lum_stripe_count == 0) {
+                       /* eadata is from request, don't alter it */
+                       info->mti_lmu = *lmu;
+                       info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
+                       spec->u.sp_ea.eadata = &info->mti_lmu;
+                       lmu = spec->u.sp_ea.eadata;
+               }
+
+               rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+               if (rc == -ENODATA) {
+                       if (mdd_dir_is_empty(env, sobj) == 0) {
+                               /*
+                                * if sobj is empty, and target is not striped,
+                                * create target as a normal directory.
+                                */
+                               if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+                                       info->mti_lmu = *lmu;
+                                       info->mti_lmu.lum_stripe_count = 0;
+                                       spec->u.sp_ea.eadata = &info->mti_lmu;
+                                       lmu = spec->u.sp_ea.eadata;
+                               }
+                       } else {
+                               /*
+                                * sobj is not striped dir, if it's not empty,
+                                * it will be migrated to be a stripe of target,
+                                * don't destroy it after migration.
+                                */
+                               do_destroy = false;
+                       }
+               } else if (rc) {
+                       GOTO(out, rc);
+               } else {
+                       struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
+
+                       if (le32_to_cpu(lmv->lmv_hash_type) &
+                           LMV_HASH_FLAG_MIGRATION) {
+                               __u32 lum_stripe_count = lmu->lum_stripe_count;
+                               __u32 lmv_hash_type = lmv->lmv_hash_type &
+                                       cpu_to_le32(LMV_HASH_TYPE_MASK);
+
+                               if (!lum_stripe_count)
+                                       lum_stripe_count = cpu_to_le32(1);
+
+                               /* TODO: check specific MDTs */
+                               if (lmv->lmv_migrate_offset !=
+                                   lum_stripe_count ||
+                                   lmv->lmv_master_mdt_index !=
+                                   lmu->lum_stripe_offset ||
+                                   (lmv_hash_type != 0 &&
+                                    lmv_hash_type != lmu->lum_hash_type)) {
+                                       CERROR("%s: \'"DNAME"\' migration was "
+                                               "interrupted, run \'lfs migrate "
+                                               "-m %d -c %d -H %d "DNAME"\' to "
+                                               "finish migration.\n",
+                                               mdd2obd_dev(mdd)->obd_name,
+                                               PNAME(lname),
+                                               le32_to_cpu(
+                                                   lmv->lmv_master_mdt_index),
+                                               le32_to_cpu(
+                                                   lmv->lmv_migrate_offset),
+                                               le32_to_cpu(lmv_hash_type),
+                                               PNAME(lname));
+                                       GOTO(out, rc = -EPERM);
+                               }
+                               GOTO(out, rc = -EALREADY);
+                       }
+               }
+       } else if (!mdd_object_remote(tpobj)) {
+               /*
+                * if source is already on MDT where target parent is located,
+                * no need to create, just update namespace.
+                */
+               do_create = false;
+       } else if (S_ISLNK(attr->la_mode)) {
+               lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+               if (!sbuf.lb_buf)
+                       GOTO(out, rc = -ENOMEM);
+               rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+               if (rc <= 0) {
+                       rc = rc ?: -EFAULT;
+                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(sobj)), rc);
+                       GOTO(out, rc);
+               }
+               spec->u.sp_symname = sbuf.lb_buf;
+       } else if (S_ISREG(attr->la_mode)) {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+               spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
+       }
+
+       /*
+        * if sobj has link on the same MDT, no need to create, just update
+        * namespace, and it will be a remote file on target parent, which is
+        * similar to rename.
+        */
+       rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
+                                   ldata);
+       if (rc > 0)
+               do_create = false;
+       else if (rc)
+               GOTO(out, rc);
+
+       rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+                                     spattr, tpattr, attr);
+       if (rc)
+               GOTO(out, rc);
+
+       mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
+
+       if (do_create) {
+               rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
+                                               attr, &sbuf, ldata, spec, hint,
+                                               handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+       }
+
+       rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
+                                       attr, spattr, tpattr, ldata, do_create,
+                                       do_destroy, ma, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       if (do_create) {
+               rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
+                                       &sbuf, ldata, spec, hint, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+       }
+
+       rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
+                               spattr, tpattr, ldata, do_create, do_destroy,
+                               ma, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdo2fid(spobj), mdo2fid(sobj),
+                                   mdo2fid(tpobj), lname, lname, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       EXIT;
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+       /*
+        * the only place where parent stripe references are dropped; NULL
+        * (never looked up) and ERR_PTR (lookup failed) hold no reference.
+        */
+       if (spobj && !IS_ERR(spobj))
+               mdd_object_put(env, spobj);
+       if (tpobj && !IS_ERR(tpobj))
+               mdd_object_put(env, tpobj);
+       lu_buf_free(&sbuf);
+       lu_buf_free(&pbuf);
+       return rc;
 }
 
-static int mdd_declare_migrate_update_name(const struct lu_env *env,
-                                          struct mdd_object *mdd_pobj,
-                                          struct mdd_object *mdd_sobj,
-                                          struct mdd_object *mdd_tobj,
-                                          const struct lu_name *lname,
-                                          struct lu_attr *la,
-                                          struct lu_attr *parent_la,
-                                          struct linkea_data *ldata,
+/**
+ * Declare the updates needed to shrink the layout of striped directory
+ * \a obj in transaction \a handle.
+ *
+ * The user layout in \a lmu_buf is copied into a per-thread scratch buffer
+ * (mti_key) so that the stripe count can be adjusted without touching the
+ * caller's buffer; a stripe count below 2 is treated as 0 while declaring the
+ * layout delete and as 1 afterwards, in which case deleting XATTR_NAME_LMV
+ * from \a obj is declared too.
+ *
+ * If more than one stripe remains, only the LMV update is declared. Else the
+ * following are declared as well: insert ".." pointing at \a pobj into
+ * \a stripe, set xattrs of \a obj on \a stripe, delete XATTR_NAME_LMV from
+ * \a stripe, set \a attr on \a stripe, replace the \a lname entry in \a pobj
+ * with one pointing at \a stripe, and drop two references on \a obj before
+ * destroying it.
+ * NOTE(review): this looks like \a stripe taking the place of \a obj as a
+ * plain directory -- confirm against the execution counterpart.
+ *
+ * \param[in] env      execution environment
+ * \param[in] pobj     object holding the \a lname entry
+ * \param[in] obj      striped directory to shrink
+ * \param[in] stripe   stripe of \a obj
+ * \param[in] attr     attributes declared to be set on \a stripe
+ * \param[in] lmv_buf  LMV of \a obj, its buffer must not be NULL
+ * \param[in] lmu_buf  user layout to shrink to
+ * \param[in] lname    name of \a obj in \a pobj
+ * \param[in] handle   transaction handle
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ **/
+static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
+                                          struct mdd_object *pobj,
+                                          struct mdd_object *obj,
+                                          struct mdd_object *stripe,
+                                          struct lu_attr *attr,
+                                          struct lu_buf *lmv_buf,
+                                          const struct lu_buf *lmu_buf,
+                                          struct lu_name *lname,
                                           struct thandle *handle)
 {
-       struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
+       struct lu_buf shrink_buf = { .lb_buf = lmu,
+                                    .lb_len = sizeof(*lmu) };
        int rc;
 
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               return rc;
+       LASSERT(lmv);
 
-       /* delete entry from source dir */
-       rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
-       if (rc != 0)
-               return rc;
+       memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
 
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
-                                              mdd_tobj, lname, ldata, handle);
-               if (rc != 0)
-                       return rc;
-       }
+       if (le32_to_cpu(lmu->lum_stripe_count) < 2)
+               lmu->lum_stripe_count = 0;
 
-       if (S_ISREG(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
+       rc = mdd_dir_declare_layout_delete(env, obj, lmv_buf, &shrink_buf,
                                           handle);
-               if (rc != 0)
-                       return rc;
+       if (rc)
+               return rc;
 
-               handle->th_complex = 1;
-               rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-               if (rc < 0)
-                       return rc;
-       }
+       if (lmu->lum_stripe_count == 0) {
+               lmu->lum_stripe_count = cpu_to_le32(1);
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_del(env, mdd_pobj, handle);
-               if (rc != 0)
+               rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LMV, handle);
+               if (rc)
                        return rc;
        }
 
-       /* new name */
-       rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
-                                     mdd_object_type(mdd_tobj),
-                                     lname->ln_name, handle);
-       if (rc != 0)
+       rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
+                                    mdd_dir_declare_destroy_stripe);
+       if (rc)
                return rc;
 
-       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
-       if (rc != 0)
+       if (le32_to_cpu(lmu->lum_stripe_count) > 1)
+               return mdo_declare_xattr_set(env, obj, lmv_buf,
+                                            XATTR_NAME_LMV".set", 0, handle);
+
+       rc = mdo_declare_index_insert(env, stripe, mdo2fid(pobj), S_IFDIR,
+                                     dotdot, handle);
+       if (rc)
                return rc;
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_add(env, mdd_pobj, handle);
-               if (rc != 0)
-                       return rc;
-       }
+       rc = mdd_iterate_xattrs(env, obj, stripe, false, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
 
-       /* delete old object */
-       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-       if (rc != 0)
+       rc = mdo_declare_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+       if (rc)
                return rc;
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               /* delete old object */
-               rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-               if (rc != 0)
-                       return rc;
-               /* set nlink to 0 */
-               rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
-               if (rc != 0)
-                       return rc;
-       }
+       rc = mdo_declare_attr_set(env, stripe, attr, handle);
+       if (rc)
+               return rc;
 
-       rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
        if (rc)
                return rc;
 
-       rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
-       if (rc != 0)
+       rc = mdo_declare_index_insert(env, pobj, mdo2fid(stripe), attr->la_mode,
+                                     lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_ref_del(env, obj, handle);
+       if (rc)
                return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, lname, NULL, handle);
+       rc = mdo_declare_ref_del(env, obj, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_destroy(env, obj, handle);
+       if (rc)
+               return rc;
 
        return rc;
+
 }
 
-static int mdd_migrate_update_name(const struct lu_env *env,
-                                  struct mdd_object *mdd_pobj,
-                                  struct mdd_object *mdd_sobj,
-                                  struct mdd_object *mdd_tobj,
-                                  const struct lu_name *lname,
-                                  struct md_attr *ma)
+/*
+ * After files under \a obj are migrated, shrink old stripes from \a obj;
+ * if it then becomes a 1-stripe directory, convert it to a normal one.
+ */
+static int __mdd_dir_layout_shrink(const struct lu_env *env,
+                                  struct mdd_object *pobj,
+                                  struct mdd_object *obj,
+                                  struct mdd_object *stripe,
+                                  struct lu_attr *attr,
+                                  struct lu_buf *lmv_buf,
+                                  const struct lu_buf *lmu_buf,
+                                  struct lu_name *lname,
+                                  struct thandle *handle)
 {
-       struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       struct thandle          *handle;
-       int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
-       const char              *name = lname->ln_name;
-       int                     rc;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
+       struct lu_buf shrink_buf = { .lb_buf = lmu,
+                                    .lb_len = sizeof(*lmu) };
+       int len = lmv_buf->lb_len;
+       __u32 version = le32_to_cpu(lmv->lmv_layout_version);
+       int rc;
+
        ENTRY;
 
-       /* update time for parent */
-       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-       p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
-       p_la->la_valid = LA_CTIME;
+       /* lmu needs to be altered, but lmu_buf is const */
+       memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               RETURN(rc);
+       /*
+        * if dir will be shrunk to 1-stripe, delete all stripes, because it
+        * will be converted to normal dir.
+        */
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1)
+               lmu->lum_stripe_count = 0;
 
-       ldata->ld_buf = NULL;
-       rc = mdd_links_read(env, mdd_sobj, ldata);
-       if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
+       /* delete stripes after lmu->lum_stripe_count */
+       rc = mdd_dir_layout_delete(env, obj, lmv_buf, &shrink_buf, handle);
+       if (rc)
                RETURN(rc);
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
+       if (lmu->lum_stripe_count == 0) {
+               lmu->lum_stripe_count = cpu_to_le32(1);
 
-       rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                            lname, so_attr, p_la, ldata,
-                                            handle);
-       if (rc != 0) {
-               /* If the migration can not be fit in one transaction, just
-                * leave it in the original MDT */
-               if (rc == -E2BIG)
-                       GOTO(stop_trans, rc = 0);
-               else
-                       GOTO(stop_trans, rc);
+               /* delete LMV to avoid deleting stripes again upon destroy */
+               mdd_write_lock(env, obj, DT_SRC_CHILD);
+               rc = mdo_xattr_del(env, obj, XATTR_NAME_LMV, handle);
+               mdd_write_unlock(env, obj);
+               if (rc)
+                       RETURN(rc);
        }
 
-       CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
-              mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
-              PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-              PFID(mdd_object_fid(mdd_tobj)));
-
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       /* Remove source name from source directory */
-       rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                      lname, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-
-               /*  linkea update might decrease the source object
-                *  nlink, let's get the attr again after ref_del */
-               rc = mdd_la_get(env, mdd_sobj, so_attr);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
+       /* destroy stripes after lmu->lum_stripe_count */
+       mdd_write_lock(env, obj, DT_SRC_PARENT);
+       rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
+                                    mdd_dir_destroy_stripe);
+       mdd_write_unlock(env, obj);
+
+       if (le32_to_cpu(lmu->lum_stripe_count) > 1) {
+               /* update dir LMV, that's all if it's still striped. */
+               lmv->lmv_stripe_count = lmu->lum_stripe_count;
+               lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
+               lmv->lmv_migrate_offset = 0;
+               lmv->lmv_migrate_hash = 0;
+               lmv->lmv_layout_version = cpu_to_le32(++version);
+
+               lmv_buf->lb_len = sizeof(*lmv);
+               rc = mdo_xattr_set(env, obj, lmv_buf, XATTR_NAME_LMV".set", 0,
+                                  handle);
+               lmv_buf->lb_len = len;
+               RETURN(rc);
        }
 
-       if (S_ISREG(so_attr->la_mode)) {
-               if (so_attr->la_nlink == 1) {
-                       rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
-                                          handle);
-                       if (rc != 0 && rc != -ENODATA)
-                               GOTO(stop_trans, rc);
-
-                       rc = mdo_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-                       if (rc < 0)
-                               GOTO(stop_trans, rc);
-               }
-       }
+       /* replace directory with its remaining stripe */
+       LASSERT(pobj);
+       LASSERT(stripe);
 
-       /* Insert new fid with target name into target dir */
-       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
-                               mdd_object_type(mdd_tobj), name, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
 
-       linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
-       rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
-                          ldata, 1);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       /* insert dotdot to stripe which points to parent */
+       rc = __mdd_index_insert_only(env, stripe, mdo2fid(pobj), S_IFDIR,
+                                    dotdot, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+       /* copy xattrs including linkea */
+       rc = mdd_iterate_xattrs(env, obj, stripe, false, handle, mdo_xattr_set);
+       if (rc)
+               GOTO(out, rc);
 
-       mdd_sobj->mod_flags |= DEAD_OBJ;
-       rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       /* delete LMV */
+       rc = mdo_xattr_del(env, stripe, XATTR_NAME_LMV, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = __mdd_orphan_add(env, mdd_sobj, handle);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       /* don't set nlink from parent */
+       attr->la_valid &= ~LA_NLINK;
 
-       mdo_ref_del(env, mdd_sobj, handle);
-       if (is_dir)
-               mdo_ref_del(env, mdd_sobj, handle);
+       rc = mdo_attr_set(env, stripe, attr, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       /* Get the attr again after ref_del */
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       /* delete dir name from parent */
+       rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       ma->ma_attr = *so_attr;
-       ma->ma_valid |= MA_INODE;
+       /* insert stripe to parent with dir name */
+       rc = __mdd_index_insert_only(env, pobj, mdo2fid(stripe), attr->la_mode,
+                                    lname->ln_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       /* destroy dir obj */
+       rc = mdo_ref_del(env, obj, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
-                              mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
-                              mdo2fid(mdd_pobj), lname, lname, handle);
-       if (rc != 0) {
-               CWARN("%s: changelog for migrate %s "DFID
-                     "under "DFID" failed: rc = %d\n",
-                     mdd2obd_dev(mdd)->obd_name, lname->ln_name,
-                     PFID(mdd_object_fid(mdd_sobj)),
-                     PFID(mdd_object_fid(mdd_pobj)), rc);
-               /* Sigh, there are no easy way to migrate back the object, so
-                * let's reset the result to 0 for now XXX */
-               rc = 0;
-       }
-out_unlock:
-       mdd_write_unlock(env, mdd_sobj);
+       rc = mdo_ref_del(env, obj, handle);
+       if (rc)
+               GOTO(out, rc);
 
-stop_trans:
-       rc = mdd_trans_stop(env, mdd, rc, handle);
+       rc = mdo_destroy(env, obj, handle);
+       if (rc)
+               GOTO(out, rc);
 
-       RETURN(rc);
+       EXIT;
+out:
+       mdd_write_unlock(env, obj);
+       mdd_write_unlock(env, pobj);
+
+       return rc;
 }
 
-static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
-                         const struct lu_fid *fid, __u32 *mdt_index)
+/*
+ * Shrink directory stripes to the lum_stripe_count given in \a lmu_buf.
+ */
+int mdd_dir_layout_shrink(const struct lu_env *env,
+                         struct md_object *md_obj,
+                         const struct lu_buf *lmu_buf)
 {
-       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
-       struct seq_server_site *ss;
+       struct mdd_device *mdd = mdo2mdd(md_obj);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_object *obj = md2mdd_obj(md_obj);
+       struct mdd_object *pobj = NULL;
+       struct mdd_object *stripe = NULL;
+       struct lu_attr *attr = &info->mti_pattr;
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lu_name lname = { NULL };
+       struct lu_buf lmv_buf = { NULL };
+       struct lmv_mds_md_v1 *lmv;
+       struct lmv_user_md *lmu;
+       struct thandle *handle;
        int rc;
 
-       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
-
-       range->lsr_flags = LU_SEQ_RANGE_MDT;
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
-       if (rc != 0)
-               return rc;
-
-       *mdt_index = range->lsr_index;
-
-       return 0;
-}
-/**
- * Check whether we should migrate the file/dir
- * return val
- *     < 0  permission check failed or other error.
- *     = 0  the file can be migrated.
- *     > 0  the file does not need to be migrated, mostly
- *          for multiple link file
- **/
-static int mdd_migrate_sanity_check(const struct lu_env *env,
-                                   struct mdd_object *pobj,
-                                   const struct lu_attr *pattr,
-                                   struct mdd_object *sobj,
-                                   struct lu_attr *sattr)
-{
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      *ldata = &info->mti_link_data;
-       struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
-       int                     mgr_easize;
-       struct lu_buf           *mgr_buf;
-       int                     count;
-       int                     rc;
-       __u64 mdt_index;
        ENTRY;
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
-       if (mgr_buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
+       rc = mdd_la_get(env, obj, attr);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
-       if (rc > 0) {
-               union lmv_mds_md *lmm = mgr_buf->lb_buf;
-
-               /* If the object has migrateEA, it means IMMUTE flag
-                * is being set by previous migration process, so it
-                * needs to override the IMMUTE flag, otherwise the
-                * following sanity check will fail */
-               if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
-
-                       sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
-                       CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(sobj)));
-               }
-       }
+       if (!S_ISDIR(attr->la_mode))
+               RETURN(-ENOTDIR);
 
-       rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
-                                    sobj, sattr, NULL, NULL);
-       if (rc != 0)
+       rc = mdd_stripe_get(env, obj, &lmv_buf, XATTR_NAME_LMV);
+       if (rc < 0)
                RETURN(rc);
 
-       /* Then it will check if the file should be migrated. If the file
-        * has mulitple links, we only need migrate the file if all of its
-        * entries has been migrated to the remote MDT */
-       if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
-               RETURN(0);
+       lmv = lmv_buf.lb_buf;
+       lmu = lmu_buf->lb_buf;
 
-       rc = mdd_links_read(env, sobj, ldata);
-       if (rc != 0) {
-               /* For multiple links files, if there are no linkEA data at all,
-                * means the file might be created before linkEA is enabled, and
-                * all of its links should not be migrated yet, otherwise it
-                * should have some linkEA there */
-               if (rc == -ENOENT || rc == -ENODATA)
-                       RETURN(1);
-               RETURN(rc);
-       }
+       /* this was checked in MDT */
+       LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
+               le32_to_cpu(lmv->lmv_stripe_count));
 
-       mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
-       /* If there are still links locally, then the file will not be
-        * migrated. */
-       LASSERT(ldata->ld_leh != NULL);
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct lu_name          lname;
-               struct lu_fid           fid;
-               __u32                   parent_mdt_index;
-
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
-
-               rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
-               if (rc != 0)
-                       RETURN(rc);
+       rc = mdd_dir_iterate_stripes(env, obj, &lmv_buf, lmu_buf, NULL,
+                                    mdd_shrink_stripe_is_empty);
+       if (rc < 0)
+               GOTO(out, rc);
+       else if (rc != 0)
+               GOTO(out, rc = -ENOTEMPTY);
 
-               /* Migrate the object only if none of its parents are on the
-                * current MDT. */
-               if (parent_mdt_index != mdt_index)
-                       continue;
+       /*
+        * if obj stripe count will be shrunk to 1, we need to convert it to a
+        * normal dir, which will change its fid and update parent namespace,
+        * so get obj name and parent fid from linkea.
+        */
+       if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
+               struct linkea_data *ldata = &info->mti_link_data;
+               char *filename = info->mti_name;
 
-               CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
-                      PFID(mdd_object_fid(sobj)), lname.ln_namelen,
-                      lname.ln_name, PFID(&fid));
-               rc = 1;
-               break;
-       }
+               rc = mdd_links_read(env, obj, ldata);
+               if (rc)
+                       GOTO(out, rc);
 
-       RETURN(rc);
-}
+               if (ldata->ld_leh->leh_reccount > 1)
+                       GOTO(out, rc = -EINVAL);
 
-static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
-                      struct md_object *sobj, const struct lu_name *lname,
-                      struct md_object *tobj, struct md_attr *ma)
-{
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
-       struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
-       bool                    created = false;
-       int                     rc;
+               linkea_first_entry(ldata);
+               if (!ldata->ld_lee)
+                       GOTO(out, rc = -ENODATA);
 
-       ENTRY;
-       /* If the file will being migrated, it will check whether
-        * the file is being opened by someone else right now */
-       mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
-       if (mdd_sobj->mod_count > 0) {
-               CDEBUG(D_OTHER,
-                      "%s: "DFID"%s is already opened count %d: rc = %d\n",
-                      mdd2obd_dev(mdd)->obd_name,
-                      PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-                      mdd_sobj->mod_count, -EBUSY);
-               mdd_read_unlock(env, mdd_sobj);
-               GOTO(put, rc = -EBUSY);
-       }
-       mdd_read_unlock(env, mdd_sobj);
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+                                   fid);
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(put, rc);
+               /* Note: lname might miss \0 at the end */
+               snprintf(filename, sizeof(info->mti_name), "%.*s",
+                        lname.ln_namelen, lname.ln_name);
+               lname.ln_name = filename;
 
-       rc = mdd_la_get(env, mdd_pobj, pattr);
-       if (rc != 0)
-               GOTO(put, rc);
+               pobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(pobj)) {
+                       rc = PTR_ERR(pobj);
+                       pobj = NULL;
+                       GOTO(out, rc);
+               }
 
-       rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
-       if (rc != 0) {
-               if (rc > 0)
-                       rc = 0;
-               GOTO(put, rc);
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]);
+
+               stripe = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(stripe)) {
+                       mdd_object_put(env, pobj);
+                       pobj = NULL;
+                       GOTO(out, rc = PTR_ERR(stripe));
+               }
        }
 
-       /* Sigh, it is impossible to finish all of migration in a single
-        * transaction, for example migrating big directory entries to the
-        * new MDT, it needs insert all of name entries of children in the
-        * new directory.
-        *
-        * So migration will be done in multiple steps and transactions.
-        *
-        * 1. create an orphan object on the remote MDT in one transaction.
-        * 2. migrate extend attributes to the new target file/directory.
-        * 3. For directory, migrate the entries to the new MDT and update
-        * linkEA of each children. Because we can not migrate all entries
-        * in a single transaction, so the migrating directory will become
-        * a striped directory during migration, so once the process is
-        * interrupted, the directory is still accessible. (During lookup,
-        * client will locate the name by searching both original and target
-        * object).
-        * 4. Finally, update the name/FID to point to the new file/directory
-        * in a separate transaction.
-        */
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
 
-       /* step 1: Check whether the orphan object has been created, and create
-        * orphan object on the remote MDT if needed */
-       if (!mdd_object_exists(mdd_tobj)) {
-               rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                       lname, so_attr);
-               if (rc != 0)
-                       GOTO(put, rc);
-               created = true;
-       }
+       rc = __mdd_dir_declare_layout_shrink(env, pobj, obj, stripe, attr,
+                                            &lmv_buf, lmu_buf, &lname, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 2: migrate xattr */
-       rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
+                                        handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       /* step 3: migrate name entries to the orphan object */
-       if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
-               rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
-               if (rc != 0)
-                       GOTO(put, rc);
-               if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
-                                                 OBD_FAIL_MDS_REINT_NET_REP)))
-                       GOTO(put, rc = 0);
-       } else {
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
-       }
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 4: update name entry to the new object */
-       rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
-                                    ma);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = __mdd_dir_layout_shrink(env, pobj, obj, stripe, attr, &lmv_buf,
+                                    lmu_buf, &lname, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       /* newly created target was not locked, don't cache its attributes */
-       if (created)
-               mdd_invalidate(env, tobj);
-put:
-       RETURN(rc);
+       rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                           XATTR_NAME_LMV, handle);
+       GOTO(stop_trans, rc);
+
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+       if (pobj) {
+               mdd_object_put(env, stripe);
+               mdd_object_put(env, pobj);
+       }
+       lu_buf_free(&lmv_buf);
+       return rc;
 }
 
 const struct md_dir_operations mdd_dir_ops = {