Whamcloud - gitweb
LU-14579 flr: mirror unlink and split race
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index c978a46..2f69cc7 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/mdd/mdd_dir.c
  *
@@ -70,7 +69,7 @@ mdd_name_check(struct mdd_device *m, const struct lu_name *ln)
 static int
 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
             const struct lu_attr *pattr, const struct lu_name *lname,
-            struct lu_fid* fid, int mask)
+            struct lu_fid *fid, unsigned int may_mask)
 {
        const char *name = lname->ln_name;
        const struct dt_key *key = (const struct dt_key *)name;
@@ -92,7 +91,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                       PFID(mdd_object_fid(mdd_obj)));
        }
 
-       rc = mdd_permission_internal_locked(env, mdd_obj, pattr, mask,
+       rc = mdd_permission_internal_locked(env, mdd_obj, pattr, may_mask,
                                            DT_TGT_PARENT);
        if (rc)
                RETURN(rc);
@@ -124,7 +123,7 @@ int mdd_lookup(const struct lu_env *env,
 }
 
 /** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * Uses the mdd_thread_info::mti_link_buf since it is generally large.
  * A pointer to the buffer is stored in \a ldata::ld_buf.
  *
  * \retval 0 or error
@@ -168,9 +167,9 @@ static int __mdd_links_read(const struct lu_env *env,
        return linkea_init(ldata);
 }
 
-static int mdd_links_read(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct linkea_data *ldata)
+int mdd_links_read(const struct lu_env *env,
+                  struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata)
 {
        int rc;
 
@@ -356,8 +355,7 @@ int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
  *           -ve        other error
  *
  */
-static int mdd_dir_is_empty(const struct lu_env *env,
-                            struct mdd_object *dir)
+int mdd_dir_is_empty(const struct lu_env *env, struct mdd_object *dir)
 {
        struct dt_it     *it;
        struct dt_object *obj;
@@ -512,7 +510,7 @@ static inline int mdd_is_sticky(const struct lu_env *env,
        if (cattr->la_uid == uc->uc_fsuid)
                return 0;
 
-       return !md_capable(uc, CFS_CAP_FOWNER);
+       return !md_capable(uc, CAP_FOWNER);
 }
 
 static int mdd_may_delete_entry(const struct lu_env *env,
@@ -800,7 +798,7 @@ int mdd_changelog_write_rec(const struct lu_env *env,
                struct llog_changelog_rec *rec;
 
                mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
-               rec = container_of0(r, struct llog_changelog_rec, cr_hdr);
+               rec = container_of(r, struct llog_changelog_rec, cr_hdr);
 
                spin_lock(&mdd->mdd_cl.mc_lock);
                rec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
@@ -1255,7 +1253,7 @@ static inline int mdd_links_del(const struct lu_env *env,
 /** Read the link EA into a temp buffer.
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
- * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ * \retval ptr to \a lu_buf (always \a mti_link_buf)
  */
 struct lu_buf *mdd_links_get(const struct lu_env *env,
                             struct mdd_object *mdd_obj)
@@ -1793,7 +1791,7 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
 
        /* Enough for only unlink the entry */
        if (unlikely(mdd_cobj == NULL))
-               GOTO(stop, rc);
+               GOTO(cleanup, rc);
 
        if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
                /* update ctime of an unlinked file only if it is still
@@ -1938,7 +1936,8 @@ static int mdd_create_data(const struct lu_env *env,
        if (rc)
                GOTO(stop, rc);
 
-       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle);
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, son, handle,
+                                     NULL);
 
 stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
@@ -2092,7 +2091,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                           !lustre_in_group_p(uc,
                                              (cattr->la_valid & LA_GID) ?
                                              cattr->la_gid : pattr->la_gid) &&
-                          !md_capable(uc, CFS_CAP_FSETID)) {
+                          !md_capable(uc, CAP_FSETID)) {
                        cattr->la_mode &= ~S_ISGID;
                        cattr->la_valid |= LA_MODE;
                }
@@ -2150,6 +2149,11 @@ static int mdd_declare_create_object(const struct lu_env *env,
        const struct lu_buf *buf;
        int rc;
 
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
+       /* ldiskfs OSD needs this information for credit allocation */
+       if (def_acl_buf)
+               hint->dah_acl_len = def_acl_buf->lb_len;
+#endif
        rc = mdd_declare_create_object_internal(env, p, c, attr, handle, spec,
                                                hint);
        if (rc)
@@ -2222,6 +2226,16 @@ static int mdd_declare_create_object(const struct lu_env *env,
                if (rc < 0)
                        GOTO(out, rc);
        }
+
+       if (spec->sp_cr_file_encctx != NULL) {
+               buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
+                                       spec->sp_cr_file_encctx_size);
+               rc = mdo_declare_xattr_set(env, c, buf,
+                                          LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
+                                          handle);
+               if (rc < 0)
+                       GOTO(out, rc);
+       }
 out:
        return rc;
 }
@@ -2293,6 +2307,7 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                        struct lu_buf *acl_buf)
 {
        int     rc;
+
        ENTRY;
 
        if (S_ISLNK(la->la_mode)) {
@@ -2306,9 +2321,12 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
                           XATTR_NAME_ACL_DEFAULT);
        mdd_read_unlock(env, pobj);
        if (rc > 0) {
+               /* ACL buffer size is not enough, need realloc */
+               if (rc > acl_buf->lb_len)
+                       RETURN(-ERANGE);
+
                /* If there are default ACL, fix mode/ACL by default ACL */
                def_acl_buf->lb_len = rc;
-               LASSERT(def_acl_buf->lb_len <= acl_buf->lb_len);
                memcpy(acl_buf->lb_buf, def_acl_buf->lb_buf, rc);
                acl_buf->lb_len = rc;
                rc = __mdd_fix_mode_acl(env, acl_buf, &la->la_mode);
@@ -2443,6 +2461,16 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                        GOTO(err_initlized, rc);
        }
 
+       if (spec->sp_cr_file_encctx != NULL) {
+               buf = mdd_buf_get_const(env, spec->sp_cr_file_encctx,
+                                       spec->sp_cr_file_encctx_size);
+               rc = mdo_xattr_set(env, son, buf,
+                                  LL_XATTR_NAME_ENCRYPTION_CONTEXT, 0,
+                                  handle);
+               if (rc < 0)
+                       GOTO(err_initlized, rc);
+       }
+
 err_initlized:
        if (unlikely(rc != 0)) {
                int rc2;
@@ -2552,22 +2580,23 @@ int mdd_create(const struct lu_env *env, struct md_object *pobj,
                      const struct lu_name *lname, struct md_object *child,
                      struct md_op_spec *spec, struct md_attr *ma)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct lu_attr          *la = &info->mti_la_for_fix;
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_object       *son = md2mdd_obj(child);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct lu_attr          *attr = &ma->ma_attr;
-       struct thandle          *handle;
-       struct lu_attr          *pattr = &info->mti_pattr;
-       struct lu_buf           acl_buf;
-       struct lu_buf           def_acl_buf;
-       struct lu_buf           hsm_buf;
-       struct linkea_data      *ldata = &info->mti_link_data;
-       const char              *name = lname->ln_name;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
+       struct mdd_object *son = md2mdd_obj(child);
+       struct mdd_device *mdd = mdo2mdd(pobj);
+       struct lu_attr *attr = &ma->ma_attr;
+       struct thandle *handle;
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_buf acl_buf;
+       struct lu_buf def_acl_buf;
+       struct lu_buf hsm_buf;
+       struct linkea_data *ldata = &info->mti_link_data;
+       const char *name = lname->ln_name;
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                      rc;
-       int                      rc2;
+       int acl_size = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+       int rc, rc2;
+
        ENTRY;
 
        rc = mdd_la_get(env, mdd_pobj, pattr);
@@ -2586,13 +2615,25 @@ int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (IS_ERR(handle))
                GOTO(out_free, rc = PTR_ERR(handle));
 
-       lu_buf_check_and_alloc(&info->mti_xattr_buf,
-                       min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
-                             XATTR_SIZE_MAX));
-       acl_buf = info->mti_xattr_buf;
-       def_acl_buf.lb_buf = info->mti_key;
-       def_acl_buf.lb_len = sizeof(info->mti_key);
+use_bigger_buffer:
+       acl_buf = *lu_buf_check_and_alloc(&info->mti_xattr_buf, acl_size);
+       if (!acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+       /* mti_big_buf is also used down below in mdd_changelog_ns_store(),
+        * but def_acl_buf is finished with it before then
+        */
+       def_acl_buf = *lu_buf_check_and_alloc(&info->mti_big_buf, acl_size);
+       if (!def_acl_buf.lb_buf)
+               GOTO(out_stop, rc = -ENOMEM);
+
        rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
+       if (unlikely(rc == -ERANGE &&
+                    acl_size == LUSTRE_POSIX_ACL_MAX_SIZE_OLD)) {
+               /* use maximum-sized xattr buffer for too-big default ACL */
+               acl_size = min_t(unsigned int, mdd->mdd_dt_conf.ddp_max_ea_size,
+                                XATTR_SIZE_MAX);
+               goto use_bigger_buffer;
+       }
        if (rc < 0)
                GOTO(out_stop, rc);
 
@@ -2743,49 +2784,6 @@ out_free:
        return rc;
 }
 
-/*
- * Get locks on parents in proper order
- * RETURN: < 0 - error, rename_order if successful
- */
-enum rename_order {
-        MDD_RN_SAME,
-        MDD_RN_SRCTGT,
-        MDD_RN_TGTSRC
-};
-
-static int mdd_rename_order(const struct lu_env *env,
-                           struct mdd_device *mdd,
-                           struct mdd_object *src_pobj,
-                           const struct lu_attr *pattr,
-                           struct mdd_object *tgt_pobj)
-{
-       /* order of locking, 1 - tgt-src, 0 - src-tgt*/
-       int rc;
-
-       ENTRY;
-       if (src_pobj == tgt_pobj)
-               RETURN(MDD_RN_SAME);
-
-       /* compared the parent child relationship of src_p & tgt_p */
-       if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(src_pobj))) {
-               rc = MDD_RN_SRCTGT;
-       } else if (lu_fid_eq(&mdd->mdd_root_fid, mdd_object_fid(tgt_pobj))) {
-               rc = MDD_RN_TGTSRC;
-       } else {
-               rc = mdd_is_parent(env, mdd, src_pobj, pattr,
-                                  mdd_object_fid(tgt_pobj));
-               if (rc == -EREMOTE)
-                       rc = 0;
-
-               if (rc == 1)
-                       rc = MDD_RN_TGTSRC;
-               else if (rc == 0)
-                       rc = MDD_RN_SRCTGT;
-       }
-
-       RETURN(rc);
-}
-
 /* has not mdd_write{read}_lock on any obj yet. */
 static int mdd_rename_sanity_check(const struct lu_env *env,
                                    struct mdd_object *src_pobj,
@@ -2958,6 +2956,16 @@ static int mdd_declare_rename(const struct lu_env *env,
         return rc;
 }
 
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma);
+
 /* src object can be remote that is why we use only fid and type of object */
 static int mdd_rename(const struct lu_env *env,
                       struct md_object *src_pobj, struct md_object *tgt_pobj,
@@ -3002,6 +3010,31 @@ static int mdd_rename(const struct lu_env *env,
        if (rc)
                GOTO(out_pending, rc);
 
+       /* if rename is cross MDTs, migrate symlink if it doesn't have other
+        * hard links, and target doesn't exist.
+        */
+       if (mdd_object_remote(mdd_sobj) && S_ISLNK(cattr->la_mode) &&
+           cattr->la_nlink == 1 && !tobj) {
+               struct md_op_spec *spec = &mdd_env_info(env)->mti_spec;
+               struct lu_device *ld = &mdd->mdd_md_dev.md_lu_dev;
+               struct lu_fid tfid;
+
+               rc = ld->ld_ops->ldo_fid_alloc(env, ld, &tfid, &tgt_pobj->mo_lu,
+                                              NULL);
+               if (rc < 0)
+                       GOTO(out_pending, rc);
+
+               mdd_tobj = mdd_object_find(env, mdd, &tfid);
+               if (IS_ERR(mdd_tobj))
+                       GOTO(out_pending, rc = PTR_ERR(mdd_tobj));
+
+               memset(spec, 0, sizeof(*spec));
+               rc = mdd_migrate_object(env, mdd_spobj, mdd_tpobj, mdd_sobj,
+                                       mdd_tobj, lsname, ltname, spec, ma);
+               mdd_object_put(env, mdd_tobj);
+               GOTO(out_pending, rc);
+       }
+
        rc = mdd_la_get(env, mdd_spobj, pattr);
        if (rc)
                GOTO(out_pending, rc);
@@ -3030,11 +3063,6 @@ static int mdd_rename(const struct lu_env *env,
        if (rc < 0)
                GOTO(out_pending, rc);
 
-       /* FIXME: Should consider tobj and sobj too in rename_lock. */
-       rc = mdd_rename_order(env, mdd, mdd_spobj, pattr, mdd_tpobj);
-       if (rc < 0)
-               GOTO(out_pending, rc);
-
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
@@ -3623,7 +3651,8 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
                                      struct mdd_object *spobj,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      const struct lu_attr *attr,
                                      struct linkea_data *ldata)
 {
@@ -3633,8 +3662,8 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
        ENTRY;
 
        memset(ldata, 0, sizeof(*ldata));
-       rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), lname,
-                               mdd_object_fid(tpobj), lname, 1, 0, ldata);
+       rc = mdd_linkea_prepare(env, sobj, mdd_object_fid(spobj), sname,
+                               mdd_object_fid(tpobj), tname, 1, 0, ldata);
        if (rc)
                RETURN(rc);
 
@@ -3660,7 +3689,7 @@ static int mdd_migrate_linkea_prepare(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
-       rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdd_object_fid(tpobj),
+       rc = mdd_iterate_linkea(env, sobj, NULL, tname, mdd_object_fid(tpobj),
                                ldata, &source_mdt_index, NULL,
                                mdd_is_link_on_source_mdt);
        RETURN(rc);
@@ -3670,7 +3699,8 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
                                      struct mdd_object *spobj,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *obj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      struct lu_attr *attr,
                                      struct lu_attr *spattr,
                                      struct lu_attr *tpattr,
@@ -3682,7 +3712,7 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
        struct lu_attr *la = &info->mti_la_for_fix;
        int rc;
 
-       rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+       rc = mdo_declare_index_delete(env, spobj, sname->ln_name, handle);
        if (rc)
                return rc;
 
@@ -3694,7 +3724,7 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
 
        rc = mdo_declare_index_insert(env, tpobj, mdd_object_fid(obj),
                                      attr->la_mode & S_IFMT,
-                                     lname->ln_name, handle);
+                                     tname->ln_name, handle);
        if (rc)
                return rc;
 
@@ -3727,7 +3757,8 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
                                      struct mdd_object *tobj,
-                                     const struct lu_name *lname,
+                                     const struct lu_name *sname,
+                                     const struct lu_name *tname,
                                      struct lu_attr *spattr,
                                      struct lu_attr *tpattr,
                                      struct lu_attr *attr,
@@ -3764,7 +3795,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        mdd_object_make_hint(env, tpobj, tobj, attr, spec, hint);
 
        rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
-                               lname, attr, handle, spec, ldata, NULL, NULL,
+                               tname, attr, handle, spec, ldata, NULL, NULL,
                                NULL, hint);
        if (rc)
                return rc;
@@ -3824,7 +3855,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        }
 
        if (!S_ISDIR(attr->la_mode)) {
-               rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
                                        mdd_object_fid(tpobj), ldata, NULL,
                                        handle, mdd_declare_update_link);
                if (rc)
@@ -3847,8 +3878,9 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                        return rc;
        }
 
-       rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, lname, attr,
-                                       spattr, tpattr, ldata, ma, handle);
+       rc = mdd_declare_migrate_update(env, spobj, tpobj, tobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
        return rc;
 }
 
@@ -3859,7 +3891,8 @@ static int mdd_migrate_update(const struct lu_env *env,
                              struct mdd_object *spobj,
                              struct mdd_object *tpobj,
                              struct mdd_object *obj,
-                             const struct lu_name *lname,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
                              struct lu_attr *attr,
                              struct lu_attr *spattr,
                              struct lu_attr *tpattr,
@@ -3873,18 +3906,18 @@ static int mdd_migrate_update(const struct lu_env *env,
 
        ENTRY;
 
-       CDEBUG(D_INFO, "update "DFID"/%s to "DFID"/"DFID"\n",
-              PFID(mdd_object_fid(spobj)), lname->ln_name,
-              PFID(mdd_object_fid(tpobj)), PFID(mdd_object_fid(obj)));
+       CDEBUG(D_INFO, "update "DFID" from "DFID"/%s to "DFID"/%s\n",
+              PFID(mdd_object_fid(obj)), PFID(mdd_object_fid(spobj)),
+              sname->ln_name, PFID(mdd_object_fid(tpobj)), tname->ln_name);
 
-       rc = __mdd_index_delete(env, spobj, lname->ln_name,
+       rc = __mdd_index_delete(env, spobj, sname->ln_name,
                                S_ISDIR(attr->la_mode), handle);
        if (rc)
                RETURN(rc);
 
        rc = __mdd_index_insert(env, tpobj, mdd_object_fid(obj),
                                attr->la_mode & S_IFMT,
-                               lname->ln_name, handle);
+                               tname->ln_name, handle);
        if (rc)
                RETURN(rc);
 
@@ -3940,7 +3973,8 @@ static int mdd_migrate_create(const struct lu_env *env,
                              struct mdd_object *tpobj,
                              struct mdd_object *sobj,
                              struct mdd_object *tobj,
-                             const struct lu_name *lname,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
                              struct lu_attr *spattr,
                              struct lu_attr *tpattr,
                              struct lu_attr *attr,
@@ -4012,7 +4046,7 @@ static int mdd_migrate_create(const struct lu_env *env,
 
        /* update links FID */
        if (!S_ISDIR(attr->la_mode)) {
-               rc = mdd_iterate_linkea(env, sobj, tobj, lname,
+               rc = mdd_iterate_linkea(env, sobj, tobj, tname,
                                        mdd_object_fid(tpobj), ldata,
                                        NULL, handle, mdd_update_link);
                if (rc)
@@ -4034,12 +4068,16 @@ static int mdd_migrate_create(const struct lu_env *env,
                        RETURN(rc);
        }
 
-       rc = mdd_migrate_update(env, spobj, tpobj, tobj, lname, attr,
+       rc = mdd_migrate_update(env, spobj, tpobj, tobj, sname, tname, attr,
                                spattr, tpattr, ldata, ma, handle);
 
        RETURN(rc);
 }
 
+/* NB: if user issued different migrate command, we can't adjust it silently
+ * here, because this command will decide target MDT in subdir migration in
+ * LMV.
+ */
 static int mdd_migrate_cmd_check(struct mdd_device *mdd,
                                 const struct lmv_mds_md_v1 *lmv,
                                 const struct lmv_user_md_v1 *lum,
@@ -4047,11 +4085,6 @@ static int mdd_migrate_cmd_check(struct mdd_device *mdd,
 {
        __u32 lum_stripe_count = lum->lum_stripe_count;
        __u32 lmv_hash_type = lmv->lmv_hash_type;
-       char *mdt_hash_name[] = { "none",
-                                 LMV_HASH_NAME_ALL_CHARS,
-                                 LMV_HASH_NAME_FNV_1A_64,
-                                 LMV_HASH_NAME_CRUSH,
-       };
 
        if (!lmv_is_sane(lmv))
                return -EBADF;
@@ -4079,7 +4112,7 @@ static int mdd_migrate_cmd_check(struct mdd_device *mdd,
 }
 
 /**
- * Migrate directory or file.
+ * Internal function to migrate directory or file between MDTs.
  *
  * migrate source to target in following steps:
  *   1. create target, append source stripes after target's if it's directory,
@@ -4088,39 +4121,38 @@ static int mdd_migrate_cmd_check(struct mdd_device *mdd,
  *      update file linkea, and destroy source if it's not needed any more.
  *
  * \param[in] env      execution environment
- * \param[in] md_pobj  parent master object
- * \param[in] md_sobj  source object
- * \param[in] lname    file name
- * \param[in] md_tobj  target object
+ * \param[in] spobj    source parent object
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] sname    source file name
+ * \param[in] tname    target file name
  * \param[in] spec     target creation spec
  * \param[in] ma       used to update \a pobj mtime and ctime
  *
  * \retval             0 on success
  * \retval             -errno on failure
  */
-static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
-                      struct md_object *md_sobj, const struct lu_name *lname,
-                      struct md_object *md_tobj, struct md_op_spec *spec,
-                      struct md_attr *ma)
+static int mdd_migrate_object(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *sname,
+                             const struct lu_name *tname,
+                             struct md_op_spec *spec,
+                             struct md_attr *ma)
 {
-       struct mdd_device *mdd = mdo2mdd(md_pobj);
        struct mdd_thread_info *info = mdd_env_info(env);
-       struct mdd_object *pobj = md2mdd_obj(md_pobj);
-       struct mdd_object *sobj = md2mdd_obj(md_sobj);
-       struct mdd_object *tobj = md2mdd_obj(md_tobj);
-       struct mdd_object *spobj = NULL;
-       struct mdd_object *tpobj = NULL;
+       struct mdd_device *mdd = mdo2mdd(&spobj->mod_obj);
        struct lu_attr *spattr = &info->mti_pattr;
        struct lu_attr *tpattr = &info->mti_tpattr;
        struct lu_attr *attr = &info->mti_cattr;
        struct linkea_data *ldata = &info->mti_link_data;
        struct dt_allocation_hint *hint = &info->mti_hint;
-       struct lu_fid *fid = &info->mti_fid2;
-       struct lu_buf pbuf = { NULL };
        struct lu_buf sbuf = { NULL };
        struct lmv_mds_md_v1 *lmv;
        struct thandle *handle;
-       bool nsonly = false;
        int rc;
 
        ENTRY;
@@ -4129,64 +4161,15 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (rc)
                RETURN(rc);
 
-       /* locate source and target stripe on pobj, which are the real parent */
-       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
-       if (rc < 0 && rc != -ENODATA)
-               RETURN(rc);
-
-       lmv = pbuf.lb_buf;
-       if (lmv) {
-               __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
-               int index;
-
-               /* locate target parent stripe */
-               /* fail check here to make sure top dir migration succeed. */
-               if ((hash_type & LMV_HASH_FLAG_MIGRATION) &&
-                   OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
-                       GOTO(out, rc = -EIO);
-
-               index = lmv_name_to_stripe_index(lmv, lname->ln_name,
-                                                lname->ln_namelen);
-               if (index < 0)
-                       GOTO(out, rc = index);
-
-               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
-               tpobj = mdd_object_find(env, mdd, fid);
-               if (IS_ERR(tpobj))
-                       GOTO(out, rc = PTR_ERR(tpobj));
-
-               /* locate source parent stripe */
-               if (hash_type & LMV_HASH_FLAG_LAYOUT_CHANGE) {
-                       index = lmv_name_to_stripe_index_old(lmv,
-                                                            lname->ln_name,
-                                                            lname->ln_namelen);
-                       if (index < 0)
-                               GOTO(out, rc = index);
-
-                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
-                       spobj = mdd_object_find(env, mdd, fid);
-                       if (IS_ERR(spobj))
-                               GOTO(out, rc = PTR_ERR(spobj));
-               } else {
-                       spobj = tpobj;
-                       mdd_object_get(spobj);
-               }
-       } else {
-               tpobj = pobj;
-               spobj = pobj;
-               mdd_object_get(tpobj);
-               mdd_object_get(spobj);
-       }
-
        rc = mdd_la_get(env, spobj, spattr);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
 
        rc = mdd_la_get(env, tpobj, tpattr);
        if (rc)
-               GOTO(out, rc);
+               RETURN(rc);
 
-       if (S_ISDIR(attr->la_mode)) {
+       if (S_ISDIR(attr->la_mode) && !spec->sp_migrate_nsonly) {
                struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
 
                LASSERT(lum);
@@ -4202,19 +4185,22 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc);
 
                lmv = sbuf.lb_buf;
-               if (lmv &&
-                   (le32_to_cpu(lmv->lmv_hash_type) &
-                    LMV_HASH_FLAG_MIGRATION)) {
-                       rc = mdd_migrate_cmd_check(mdd, lmv, lum, lname);
-                       GOTO(out, rc);
+               if (lmv) {
+                       if (!lmv_is_sane(lmv))
+                               GOTO(out, rc = -EBADF);
+                       if (lmv_is_migrating(lmv)) {
+                               rc = mdd_migrate_cmd_check(mdd, lmv, lum,
+                                                          sname);
+                               GOTO(out, rc);
+                       }
                }
-       } else {
+       } else if (!S_ISDIR(attr->la_mode)) {
                if (spobj == tpobj)
                        GOTO(out, rc = -EALREADY);
 
                /* update namespace only if @sobj is on MDT where @tpobj is. */
-               if (!mdd_object_remote(tpobj))
-                       nsonly = true;
+               if (!mdd_object_remote(tpobj) && !mdd_object_remote(sobj))
+                       spec->sp_migrate_nsonly = true;
 
                if (S_ISLNK(attr->la_mode)) {
                        lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
@@ -4233,11 +4219,11 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        }
 
        /* linkea needs update upon FID or parent stripe change */
-       rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname,
-                                       attr, ldata);
+       rc = mdd_migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, sname,
+                                       tname, attr, ldata);
        if (rc > 0)
                /* update namespace only if @sobj has link on its MDT. */
-               nsonly = true;
+               spec->sp_migrate_nsonly = true;
        else if (rc < 0)
                GOTO(out, rc);
 
@@ -4250,53 +4236,155 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (IS_ERR(handle))
                GOTO(out, rc = PTR_ERR(handle));
 
-       if (nsonly)
-               rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, lname,
-                                               attr, spattr, tpattr, ldata, ma,
-                                               handle);
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, sname,
+                                               tname, attr, spattr, tpattr,
+                                               ldata, ma, handle);
        else
                rc = mdd_declare_migrate_create(env, spobj, tpobj, sobj, tobj,
-                                               lname, spattr, tpattr, attr,
-                                               &sbuf, ldata, ma, spec, hint,
-                                               handle);
+                                               sname, tname, spattr, tpattr,
+                                               attr, &sbuf, ldata, ma, spec,
+                                               hint, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
-       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, tname, sname,
                                         handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
        rc = mdd_trans_start(env, mdd, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
-       if (nsonly)
-               rc = mdd_migrate_update(env, spobj, tpobj, sobj, lname, attr,
-                                       spattr, tpattr, ldata, ma, handle);
+       if (spec->sp_migrate_nsonly)
+               rc = mdd_migrate_update(env, spobj, tpobj, sobj, sname, tname,
+                                       attr, spattr, tpattr, ldata, ma,
+                                       handle);
        else
-               rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, lname,
-                                       spattr, tpattr, attr, &sbuf, ldata, ma,
-                                       spec, hint, handle);
+               rc = mdd_migrate_create(env, spobj, tpobj, sobj, tobj, sname,
+                                       tname, spattr, tpattr, attr, &sbuf,
+                                       ldata, ma, spec, hint, handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
 
        rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0,
-                                   nsonly ? sobj : tobj, mdd_object_fid(spobj),
-                                   mdd_object_fid(sobj), mdd_object_fid(tpobj),
-                                   lname, lname, handle);
+                                   spec->sp_migrate_nsonly ? sobj : tobj,
+                                   mdd_object_fid(spobj), mdd_object_fid(sobj),
+                                   mdd_object_fid(tpobj), tname, sname,
+                                   handle);
        if (rc)
-               GOTO(stop_trans, rc);
+               GOTO(stop, rc);
        EXIT;
 
-stop_trans:
+stop:
        rc = mdd_trans_stop(env, mdd, rc, handle);
 out:
+       lu_buf_free(&sbuf);
+
+       return rc;
+}
+
+/**
+ * Migrate directory or file between MDTs.
+ *
+ * \param[in] env      execution environment
+ * \param[in] md_pobj  parent master object
+ * \param[in] md_sobj  source object
+ * \param[in] lname    file name
+ * \param[in] md_tobj  target object
+ * \param[in] spec     target creation spec
+ * \param[in] ma       used to update \a pobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+                      struct md_object *md_sobj, const struct lu_name *lname,
+                      struct md_object *md_tobj, struct md_op_spec *spec,
+                      struct md_attr *ma)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(md_pobj);
+       struct mdd_object *pobj = md2mdd_obj(md_pobj);
+       struct mdd_object *sobj = md2mdd_obj(md_sobj);
+       struct mdd_object *tobj = md2mdd_obj(md_tobj);
+       struct mdd_object *spobj = NULL;
+       struct mdd_object *tpobj = NULL;
+       struct lu_buf pbuf = { NULL };
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lmv_mds_md_v1 *lmv;
+       int rc;
+
+       ENTRY;
+
+       /* locate source and target stripe on pobj, which are the real parent */
+       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       lmv = pbuf.lb_buf;
+       if (lmv) {
+               int index;
+
+               if (!lmv_is_sane(lmv))
+                       GOTO(out, rc = -EBADF);
+
+               /* locate target parent stripe */
+               /* fail check here to make sure top dir migration succeed. */
+               if (lmv_is_migrating(lmv) &&
+                   OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                       GOTO(out, rc = -EIO);
+
+               index = lmv_name_to_stripe_index(lmv, lname->ln_name,
+                                                lname->ln_namelen);
+               if (index < 0)
+                       GOTO(out, rc = index);
+
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+               tpobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(tpobj))
+                       GOTO(out, rc = PTR_ERR(tpobj));
+
+               /* locate source parent stripe */
+               if (lmv_is_layout_changing(lmv)) {
+                       index = lmv_name_to_stripe_index_old(lmv,
+                                                            lname->ln_name,
+                                                            lname->ln_namelen);
+                       if (index < 0)
+                               GOTO(out, rc = index);
+
+                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[index]);
+                       spobj = mdd_object_find(env, mdd, fid);
+                       if (IS_ERR(spobj))
+                               GOTO(out, rc = PTR_ERR(spobj));
+
+                       /* parent stripe unchanged */
+                       if (spobj == tpobj) {
+                               if (!lmv_is_restriping(lmv))
+                                       GOTO(out, rc = -EINVAL);
+                               GOTO(out, rc = -EALREADY);
+                       }
+               } else {
+                       spobj = tpobj;
+                       mdd_object_get(spobj);
+               }
+       } else {
+               tpobj = pobj;
+               spobj = pobj;
+               mdd_object_get(tpobj);
+               mdd_object_get(spobj);
+       }
+
+       rc = mdd_migrate_object(env, spobj, tpobj, sobj, tobj, lname, lname,
+                               spec, ma);
+       GOTO(out, rc);
+
+out:
        if (!IS_ERR_OR_NULL(spobj))
                mdd_object_put(env, spobj);
        if (!IS_ERR_OR_NULL(tpobj))
                mdd_object_put(env, tpobj);
-       lu_buf_free(&sbuf);
        lu_buf_free(&pbuf);
 
        return rc;
@@ -4476,20 +4564,23 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
                RETURN(rc);
 
        lmv = lmv_buf.lb_buf;
+       if (!lmv_is_sane(lmv))
+               RETURN(-EBADF);
+
        lmu = mlc->mlc_buf.lb_buf;
 
        /* adjust the default value '0' to '1' */
        if (lmu->lum_stripe_count == 0)
                lmu->lum_stripe_count = cpu_to_le32(1);
 
-       /* this was checked in MDT */
+       /* these were checked in MDT */
        LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
                le32_to_cpu(lmv->lmv_stripe_count));
+       LASSERT(!lmv_is_splitting(lmv));
+       LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
 
-       /*
-        * if obj stripe count will be shrunk to 1, we need to convert it to a
-        * normal dir, which will change its fid and update parent namespace,
-        * get obj name and parent fid from linkea.
+       /* if dir stripe count will be shrunk to 1, it needs to be transformed
+        * to a plain dir, which will cause FID change and namespace update.
         */
        if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
                struct linkea_data *ldata = &info->mti_link_data;
@@ -4585,6 +4676,257 @@ out:
        return rc;
 }
 
+static int mdd_dir_declare_split_plain(const struct lu_env *env,
+                                       struct mdd_device *mdd,
+                                       struct mdd_object *pobj,
+                                       struct mdd_object *obj,
+                                       struct mdd_object *tobj,
+                                       struct md_layout_change *mlc,
+                                       struct dt_allocation_hint *hint,
+                                       struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_name *lname = mlc->mlc_name;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct lmv_mds_md_v1 *lmv;
+       __u32 count;
+       int rc;
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               return rc;
+
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
+                               lname, 1, 0, ldata);
+       if (rc)
+               return rc;
+
+       count = lum->lum_stripe_count;
+       lum->lum_stripe_count = 0;
+       mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                            hint);
+       rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
+                               lname, mlc->mlc_attr, handle, mlc->mlc_spec,
+                               ldata, NULL, NULL, NULL, hint);
+       if (rc)
+               return rc;
+
+       /* tobj mode will be used in lod_declare_xattr_set(), but it's not
+        * created yet.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
+
+       lmv = (typeof(lmv))info->mti_key;
+       memset(lmv, 0, sizeof(*lmv));
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+       lmv->lmv_stripe_count = cpu_to_le32(1);
+       lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
+       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
+
+       mlc->mlc_opc = MD_LAYOUT_ATTACH;
+       mlc->mlc_buf.lb_buf = lmv;
+       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       lum->lum_stripe_count = count;
+       mlc->mlc_opc = MD_LAYOUT_SPLIT;
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+                                     S_IFDIR, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, obj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, pobj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       return rc;
+}
+
+/**
+ * plain directory split:
+ * 1. create \a tobj as plain directory.
+ * 2. append \a obj as first stripe of \a tobj.
+ * 3. migrate xattrs from \a obj to \a tobj.
+ * 4. split \a tobj to specific stripe count.
+ */
+static int mdd_dir_split_plain(const struct lu_env *env,
+                               struct mdd_device *mdd,
+                               struct mdd_object *pobj,
+                               struct mdd_object *obj,
+                               struct mdd_object *tobj,
+                               struct md_layout_change *mlc,
+                               struct dt_allocation_hint *hint,
+                               struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       const struct lu_name *lname = mlc->mlc_name;
+       struct linkea_data *ldata = &info->mti_link_data;
+       int rc;
+
+       ENTRY;
+
+       /* copy linkea out and set on target later */
+       rc = mdd_links_read(env, obj, ldata);
+       if (rc)
+               RETURN(rc);
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       if (rc)
+               RETURN(rc);
+
+       /* don't set nlink from obj */
+       mlc->mlc_attr->la_valid &= ~LA_NLINK;
+
+       rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                              NULL, NULL, NULL, hint, handle, false);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle, mdo_xattr_set);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_links_write(env, tobj, ldata, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
+                               lname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
+
+       la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
+       rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_la_get(env, pobj, pattr);
+       if (rc)
+               RETURN(rc);
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, pobj, pattr, la, handle);
+       mdd_write_unlock(env, pobj);
+       if (rc)
+               RETURN(rc);
+
+       /* FID changes, record it as CL_MIGRATE */
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdd_object_fid(pobj), mdd_object_fid(obj),
+                                   mdd_object_fid(pobj), lname, lname, handle);
+       RETURN(rc);
+}
+
+int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
+                        struct md_layout_change *mlc)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(o);
+       struct mdd_object *obj = md2mdd_obj(o);
+       struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
+       struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       bool is_plain = false;
+       struct thandle *handle;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(S_ISDIR(mdd_object_type(obj)));
+
+       rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
+       if (rc == -ENODATA)
+               is_plain = true;
+       else if (rc < 0)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       if (is_plain) {
+               rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj, mlc,
+                                                hint, handle);
+       } else {
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               rc = mdo_declare_layout_change(env, obj, mlc, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
+                                                NULL, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       if (is_plain) {
+               rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, mlc, hint,
+                                        handle);
+       } else {
+               mdd_write_lock(env, obj, DT_TGT_CHILD);
+               rc = mdo_xattr_set(env, obj, NULL, XATTR_NAME_LMV,
+                                  LU_XATTR_CREATE, handle);
+               mdd_write_unlock(env, obj);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                                   XATTR_NAME_LMV, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       EXIT;
+
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+
+       return rc;
+}
+
 const struct md_dir_operations mdd_dir_ops = {
        .mdo_is_subdir     = mdd_is_subdir,
        .mdo_lookup        = mdd_lookup,