Whamcloud - gitweb
LU-11025 dne: directory restripe and auto split 84/37284/19
author Lai Siyao <lai.siyao@whamcloud.com>
Mon, 30 Dec 2019 15:27:27 +0000 (23:27 +0800)
committer Oleg Drokin <green@whamcloud.com>
Tue, 2 Jun 2020 14:03:15 +0000 (14:03 +0000)
A dedicated restriper thread is created for each MDT. It performs three
tasks in a loop:
1. If there is a directory whose total number of sub-files exceeds the
   threshold (50000 by default, can be changed with "lctl set_param
   mdt.*.dir_split_count=N"), split this directory by adding new
   stripes (4 stripes by default, which can be adjusted with
   "lctl set_param mdt.*.dir_split_delta=N").
2. If a directory stripe LMV is marked 'MIGRATION', migrate sub file
   from current offset, and update offset to next file.
3. If a directory master LMV is marked 'RESTRIPING', check whether
   all stripe LMV 'MIGRATION' flag is cleared, if so, clear
   'RESTRIPING' flag and update directory LMV.

In the last patch, the first part of manual directory restripe was
implemented; in this patch, sub file migration and the dir layout
update are done. Directory auto-split is done in a similar way, except
that the first step is done by this thread too.

Directory auto-split can be enabled/disabled with "lctl set_param
mdt.*.enable_dir_auto_split=[0|1]"; it is disabled by default
(mdt_init0() initializes mdt_enable_dir_auto_split to 0).

Auto split is triggered at the end of getattr(): since now the attr
contains dirent count, check whether it exceeds threshold, if so,
add this directory into mdr_auto_split list and wake up the dir
restriper thread.

Restripe migration is also triggered in getattr(): if the object is
directory stripe, and LMV 'MIGRATION' flag set, add this object into
mdr_restripe_migrate list and wake up the dir restriper thread.

Directory layout update is similar: if the current directory is striped,
and the LMV 'RESTRIPING' flag is set, add this directory into the
mdr_restripe_update list and wake up the restriper thread.

By default, restripe migrates the dirent only and leaves the inode
unchanged; this can be adjusted with
"lctl set_param mdt.*.dir_restripe_nsonly=[0|1]".

Currently DoM file inode migration is not supported, migrate dirent
only for such files to avoid leaving dir migration/restripe
unfinished.

Add sanity.sh 230o, 230p and 230q, adjust 230j since DoM files migrate
dirent.

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I8c83b42e4acbaab067d0092d0b232de37f956588
Reviewed-on: https://review.whamcloud.com/37284
Tested-by: jenkins <devops@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
18 files changed:
libcfs/include/libcfs/linux/linux-wait.h
lustre/include/lu_object.h
lustre/include/lustre_lmv.h
lustre/include/md_object.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/llite/dir.c
lustre/llite/statahead.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_restripe.c [new file with mode: 0644]
lustre/mdt/mdt_xattr.c
lustre/tests/sanity.sh

index d406666..67aece9 100644 (file)
@@ -203,6 +203,8 @@ __out:      __ret;                                                          \
 
 #ifndef TASK_NOLOAD
 
+#define TASK_IDLE TASK_INTERRUPTIBLE
+
 #define ___wait_event_idle(wq_head, condition, exclusive, ret, cmd)    \
 ({                                                                     \
        wait_queue_entry_t __wq_entry;                                  \
index 781b9b4..668dfdc 100644 (file)
@@ -171,7 +171,6 @@ struct lu_device_operations {
         * \param[in] parent    parent object
         * \param[in] name      lu_name
         *
-        * \retval 0            on success
         * \retval 0            0 FID allocated successfully.
         * \retval 1            1 FID allocated successfully and new sequence
         *                      requested from seq meta server
index e33cdc1..363dfb0 100644 (file)
@@ -444,32 +444,42 @@ insane:
 
 static inline bool lmv_is_splitting(const struct lmv_mds_md_v1 *lmv)
 {
-       LASSERT(lmv_is_sane2(lmv));
+       if (!lmv_is_sane2(lmv))
+               return false;
+
        return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type));
 }
 
 static inline bool lmv_is_merging(const struct lmv_mds_md_v1 *lmv)
 {
-       LASSERT(lmv_is_sane2(lmv));
+       if (!lmv_is_sane2(lmv))
+               return false;
+
        return lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type));
 }
 
 static inline bool lmv_is_migrating(const struct lmv_mds_md_v1 *lmv)
 {
-       LASSERT(lmv_is_sane(lmv));
+       if (!lmv_is_sane(lmv))
+               return false;
+
        return lmv_hash_is_migrating(cpu_to_le32(lmv->lmv_hash_type));
 }
 
 static inline bool lmv_is_restriping(const struct lmv_mds_md_v1 *lmv)
 {
-       LASSERT(lmv_is_sane2(lmv));
+       if (!lmv_is_sane2(lmv))
+               return false;
+
        return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type)) ||
               lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type));
 }
 
 static inline bool lmv_is_layout_changing(const struct lmv_mds_md_v1 *lmv)
 {
-       LASSERT(lmv_is_sane2(lmv));
+       if (!lmv_is_sane2(lmv))
+               return false;
+
        return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type)) ||
               lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type)) ||
               lmv_hash_is_migrating(cpu_to_le32(lmv->lmv_hash_type));
index a54af3e..95fa430 100644 (file)
@@ -204,11 +204,16 @@ struct md_layout_change {
                        __u32                   *mlc_resync_ids;
                }; /* file */
                struct {
-                       struct md_object        *mlc_parent;    /* parent obj in plain dir split */
-                       struct md_object        *mlc_target;    /* target obj in plain dir split */
-                       struct lu_attr          *mlc_attr;      /* target attr in plain dir split */
-                       const struct lu_name    *mlc_name;      /* target name in plain dir split */
-                       struct md_op_spec       *mlc_spec;      /* dir split spec */
+                       /* parent obj in plain dir split */
+                       struct md_object        *mlc_parent;
+                       /* target obj in plain dir split */
+                       struct md_object        *mlc_target;
+                       /* target attr in plain dir split */
+                       struct lu_attr          *mlc_attr;
+                       /* target name in plain dir split */
+                       const struct lu_name    *mlc_name;
+                       /* dir split spec */
+                       struct md_op_spec       *mlc_spec;
                }; /* dir */
        };
 };
index 8c992d4..088dab5 100644 (file)
@@ -533,6 +533,24 @@ static inline __kernel_size_t lu_dirent_calc_size(size_t namelen, __u16 attr)
        return (size + 7) & ~7;
 }
 
+static inline __u16 lu_dirent_type_get(struct lu_dirent *ent)
+{
+       __u16 type = 0;
+       struct luda_type *lt;
+       int len = 0;
+
+       if (__le32_to_cpu(ent->lde_attrs) & LUDA_TYPE) {
+               const unsigned int align = sizeof(struct luda_type) - 1;
+
+               len = __le16_to_cpu(ent->lde_namelen);
+               len = (len + align) & ~align;
+               lt = (void *)ent->lde_name + len;
+               type = __le16_to_cpu(lt->lt_type);
+       }
+
+       return type;
+}
+
 #define MDS_DIR_END_OFF 0xfffffffffffffffeULL
 
 /**
@@ -2171,7 +2189,8 @@ struct lmv_mds_md_v1 {
 };
 
 #define LMV_DEBUG(mask, lmv, msg)                                      \
-       CDEBUG(mask, "%s LMV: magic=%#x count=%u index=%u hash=%#x version=%u migrate offset=%u migrate hash=%u.\n",    \
+       CDEBUG(mask,                                                    \
+              "%s LMV: magic=%#x count=%u index=%u hash=%#x version=%u migrate offset=%u migrate hash=%u.\n",  \
               msg, (lmv)->lmv_magic, (lmv)->lmv_stripe_count,          \
               (lmv)->lmv_master_mdt_index, (lmv)->lmv_hash_type,       \
               (lmv)->lmv_layout_version, (lmv)->lmv_migrate_offset,    \
index 007f6cf..9a83ba4 100644 (file)
@@ -175,29 +175,6 @@ void ll_release_page(struct inode *inode, struct page *page,
        put_page(page);
 }
 
-/**
- * return IF_* type for given lu_dirent entry.
- * IF_* flag shld be converted to particular OS file type in
- * platform llite module.
- */
-static u16 ll_dirent_type_get(struct lu_dirent *ent)
-{
-       u16 type = 0;
-       struct luda_type *lt;
-       int len = 0;
-
-       if (le32_to_cpu(ent->lde_attrs) & LUDA_TYPE) {
-               const unsigned align = sizeof(struct luda_type) - 1;
-
-               len = le16_to_cpu(ent->lde_namelen);
-               len = (len + align) & ~align;
-               lt = (void *)ent->lde_name + len;
-               type = IFTODT(le16_to_cpu(lt->lt_type));
-       }
-
-       return type;
-}
-
 #ifdef HAVE_DIR_CONTEXT
 int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                struct dir_context *ctx)
@@ -256,7 +233,7 @@ int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                                lhash = hash;
                        fid_le_to_cpu(&fid, &ent->lde_fid);
                        ino = cl_fid_build_ino(&fid, is_api32);
-                       type = ll_dirent_type_get(ent);
+                       type = IFTODT(lu_dirent_type_get(ent));
                        /* For ll_nfs_get_name_filldir(), it will try to access
                         * 'ent' through 'lde_name', so the parameter 'name'
                         * for 'filldir()' must be part of the 'ent'. */
index 9a25ae7..7752b45 100644 (file)
@@ -901,10 +901,6 @@ static void sa_statahead(struct dentry *parent, const char *name, int len,
        EXIT;
 }
 
-#ifndef TASK_IDLE
-#define TASK_IDLE TASK_INTERRUPTIBLE
-#endif
-
 /* async glimpse (agl) thread main function */
 static int ll_agl_thread(void *arg)
 {
index 65a552d..164cf24 100644 (file)
@@ -1606,8 +1606,8 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
                         * the FIDs of all shards of the striped directory. */
                        if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
                                rc = lmv_mds_md_size(
-                                               le32_to_cpu(lmv1->lmv_stripe_count),
-                                               le32_to_cpu(lmv1->lmv_magic));
+                                       le32_to_cpu(lmv1->lmv_stripe_count),
+                                       le32_to_cpu(lmv1->lmv_magic));
                } else {
                        lmv1 = buf->lb_buf;
                        if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
index e985f2b..0006f16 100644 (file)
@@ -4040,7 +4040,7 @@ static int mdd_migrate_create(const struct lu_env *env,
        RETURN(rc);
 }
 
-/* NB: if user issued different migrate command, we can't ajust it silently
+/* NB: if user issued different migrate command, we can't adjust it silently
  * here, because this command will decide target MDT in subdir migration in
  * LMV.
  */
@@ -4180,8 +4180,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                                        GOTO(out, rc = -EINVAL);
                                GOTO(out, rc = -EALREADY);
                        }
-                       if (S_ISDIR(attr->la_mode))
-                               nsonly = spec->sp_migrate_nsonly;
+                       nsonly = spec->sp_migrate_nsonly;
                } else {
                        spobj = tpobj;
                        mdd_object_get(spobj);
@@ -4646,7 +4645,7 @@ static int mdd_dir_declare_split_plain(const struct lu_env *env,
                return rc;
 
        /* tobj mode will be used in lod_declare_xattr_set(), but it's not
-        * createb yet.
+        * created yet.
         */
        tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
 
index ec054d7..ae7a7a2 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_identity.o mdt_lproc.o mdt_fs.o mdt_som.o
-mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o mdt_io.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o mdt_io.o mdt_restripe.o
 mdt-objs += mdt_hsm_cdt_actions.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
index 949edee..82fc529 100644 (file)
@@ -58,6 +58,7 @@
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_quota.h>
 #include <lustre_swab.h>
+#include <lustre_lmv.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
@@ -979,8 +980,8 @@ int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                  struct md_attr *ma, const char *name)
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -1056,6 +1057,40 @@ got:
        return rc;
 }
 
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
+{
+       int rc;
+
+       if (!info->mti_big_lmm) {
+               OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
+               if (!info->mti_big_lmm)
+                       return -ENOMEM;
+               info->mti_big_lmmsize = PAGE_SIZE;
+       }
+
+       if (strcmp(name, XATTR_NAME_LOV) == 0) {
+               ma->ma_lmm = info->mti_big_lmm;
+               ma->ma_lmm_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LOV;
+       } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
+               ma->ma_lmv = info->mti_big_lmm;
+               ma->ma_lmv_size = info->mti_big_lmmsize;
+               ma->ma_valid &= ~MA_LMV;
+       } else {
+               LBUG();
+       }
+
+       LASSERT(!info->mti_big_lmm_used);
+       rc = __mdt_stripe_get(info, o, ma, name);
+       /* since big_lmm is always used here, clear 'used' flag to avoid
+        * assertion in mdt_big_xattr_get().
+        */
+       info->mti_big_lmm_used = 0;
+
+       return rc;
+}
+
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid)
 {
@@ -1103,6 +1138,51 @@ int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(0);
 }
 
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname)
+{
+       struct lu_buf *buf = &info->mti_buf;
+       struct link_ea_header *leh;
+       struct link_ea_entry *lee;
+       int reclen;
+       int rc;
+
+       buf->lb_buf = info->mti_xattr_buf;
+       buf->lb_len = sizeof(info->mti_xattr_buf);
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
+                         XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
+               buf->lb_buf = info->mti_big_lmm;
+               buf->lb_len = info->mti_big_lmmsize;
+       }
+       if (rc < 0)
+               return rc;
+
+       if (rc < sizeof(*leh)) {
+               CERROR("short LinkEA on "DFID": rc = %d\n",
+                      PFID(mdt_object_fid(o)), rc);
+               return -ENODATA;
+       }
+
+       leh = (struct link_ea_header *)buf->lb_buf;
+       lee = (struct link_ea_entry *)(leh + 1);
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       linkea_entry_unpack(lee, &reclen, lname, pfid);
+
+       return 0;
+}
+
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma)
 {
@@ -1140,19 +1220,19 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
        }
 
        if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
                if (rc)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
 
        if (need & MA_LMV_DEF && S_ISDIR(mode)) {
-               rc = mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
+               rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
                if (rc != 0)
                        GOTO(out, rc);
        }
@@ -1202,19 +1282,21 @@ out:
 }
 
 static int mdt_getattr_internal(struct mdt_thread_info *info,
-                                struct mdt_object *o, int ma_need)
+                               struct mdt_object *o, int ma_need)
 {
-       struct md_object        *next = mdt_object_child(o);
-       const struct mdt_body   *reqbody = info->mti_body;
-       struct ptlrpc_request   *req = mdt_info_req(info);
-       struct md_attr          *ma = &info->mti_attr;
-       struct lu_attr          *la = &ma->ma_attr;
-       struct req_capsule      *pill = info->mti_pill;
-       const struct lu_env     *env = info->mti_env;
-       struct mdt_body         *repbody;
-       struct lu_buf           *buffer = &info->mti_buf;
-       struct obd_export       *exp = info->mti_exp;
-       int                      rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct md_object *next = mdt_object_child(o);
+       const struct mdt_body *reqbody = info->mti_body;
+       struct ptlrpc_request *req = mdt_info_req(info);
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_attr *la = &ma->ma_attr;
+       struct req_capsule *pill = info->mti_pill;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_body *repbody;
+       struct lu_buf *buffer = &info->mti_buf;
+       struct obd_export *exp = info->mti_exp;
+       int rc;
+
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
@@ -1301,13 +1383,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                }
        }
 
-        if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+       if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
            reqbody->mbo_valid & OBD_MD_FLDIREA  &&
-            lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
-                /* get default stripe info for this dir. */
-                ma->ma_need |= MA_LOV_DEF;
-        }
-        ma->ma_need |= ma_need;
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+               /* get default stripe info for this dir. */
+               ma->ma_need |= MA_LOV_DEF;
+       }
+       ma->ma_need |= ma_need;
 
        rc = mdt_attr_get_complex(info, o, ma);
        if (unlikely(rc)) {
@@ -1326,22 +1408,27 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_t_state = MS_RESTORE;
        }
 
-        if (likely(ma->ma_valid & MA_INODE))
-                mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
-        else
-                RETURN(-EFAULT);
+       if (unlikely(!(ma->ma_valid & MA_INODE)))
+               RETURN(-EFAULT);
+
+       mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
 
-        if (mdt_body_has_lov(la, reqbody)) {
-                if (ma->ma_valid & MA_LOV) {
-                        LASSERT(ma->ma_lmm_size);
+       if (mdt_body_has_lov(la, reqbody)) {
+               u32 stripe_count = 1;
+
+               if (ma->ma_valid & MA_LOV) {
+                       LASSERT(ma->ma_lmm_size);
                        repbody->mbo_eadatasize = ma->ma_lmm_size;
                        if (S_ISDIR(la->la_mode))
                                repbody->mbo_valid |= OBD_MD_FLDIREA;
                        else
                                repbody->mbo_valid |= OBD_MD_FLEASIZE;
                        mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
-                }
+               }
                if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       u32 magic = le32_to_cpu(lmv->lmv_magic);
+
                        /* Return -ENOTSUPP for old client */
                        if (!mdt_is_striped_client(req->rq_export))
                                RETURN(-ENOTSUPP);
@@ -1350,6 +1437,13 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        mdt_dump_lmv(D_INFO, ma->ma_lmv);
                        repbody->mbo_eadatasize = ma->ma_lmv_size;
                        repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
+
+                       stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+                       if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
+                               mdt_restripe_migrate_add(info, o);
+                       else if (magic == LMV_MAGIC_V1 &&
+                                lmv_is_restriping(lmv))
+                               mdt_restripe_update_add(info, o);
                }
                if (ma->ma_valid & MA_LMV_DEF) {
                        /* Return -ENOTSUPP for old client */
@@ -1366,6 +1460,18 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                        repbody->mbo_valid |= (OBD_MD_FLDIREA |
                                               OBD_MD_DEFAULT_MEA);
                }
+               CDEBUG(D_VFSTRACE,
+                      "dirent count %llu stripe count %u MDT count %d\n",
+                      ma->ma_attr.la_dirent_count, stripe_count,
+                      atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+               if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   ma->ma_attr.la_dirent_count >
+                       mdt->mdt_restriper.mdr_dir_split_count &&
+                   !fid_is_root(mdt_object_fid(o)) &&
+                   mdt->mdt_enable_dir_auto_split &&
+                   !o->mot_restriping &&
+                   stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
+                       mdt_auto_split_add(info, o);
        } else if (S_ISLNK(la->la_mode) &&
                   reqbody->mbo_valid & OBD_MD_LINKNAME) {
                buffer->lb_buf = ma->ma_lmm;
@@ -1403,8 +1509,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                               print_limit < rc ? "..." : "", print_limit,
                               (char *)ma->ma_lmm + rc - print_limit, rc);
                        rc = 0;
-                }
-        }
+               }
+       }
 
        if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
                repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
@@ -1426,10 +1532,10 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
 #endif
 
 out:
-        if (rc == 0)
+       if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_GETATTR);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdt_getattr(struct tgt_session_info *tsi)
@@ -5369,6 +5475,8 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        next->md_ops->mdo_iocontrol(env, next, OBD_IOC_STOP_LFSCK, 0, &stop);
 
        mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
+
+       mdt_restriper_stop(m);
        ping_evictor_stop();
 
        /* Remove the HSM /proc entry so the coordinator cannot be
@@ -5510,10 +5618,12 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_remote_dir = 1;
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
-       m->mdt_enable_dir_restripe = 1;
+       m->mdt_enable_dir_restripe = 0;
+       m->mdt_enable_dir_auto_split = 0;
        m->mdt_enable_remote_dir_gid = 0;
        m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
+       m->mdt_dir_restripe_nsonly = 1;
 
        atomic_set(&m->mdt_mds_mds_conns, 0);
        atomic_set(&m->mdt_async_commit_count, 0);
@@ -5674,7 +5784,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if ((lsi->lsi_lmd->lmd_flags & LMD_FLG_LOCAL_RECOV))
                m->mdt_lut.lut_local_recovery = 1;
 
+       rc = mdt_restriper_start(m);
+       if (rc)
+               GOTO(err_ping_evictor, rc);
+
        RETURN(0);
+
+err_ping_evictor:
+       ping_evictor_stop();
 err_procfs:
        mdt_tunables_fini(m);
 err_recovery:
@@ -5824,6 +5941,8 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
+               mo->mot_restripe_offset = 0;
+               INIT_LIST_HEAD(&mo->mot_restripe_linkage);
                RETURN(o);
        }
        RETURN(NULL);
index b9d32d6..d259696 100644 (file)
@@ -59,6 +59,7 @@
 #include <lustre_eacl.h>
 #include <lustre_quota.h>
 #include <lustre_linkea.h>
+#include <lustre_lmv.h>
 
 struct mdt_object;
 
@@ -205,6 +206,36 @@ struct mdt_statfs_cache {
        __u64 msf_age;
 };
 
+/* split directory automatically when sub file count exceeds 50k */
+#define DIR_SPLIT_COUNT_DEFAULT        50000
+
+/* directory auto-split allocate delta new stripes each time */
+#define DIR_SPLIT_DELTA_DEFAULT        4
+
+struct mdt_dir_restriper {
+       struct lu_env           mdr_env;
+       struct lu_context       mdr_session;
+       struct task_struct     *mdr_task;
+       /* lock for below fields */
+       spinlock_t              mdr_lock;
+       /* auto split when plain dir/shard sub files exceed threshold */
+       u64                     mdr_dir_split_count;
+       /* auto split growth delta */
+       u32                     mdr_dir_split_delta;
+       /* directories to split */
+       struct list_head        mdr_auto_splitting;
+       /* directories under which sub files are migrating */
+       struct list_head        mdr_migrating;
+       /* directories waiting to update layout after migration */
+       struct list_head        mdr_updating;
+       /* time to update directory layout after migration */
+       time64_t                mdr_update_time;
+       /* lum used in split/migrate/layout_change */
+       union lmv_mds_md        mdr_lmv;
+       /* page used in readdir */
+       struct page            *mdr_page;
+};
+
 struct mdt_device {
        /* super-class */
        struct lu_device           mdt_lu_dev;
@@ -256,9 +287,12 @@ struct mdt_device {
                                   mdt_enable_striped_dir:1,
                                   mdt_enable_dir_migration:1,
                                   mdt_enable_dir_restripe:1,
+                                  mdt_enable_dir_auto_split:1,
                                   mdt_enable_remote_rename:1,
                                   mdt_skip_lfsck:1,
-                                  mdt_readonly:1;
+                                  mdt_readonly:1,
+                                  /* dir restripe migrate dirent only */
+                                  mdt_dir_restripe_nsonly:1;
 
                                   /* user with gid can create remote/striped
                                    * dir, and set default dir stripe */
@@ -293,6 +327,8 @@ struct mdt_device {
        atomic_t                   mdt_async_commit_count;
 
        struct mdt_object         *mdt_md_root;
+
+       struct mdt_dir_restriper   mdt_restriper;
 };
 
 #define MDT_SERVICE_WATCHDOG_FACTOR    (2)
@@ -304,14 +340,17 @@ struct mdt_object {
        struct lu_object_header mot_header;
        struct lu_object        mot_obj;
        unsigned int            mot_lov_created:1,  /* lov object created */
-                               mot_cache_attr:1;   /* enable remote object
+                               mot_cache_attr:1,   /* enable remote object
                                                     * attribute cache */
+                               mot_restriping:1,   /* dir restriping */
+                               /* dir auto-split disabled */
+                               mot_auto_split_disabled:1;
        int                     mot_write_count;
        spinlock_t              mot_write_lock;
-       /* Lock to protect object's SOM update. */
-       struct mutex            mot_som_mutex;
         /* Lock to protect create_data */
        struct mutex            mot_lov_mutex;
+       /* Lock to protect object's SOM update. */
+       struct mutex            mot_som_mutex;
        /* lock to protect read/write stages for Data-on-MDT files */
        struct rw_semaphore     mot_dom_sem;
        /* Lock to protect lease open.
@@ -319,6 +358,10 @@ struct mdt_object {
        struct rw_semaphore     mot_open_sem;
        atomic_t                mot_lease_count;
        atomic_t                mot_open_count;
+       /* directory offset, used in sub file migration in dir restripe */
+       loff_t                  mot_restripe_offset;
+       /* link to mdt_restriper auto_splitting/migrating/updating */
+       struct list_head        mot_restripe_linkage;
 };
 
 struct mdt_lock_handle {
@@ -849,10 +892,14 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma);
 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
                      const char *name);
+int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                    struct md_attr *ma, const char *name);
 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
                   struct md_attr *ma, const char *name);
 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
                      struct lu_fid *pfid);
+int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
+                          struct lu_fid *pfid, struct lu_name *lname);
 int mdt_write_get(struct mdt_object *o);
 void mdt_write_put(struct mdt_object *o);
 int mdt_write_read(struct mdt_object *o);
@@ -888,6 +935,10 @@ int mdt_version_get_check(struct mdt_thread_info *, struct mdt_object *, int);
 void mdt_version_get_save(struct mdt_thread_info *, struct mdt_object *, int);
 int mdt_version_get_check_save(struct mdt_thread_info *, struct mdt_object *,
                                int);
+int mdt_lookup_version_check(struct mdt_thread_info *info,
+                            struct mdt_object *p,
+                            const struct lu_name *lname,
+                            struct lu_fid *fid, int idx);
 void mdt_thread_info_init(struct ptlrpc_request *req,
                          struct mdt_thread_info *mti);
 void mdt_thread_info_fini(struct mdt_thread_info *mti);
@@ -1360,5 +1411,24 @@ static inline bool mdt_is_rootadmin(struct mdt_thread_info *info)
        return is_admin;
 }
 
+int mdt_reint_migrate(struct mdt_thread_info *info,
+                     struct mdt_lock_handle *unused);
+int mdt_dir_layout_update(struct mdt_thread_info *info);
+
+/* directory restripe */
+int mdt_restripe_internal(struct mdt_thread_info *info,
+                         struct mdt_object *parent,
+                         struct mdt_object *child,
+                         const struct lu_name *lname,
+                         struct lu_fid *tfid,
+                         struct md_op_spec *spec,
+                         struct md_attr *ma);
+int mdt_restriper_start(struct mdt_device *mdt);
+void mdt_restriper_stop(struct mdt_device *mdt);
+void mdt_auto_split_add(struct mdt_thread_info *info, struct mdt_object *o);
+void mdt_restripe_migrate_add(struct mdt_thread_info *info,
+                             struct mdt_object *o);
+void mdt_restripe_update_add(struct mdt_thread_info *info,
+                            struct mdt_object *o);
 
 #endif /* _MDT_INTERNAL_H */
index ff76d12..ff79c25 100644 (file)
@@ -1523,6 +1523,7 @@ static int mdt_migrate_unpack(struct mdt_thread_info *info)
        } else {
                spec->sp_migrate_close = 0;
        }
+       spec->sp_migrate_nsonly = 0;
 
        /* lustre version > 2.11 migration packs lum */
        if (req_capsule_has_field(pill, &RMF_EADATA, RCL_CLIENT)) {
index 6e63b64..4d7ac6e 100644 (file)
@@ -794,6 +794,36 @@ static ssize_t enable_dir_restripe_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(enable_dir_restripe);
 
+/*
+ * Show handler for the enable_dir_auto_split attribute.
+ *
+ * Resolves the MDT device that owns @kobj and prints its
+ * mdt_enable_dir_auto_split value, followed by a newline, into @buf.
+ * Returns whatever scnprintf() returns (output is bounded by PAGE_SIZE).
+ */
+static ssize_t enable_dir_auto_split_show(struct kobject *kobj,
+                                         struct attribute *attr, char *buf)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n",
+                        mdt->mdt_enable_dir_auto_split);
+}
+
+/*
+ * Store handler for the enable_dir_auto_split attribute.
+ *
+ * Parses @buffer as a boolean with kstrtobool() and saves the result in
+ * mdt_enable_dir_auto_split of the MDT device that owns @kobj.
+ * Returns @count on success, or the error returned by kstrtobool().
+ */
+static ssize_t enable_dir_auto_split_store(struct kobject *kobj,
+                                          struct attribute *attr,
+                                          const char *buffer, size_t count)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool val;
+       int rc;
+
+       rc = kstrtobool(buffer, &val);
+       if (rc)
+               return rc;
+
+       mdt->mdt_enable_dir_auto_split = val;
+       return count;
+}
+LUSTRE_RW_ATTR(enable_dir_auto_split);
+
 /**
  * Show MDT async commit count.
  *
@@ -1101,6 +1131,108 @@ static ssize_t enable_remote_rename_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(enable_remote_rename);
 
+/*
+ * Show handler for the dir_split_count attribute.
+ *
+ * Prints mdt_restriper.mdr_dir_split_count of the MDT device that owns
+ * @kobj as an unsigned decimal followed by a newline. Returns whatever
+ * scnprintf() returns (output is bounded by PAGE_SIZE).
+ */
+static ssize_t dir_split_count_show(struct kobject *kobj,
+                                    struct attribute *attr,
+                                    char *buf)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return scnprintf(buf, PAGE_SIZE, "%llu\n",
+                        mdt->mdt_restriper.mdr_dir_split_count);
+}
+
+/*
+ * Store handler for the dir_split_count attribute.
+ *
+ * Parses @buffer with sysfs_memparse() (default unit "B") and saves the
+ * result in mdt_restriper.mdr_dir_split_count of the MDT device that owns
+ * @kobj.
+ *
+ * Returns @count on success, -EINVAL if @count exceeds the accepted input
+ * length, -ERANGE if the parsed value is negative, or the negative error
+ * returned by sysfs_memparse().
+ */
+static ssize_t dir_split_count_store(struct kobject *kobj,
+                                     struct attribute *attr,
+                                     const char *buffer, size_t count)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       /* longest input accepted, in bytes; longer writes are rejected */
+       const size_t max_len = 21;
+       s64 val;
+       int rc;
+
+       if (count > max_len)
+               return -EINVAL;
+
+       /* @buffer is parsed in place, no bounce buffer is needed */
+       rc = sysfs_memparse(buffer, count, &val, "B");
+       if (rc < 0)
+               return rc;
+
+       if (val < 0)
+               return -ERANGE;
+
+       mdt->mdt_restriper.mdr_dir_split_count = val;
+
+       return count;
+}
+LUSTRE_RW_ATTR(dir_split_count);
+
+/*
+ * Show handler for the dir_split_delta attribute.
+ *
+ * Prints mdt_restriper.mdr_dir_split_delta of the MDT device that owns
+ * @kobj as an unsigned decimal followed by a newline. Returns whatever
+ * scnprintf() returns (output is bounded by PAGE_SIZE).
+ */
+static ssize_t dir_split_delta_show(struct kobject *kobj,
+                                   struct attribute *attr,
+                                   char *buf)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n",
+                        mdt->mdt_restriper.mdr_dir_split_delta);
+}
+
+/*
+ * Store handler for the dir_split_delta attribute.
+ *
+ * Parses @buffer as an unsigned int with kstrtouint() (base 0, so the
+ * base is taken from the input's prefix) and saves it in
+ * mdt_restriper.mdr_dir_split_delta. No range check is applied here.
+ * Returns @count on success, or the error returned by kstrtouint().
+ */
+static ssize_t dir_split_delta_store(struct kobject *kobj,
+                                    struct attribute *attr,
+                                    const char *buffer, size_t count)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       u32 val;
+       int rc;
+
+       rc = kstrtouint(buffer, 0, &val);
+       if (rc)
+               return rc;
+
+       mdt->mdt_restriper.mdr_dir_split_delta = val;
+
+       return count;
+}
+LUSTRE_RW_ATTR(dir_split_delta);
+
+/*
+ * Show handler for the dir_restripe_nsonly attribute.
+ *
+ * Prints mdt_dir_restripe_nsonly of the MDT device that owns @kobj,
+ * followed by a newline. Returns whatever scnprintf() returns (output is
+ * bounded by PAGE_SIZE).
+ */
+static ssize_t dir_restripe_nsonly_show(struct kobject *kobj,
+                                       struct attribute *attr, char *buf)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", mdt->mdt_dir_restripe_nsonly);
+}
+
+/*
+ * Store handler for the dir_restripe_nsonly attribute.
+ *
+ * Parses @buffer as a boolean with kstrtobool() and saves the result in
+ * mdt_dir_restripe_nsonly of the MDT device that owns @kobj.
+ * Returns @count on success, or the error returned by kstrtobool().
+ */
+static ssize_t dir_restripe_nsonly_store(struct kobject *kobj,
+                                        struct attribute *attr,
+                                        const char *buffer, size_t count)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool val;
+       int rc;
+
+       rc = kstrtobool(buffer, &val);
+       if (rc)
+               return rc;
+
+       mdt->mdt_dir_restripe_nsonly = val;
+       return count;
+}
+LUSTRE_RW_ATTR(dir_restripe_nsonly);
+
 LPROC_SEQ_FOPS_RO_TYPE(mdt, hash);
 LPROC_SEQ_FOPS_WR_ONLY(mdt, mds_evict_client);
 LUSTRE_RW_ATTR(job_cleanup_interval);
@@ -1141,6 +1273,7 @@ static struct attribute *mdt_attrs[] = {
        &lustre_attr_enable_striped_dir.attr,
        &lustre_attr_enable_dir_migration.attr,
        &lustre_attr_enable_dir_restripe.attr,
+       &lustre_attr_enable_dir_auto_split.attr,
        &lustre_attr_enable_remote_rename.attr,
        &lustre_attr_commit_on_sharing.attr,
        &lustre_attr_local_recovery.attr,
@@ -1152,6 +1285,9 @@ static struct attribute *mdt_attrs[] = {
        &lustre_attr_hsm_control.attr,
        &lustre_attr_job_cleanup_interval.attr,
        &lustre_attr_readonly.attr,
+       &lustre_attr_dir_split_count.attr,
+       &lustre_attr_dir_split_delta.attr,
+       &lustre_attr_dir_restripe_nsonly.attr,
        NULL,
 };
 
index 1b65208..7a53b7a 100644 (file)
@@ -190,10 +190,10 @@ int mdt_version_get_check_save(struct mdt_thread_info *info,
  * This checks version of 'name'. Many reint functions uses 'name' for child not
  * FID, therefore we need to get object by name and check its version.
  */
-static int mdt_lookup_version_check(struct mdt_thread_info *info,
-                                   struct mdt_object *p,
-                                   const struct lu_name *lname,
-                                   struct lu_fid *fid, int idx)
+int mdt_lookup_version_check(struct mdt_thread_info *info,
+                            struct mdt_object *p,
+                            const struct lu_name *lname,
+                            struct lu_fid *fid, int idx)
 {
         int rc, vbrc;
 
@@ -343,25 +343,20 @@ void mdt_reint_striped_unlock(struct mdt_thread_info *info,
 }
 
 static int mdt_restripe(struct mdt_thread_info *info,
-                       struct mdt_object *pobj,
+                       struct mdt_object *parent,
                        const struct lu_name *lname,
                        const struct lu_fid *tfid,
                        struct md_op_spec *spec,
                        struct md_attr *ma)
 {
-       const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
-       struct lu_fid *cfid = &info->mti_tmp_fid2;
-       struct lmv_user_md *lum = spec->u.sp_ea.eadata;
-       struct md_layout_change *mlc = &info->mti_mlc;
+       struct lu_fid *fid = &info->mti_tmp_fid2;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        struct lmv_mds_md_v1 *lmv;
        struct mdt_object *child;
-       struct mdt_object *tobj = NULL;
-       struct mdt_lock_handle *lhp = NULL;
+       struct mdt_lock_handle *lhp;
        struct mdt_lock_handle *lhc;
        struct mdt_body *repbody;
-       u32 lmv_stripe_count = 0;
        int rc;
 
        ENTRY;
@@ -369,47 +364,37 @@ static int mdt_restripe(struct mdt_thread_info *info,
        if (!mdt->mdt_enable_dir_restripe)
                RETURN(-EPERM);
 
-       /* mti_big_lmm is used to save LMV, but it may be uninitialized. */
-       if (unlikely(!info->mti_big_lmm)) {
-               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
-               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
-               if (!info->mti_big_lmm)
-                       RETURN(-ENOMEM);
-       }
-
-       rc = mdt_version_get_check_save(info, pobj, 0);
+       rc = mdt_version_get_check_save(info, parent, 0);
        if (rc)
                RETURN(rc);
 
-       ma->ma_lmv = info->mti_big_lmm;
-       ma->ma_lmv_size = info->mti_big_lmmsize;
-       ma->ma_valid = 0;
-       rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lhp, LCK_PW, lname);
+       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+                                  true);
        if (rc)
                RETURN(rc);
 
+       rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(unlock_parent, rc);
+
        if (ma->ma_valid & MA_LMV) {
                /* don't allow restripe if parent dir layout is changing */
                lmv = &ma->ma_lmv->lmv_md_v1;
                if (!lmv_is_sane(lmv))
-                       RETURN(-EBADF);
+                       GOTO(unlock_parent, rc = -EBADF);
 
                if (lmv_is_layout_changing(lmv))
-                       RETURN(-EBUSY);
+                       GOTO(unlock_parent, rc = -EBUSY);
        }
 
-       lhp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lhp, LCK_PW, lname);
-       rc = mdt_reint_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE, true);
-       if (rc)
-               RETURN(rc);
-
-       fid_zero(cfid);
-       rc = mdt_lookup_version_check(info, pobj, lname, cfid, 1);
+       fid_zero(fid);
+       rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
        if (rc)
                GOTO(unlock_parent, rc);
 
-       child = mdt_object_find(info->mti_env, mdt, cfid);
+       child = mdt_object_find(info->mti_env, mdt, fid);
        if (IS_ERR(child))
                GOTO(unlock_parent, rc = PTR_ERR(child));
 
@@ -423,7 +408,7 @@ static int mdt_restripe(struct mdt_thread_info *info,
                if (!repbody)
                        GOTO(out_child, rc = -EPROTO);
 
-               repbody->mbo_fid1 = *cfid;
+               repbody->mbo_fid1 = *fid;
                repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                GOTO(out_child, rc = -EREMOTE);
        }
@@ -433,8 +418,9 @@ static int mdt_restripe(struct mdt_thread_info *info,
        mdt_lock_reg_init(lhc, LCK_EX);
 
        /* enqueue object remote LOOKUP lock */
-       if (mdt_object_remote(pobj)) {
-               rc = mdt_remote_object_lock(info, pobj, cfid, &lhc->mlh_rreg_lh,
+       if (mdt_object_remote(parent)) {
+               rc = mdt_remote_object_lock(info, parent, fid,
+                                           &lhc->mlh_rreg_lh,
                                            lhc->mlh_rreg_mode,
                                            MDS_INODELOCK_LOOKUP, false);
                if (rc != ELDLM_OK)
@@ -451,114 +437,35 @@ static int mdt_restripe(struct mdt_thread_info *info,
        if (rc)
                GOTO(unlock_child, rc);
 
-       ma->ma_valid = 0;
-       rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
-       if (rc)
-               GOTO(unlock_child, rc);
-
-       if (ma->ma_valid & MA_LMV) {
-               lmv = &ma->ma_lmv->lmv_md_v1;
-               if (!lmv_is_sane(lmv))
-                       GOTO(unlock_child, rc = -EBADF);
-
-               /* don't allow restripe if dir layout is changing */
-               if (lmv_is_layout_changing(lmv))
-                       GOTO(unlock_child, rc = -EBUSY);
-
-               /* check whether stripe count and hash unchanged */
-               if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
-                   lum->lum_hash_type == lmv->lmv_hash_type)
-                       GOTO(unlock_child, rc = -EALREADY);
-
-               lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
-       } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
-               /* stripe count unchanged for plain directory */
-               GOTO(unlock_child, rc = -EALREADY);
-       }
-
-       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-       if (!repbody)
-               GOTO(unlock_child, rc = -EPROTO);
-
-       if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
-               /* split */
-               ma->ma_need = MA_INODE;
-               ma->ma_valid = 0;
-               rc = mdt_attr_get_complex(info, child, ma);
-               if (rc)
-                       GOTO(unlock_child, rc);
-
-               if (!(ma->ma_valid & MA_INODE))
-                       GOTO(unlock_child, rc = -EBADF);
-
-               if (!lmv_stripe_count) {
-                       /* if child is plain directory, allocate @tobj as the
-                        * master object, and make child the first stripe of
-                        * @tobj.
-                        */
-                       tobj = mdt_object_new(info->mti_env, mdt, tfid);
-                       if (unlikely(IS_ERR(tobj)))
-                               GOTO(unlock_child, rc = PTR_ERR(tobj));
-               }
-
-               mlc->mlc_opc = MD_LAYOUT_SPLIT;
-               mlc->mlc_parent = mdt_object_child(pobj);
-               mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
-               mlc->mlc_attr = &ma->ma_attr;
-               mlc->mlc_name = lname;
-               mlc->mlc_spec = spec;
-               rc = mo_layout_change(env, mdt_object_child(child), mlc);
-               if (rc)
-                       GOTO(out_tobj, rc);
-       } else {
-               /* merge only needs to override LMV */
-               struct lu_buf *buf = &info->mti_buf;
-               __u32 version;
-
-               LASSERT(ma->ma_valid & MA_LMV);
-               lmv = &ma->ma_lmv->lmv_md_v1;
-               version = cpu_to_le32(lmv->lmv_layout_version);
-
-               /* adjust 0 to 1 */
-               if (lum->lum_stripe_count == 0)
-                       lum->lum_stripe_count = cpu_to_le32(1);
-
-               lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
-                                                 LMV_HASH_FLAG_MIGRATION);
-               lmv->lmv_merge_offset = lum->lum_stripe_count;
-               lmv->lmv_merge_hash = lum->lum_hash_type;
-               lmv->lmv_layout_version = cpu_to_le32(++version);
-
-               buf->lb_buf = lmv;
-               buf->lb_len = sizeof(*lmv);
-               rc = mo_xattr_set(env, mdt_object_child(child), buf,
-                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
-               if (rc)
-                       GOTO(unlock_child, rc);
+       spin_lock(&mdt->mdt_restriper.mdr_lock);
+       if (child->mot_restriping) {
+               /* race? */
+               spin_unlock(&mdt->mdt_restriper.mdr_lock);
+               GOTO(unlock_child, rc = -EBUSY);
        }
+       child->mot_restriping = 1;
+       spin_unlock(&mdt->mdt_restriper.mdr_lock);
 
-       ma->ma_need = MA_INODE;
-       ma->ma_valid = 0;
-       rc = mdt_attr_get_complex(info, tobj ? tobj : child, ma);
+       *fid = *tfid;
+       rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
        if (rc)
-               GOTO(out_tobj, rc);
+               GOTO(restriping_clear, rc);
 
-       if (!(ma->ma_valid & MA_INODE))
-               GOTO(out_tobj, rc = -EBADF);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       if (!repbody)
+               GOTO(restriping_clear, rc = -EPROTO);
 
-       mdt_pack_attr2body(info, repbody, &ma->ma_attr,
-                          mdt_object_fid(tobj ? tobj : child));
+       mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
        EXIT;
 
-out_tobj:
-       if (tobj)
-               mdt_object_put(env, tobj);
+restriping_clear:
+       child->mot_restriping = 0;
 unlock_child:
        mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
 out_child:
-       mdt_object_put(env, child);
+       mdt_object_put(info->mti_env, child);
 unlock_parent:
-       mdt_object_unlock(info, pobj, lhp, rc);
+       mdt_object_unlock(info, parent, lhp, rc);
 
        return rc;
 }
@@ -753,13 +660,14 @@ static int mdt_create(struct mdt_thread_info *info)
        if (ma->ma_valid & MA_INODE)
                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                   mdt_object_fid(child));
+       EXIT;
 put_child:
        mdt_object_put(info->mti_env, child);
 unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
 put_parent:
        mdt_object_put(info->mti_env, parent);
-       RETURN(rc);
+       return rc;
 }
 
 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
@@ -2004,9 +1912,6 @@ static int mdt_migrate_object_lock(struct mdt_thread_info *info,
                if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
                        struct md_attr *ma = &info->mti_attr;
 
-                       ma->ma_lmv = info->mti_big_lmm;
-                       ma->ma_lmv_size = info->mti_big_lmmsize;
-                       ma->ma_valid = 0;
                        rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
                        if (rc) {
                                mdt_object_unlock(info, obj, lh, rc);
@@ -2078,9 +1983,9 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                        /*
                         * if parent layout is changeing, and lookup child
                         * failed on source stripe, lookup again on target
-                        *  stripe, if it exists, it means previous migration
-                        *  was interrupted, and current file was migrated
-                        *  already.
+                        * stripe, if it exists, it means previous migration
+                        * was interrupted, and current file was migrated
+                        * already.
                         */
                        mdt_object_put(env, stripe);
 
@@ -2194,8 +2099,8 @@ close:
  *  9. unlock above locks
  * 10. sync device if source has links
  */
-static int mdt_reint_migrate(struct mdt_thread_info *info,
-                            struct mdt_lock_handle *unused)
+int mdt_reint_migrate(struct mdt_thread_info *info,
+                     struct mdt_lock_handle *unused)
 {
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
@@ -2239,7 +2144,7 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
                RETURN(-EPERM);
 
-       if (!md_capable(uc, CFS_CAP_SYS_ADMIN) &&
+       if (uc && !md_capable(uc, CFS_CAP_SYS_ADMIN) &&
            uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
            mdt->mdt_enable_remote_dir_gid != -1)
                RETURN(-EPERM);
@@ -2249,8 +2154,10 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
         * if other MDT holds rename lock, but being blocked to wait for
         * this MDT to finish its recovery, and the failover MDT can not
         * get rename lock, which will cause deadlock.
+        *
+        * req is NULL if this is called by directory auto-split.
         */
-       if (!req_is_replay(req)) {
+       if (req && !req_is_replay(req)) {
                rc = mdt_rename_lock(info, &rename_lh);
                if (rc != 0) {
                        CERROR("%s: can't lock FS for rename: rc = %d\n",
@@ -2260,20 +2167,22 @@ static int mdt_reint_migrate(struct mdt_thread_info *info,
        }
 
        /* pobj is master object of parent */
-       pobj = mdt_parent_find_check(info, rr->rr_fid1, 0);
+       pobj = mdt_object_find(env, mdt, rr->rr_fid1);
        if (IS_ERR(pobj))
                GOTO(unlock_rename, rc = PTR_ERR(pobj));
 
-       if (unlikely(!info->mti_big_lmm)) {
-               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
-               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
-               if (!info->mti_big_lmm)
-                       GOTO(put_parent, rc = -ENOMEM);
+       if (req) {
+               rc = mdt_version_get_check(info, pobj, 0);
+               if (rc)
+                       GOTO(put_parent, rc);
        }
 
-       ma->ma_lmv = info->mti_big_lmm;
-       ma->ma_lmv_size = info->mti_big_lmmsize;
-       ma->ma_valid = 0;
+       if (!mdt_object_exists(pobj))
+               GOTO(put_parent, rc = -ENOENT);
+
+       if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
+               GOTO(put_parent, rc = -ENOTDIR);
+
        rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
        if (rc)
                GOTO(put_parent, rc);
@@ -2315,17 +2224,14 @@ lock_parent:
         */
        do_sync = rc;
 
-       /* TODO: DoM migration is not supported yet */
+       /* TODO: DoM migration is not supported, migrate dirent only */
        if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
-               ma->ma_lmm = info->mti_big_lmm;
-               ma->ma_lmm_size = info->mti_big_lmmsize;
-               ma->ma_valid = 0;
                rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
                if (rc)
-                       GOTO(put_source, rc);
+                       GOTO(unlock_links, rc);
 
                if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
-                       GOTO(put_source, rc = -EOPNOTSUPP);
+                       info->mti_spec.sp_migrate_nsonly = 1;
        }
 
        /* if migration HSM is allowed */
@@ -2383,7 +2289,8 @@ lock_parent:
                         mdt_object_child(tobj),
                         &info->mti_spec, ma);
        if (!rc)
-               mdt_counter_incr(req, LPROC_MDT_MIGRATE);
+               lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
+                                    LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
        EXIT;
 
        mdt_object_unlock(info, tobj, lht, rc);
@@ -2490,10 +2397,6 @@ static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
                return 0;
 
        /* check whether sobj and tobj are sibling stripes */
-       ma->ma_need = MA_LMV;
-       ma->ma_valid = 0;
-       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
-       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
        rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
        if (rc)
                return rc;
diff --git a/lustre/mdt/mdt_restripe.c b/lustre/mdt/mdt_restripe.c
new file mode 100644 (file)
index 0000000..cddf47c
--- /dev/null
@@ -0,0 +1,991 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * lustre/mdt/mdt_restripe.c
+ *
+ * Lustre directory restripe and auto-split
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <linux/sched.h>
+#include <linux/kthread.h>
+#include "mdt_internal.h"
+
+/* add directory into splitting list and wake up restripe thread */
+/*
+ * Queue directory @o for auto split.
+ *
+ * Under mdr_lock: if auto split is enabled on this MDT and @o is not already
+ * flagged mot_restriping, flag it, take an object reference and append it to
+ * the restriper's mdr_auto_splitting list. The restriper task is woken up
+ * afterwards whether or not @o was queued.
+ */
+void mdt_auto_split_add(struct mdt_thread_info *info, struct mdt_object *o)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+
+       spin_lock(&restriper->mdr_lock);
+       if (mdt->mdt_enable_dir_auto_split && !o->mot_restriping) {
+               o->mot_restriping = 1;
+               /* reference taken here is held while @o sits on the list */
+               mdt_object_get(NULL, o);
+               LASSERT(list_empty(&o->mot_restripe_linkage));
+               list_add_tail(&o->mot_restripe_linkage,
+                             &restriper->mdr_auto_splitting);
+
+               CDEBUG(D_INFO, "add "DFID" into auto split list.\n",
+                      PFID(mdt_object_fid(o)));
+       }
+       spin_unlock(&restriper->mdr_lock);
+
+       wake_up_process(restriper->mdr_task);
+}
+
+/*
+ * Queue directory stripe @o for sub file migration.
+ *
+ * Under mdr_lock: if @o is not already flagged mot_restriping, flag it,
+ * reset its mot_restripe_offset to 0, take an object reference and append
+ * it to the restriper's mdr_migrating list. The restriper task is woken up
+ * afterwards whether or not @o was queued.
+ */
+void mdt_restripe_migrate_add(struct mdt_thread_info *info,
+                             struct mdt_object *o)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+
+       spin_lock(&restriper->mdr_lock);
+       if (!o->mot_restriping) {
+               o->mot_restriping = 1;
+               /* migration starts from the beginning of the directory */
+               o->mot_restripe_offset = 0;
+               mdt_object_get(NULL, o);
+               LASSERT(list_empty(&o->mot_restripe_linkage));
+               list_add_tail(&o->mot_restripe_linkage,
+                             &restriper->mdr_migrating);
+
+               CDEBUG(D_INFO, "add "DFID" into migrate list.\n",
+                      PFID(mdt_object_fid(o)));
+       }
+       spin_unlock(&restriper->mdr_lock);
+
+       wake_up_process(restriper->mdr_task);
+}
+
+/*
+ * Queue directory @o for a layout (LMV) update.
+ *
+ * Under mdr_lock: if @o is not already flagged mot_restriping, flag it,
+ * take an object reference and append it to the restriper's mdr_updating
+ * list. The restriper task is woken up afterwards whether or not @o was
+ * queued.
+ */
+void mdt_restripe_update_add(struct mdt_thread_info *info,
+                            struct mdt_object *o)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+
+       spin_lock(&restriper->mdr_lock);
+       if (!o->mot_restriping) {
+               /* update LMV */
+               o->mot_restriping = 1;
+               mdt_object_get(NULL, o);
+               /* stamp the time at which the list became non-empty */
+               if (list_empty(&restriper->mdr_updating))
+                       restriper->mdr_update_time = ktime_get_real_seconds();
+               LASSERT(list_empty(&o->mot_restripe_linkage));
+               list_add_tail(&o->mot_restripe_linkage,
+                             &restriper->mdr_updating);
+
+               CDEBUG(D_INFO, "add "DFID" into update list.\n",
+                      PFID(mdt_object_fid(o)));
+       }
+       spin_unlock(&restriper->mdr_lock);
+
+       wake_up_process(restriper->mdr_task);
+}
+
+/*
+ * Allocate a FID by forwarding to the ldo_fid_alloc() operation of the
+ * device below @mdt (mdt_child), passing the next-layer object of @parent
+ * and @name. The allocated FID is stored in @fid; the return value is the
+ * one from ldo_fid_alloc().
+ */
+static inline int mdt_fid_alloc(const struct lu_env *env,
+                               struct mdt_device *mdt,
+                               struct lu_fid *fid,
+                               struct mdt_object *parent,
+                               const struct lu_name *name)
+{
+       struct lu_device *next = &mdt->mdt_child->md_lu_dev;
+       struct lu_object *o = lu_object_next(&parent->mot_obj);
+
+       return next->ld_ops->ldo_fid_alloc(env, next, fid, o, name);
+}
+
+/*
+ * Prepare @ma and @spec for an auto split to @lum_stripe_count stripes.
+ *
+ * Sets ctime and mtime in @ma to the current time, fills the restriper's
+ * lmv_user_md with the requested stripe count, the default stripe offset
+ * and hash type 0, and points @spec's EA data at it.
+ */
+static void mdt_auto_split_prep(struct mdt_thread_info *info,
+                               struct md_op_spec *spec,
+                               struct md_attr *ma,
+                               u32 lum_stripe_count)
+{
+       struct lu_attr *attr = &ma->ma_attr;
+       struct lmv_user_md_v1 *lum;
+
+       attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
+       attr->la_valid = LA_CTIME | LA_MTIME;
+
+       /* the LMV buffer lives in the per-MDT restriper, not in @info */
+       lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
+       lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+       lum->lum_stripe_count = cpu_to_le32(lum_stripe_count);
+       lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
+       lum->lum_hash_type = 0;
+
+       spec->u.sp_ea.eadatalen = sizeof(*lum);
+       spec->u.sp_ea.eadata = lum;
+       spec->sp_cr_flags = MDS_OPEN_HAS_EA;
+       spec->no_create = 0;
+       spec->sp_migrate_close = 0;
+}
+
+/* restripe directory: split or merge stripes */
+int mdt_restripe_internal(struct mdt_thread_info *info,
+                         struct mdt_object *parent,
+                         struct mdt_object *child,
+                         const struct lu_name *lname,
+                         struct lu_fid *tfid,
+                         struct md_op_spec *spec,
+                         struct md_attr *ma)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+       struct lmv_mds_md_v1 *lmv;
+       u32 lmv_stripe_count = 0;
+       int rc;
+
+       ENTRY;
+
+       rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
+       if (rc)
+               RETURN(rc);
+
+       if (ma->ma_valid & MA_LMV) {
+               lmv = &ma->ma_lmv->lmv_md_v1;
+               if (!lmv_is_sane(lmv))
+                       RETURN(-EBADF);
+
+               /* don't allow restripe if dir layout is changing */
+               if (lmv_is_layout_changing(lmv))
+                       RETURN(-EBUSY);
+
+               /* check whether stripe count and hash unchanged */
+               if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
+                   lum->lum_hash_type == lmv->lmv_hash_type)
+                       RETURN(-EALREADY);
+
+               lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+       } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
+               /* stripe count unchanged for plain directory */
+               RETURN(-EALREADY);
+       }
+
+       if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
+               /* split */
+               struct md_layout_change *mlc = &info->mti_mlc;
+               struct mdt_object *tobj = NULL;
+               s64 mtime = ma->ma_attr.la_mtime;
+
+               ma->ma_need = MA_INODE;
+               ma->ma_valid = 0;
+               rc = mdt_attr_get_complex(info, child, ma);
+               if (rc)
+                       RETURN(rc);
+
+               if (!(ma->ma_valid & MA_INODE))
+                       RETURN(-EBADF);
+
+               /* mtime is from client or set outside */
+               ma->ma_attr.la_mtime = mtime;
+
+               if (!lmv_stripe_count) {
+                       /* if child is plain directory, allocate @tobj as the
+                        * master object, and make child the first stripe of
+                        * @tobj.
+                        */
+                       tobj = mdt_object_new(env, mdt, tfid);
+                       if (unlikely(IS_ERR(tobj)))
+                               RETURN(PTR_ERR(tobj));
+               }
+
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               mlc->mlc_parent = mdt_object_child(parent);
+               mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
+               mlc->mlc_attr = &ma->ma_attr;
+               mlc->mlc_name = lname;
+               mlc->mlc_spec = spec;
+               rc = mo_layout_change(env, mdt_object_child(child), mlc);
+               if (!rc) {
+                       /* FID and attr need to be replied to client for manual
+                        * restripe.
+                        */
+                       ma->ma_need = MA_INODE;
+                       ma->ma_valid = 0;
+                       rc = mdt_attr_get_complex(info,
+                                       lmv_stripe_count ? child : tobj, ma);
+               }
+               if (tobj)
+                       mdt_object_put(env, tobj);
+               else
+                       *tfid = *mdt_object_fid(child);
+       } else {
+               /* merge only needs to override LMV */
+               struct lu_buf *buf = &info->mti_buf;
+               __u32 version;
+
+               LASSERT(ma->ma_valid & MA_LMV);
+               lmv = &ma->ma_lmv->lmv_md_v1;
+               version = cpu_to_le32(lmv->lmv_layout_version);
+
+               /* adjust 0 to 1 */
+               if (lum->lum_stripe_count == 0)
+                       lum->lum_stripe_count = cpu_to_le32(1);
+
+               lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
+                                                 LMV_HASH_FLAG_MIGRATION);
+               lmv->lmv_merge_offset = lum->lum_stripe_count;
+               lmv->lmv_merge_hash = lum->lum_hash_type;
+               lmv->lmv_layout_version = cpu_to_le32(++version);
+
+               buf->lb_buf = lmv;
+               buf->lb_len = sizeof(*lmv);
+               rc = mo_xattr_set(env, mdt_object_child(child), buf,
+                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
+               if (rc)
+                       RETURN(rc);
+
+               *tfid = *mdt_object_fid(child);
+               ma->ma_need = MA_INODE;
+               ma->ma_valid = 0;
+               rc = mdt_attr_get_complex(info, child, ma);
+       }
+
+       RETURN(rc);
+}
+
+/*
+ * Split the first directory queued on mdr_auto_splitting.
+ *
+ * The object is taken off the list under mdr_lock.  If its LMV magic is
+ * LMV_MAGIC_STRIPE, the master object is looked up by the stripe's parent FID
+ * and the split is done on the master; a striped master queued directly is
+ * rejected with -EINVAL.  The target stripe count is the current count plus
+ * mdr_dir_split_delta, capped at mdt_mds_mds_conns + 1.
+ *
+ * The reference held on the dequeued object (and on the master and parent
+ * found here) is dropped before returning, and 'mot_restriping' is cleared.
+ *
+ * \param[in] info     restriper thread info
+ *
+ * \retval 0           nothing queued, or split succeeded
+ * \retval negative    errno on failure; -EALREADY, -EBUSY and -EREMOTE are
+ *                     expected and not logged
+ */
+static int mdt_auto_split(struct mdt_thread_info *info)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct md_attr *ma = &info->mti_attr;
+       struct md_op_spec *spec = &info->mti_spec;
+       struct lu_name *lname = &info->mti_name;
+       struct lu_fid *fid = &info->mti_tmp_fid2;
+       struct mdt_object *parent = NULL;
+       struct mdt_object *child = NULL;
+       struct mdt_object *stripe = NULL;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct mdt_lock_handle *lhp;
+       struct mdt_lock_handle *lhc;
+       u32 lmv_stripe_count = 0;
+       u32 lum_stripe_count = 0;
+       int rc;
+
+       ENTRY;
+
+       if (!atomic_read(&mdt->mdt_mds_mds_conns))
+               RETURN(-EINVAL);
+
+       spin_lock(&restriper->mdr_lock);
+       if (!list_empty(&restriper->mdr_auto_splitting)) {
+               child = list_entry(restriper->mdr_auto_splitting.next,
+                                  typeof(*child), mot_restripe_linkage);
+               list_del_init(&child->mot_restripe_linkage);
+       }
+       spin_unlock(&restriper->mdr_lock);
+
+       if (!child)
+               RETURN(0);
+
+       LASSERT(child->mot_restriping);
+
+       /* @lname lives in the thread info and may still describe a name from
+        * an earlier call, reset it so that the error message at 'out' never
+        * prints a stale name if we fail before the name is fetched.
+        */
+       lname->ln_name = "";
+       lname->ln_namelen = 0;
+
+       rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(out, rc);
+
+       if (ma->ma_valid & MA_LMV) {
+               /* stripe dirent exceeds threshold, find its master object */
+               struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+
+               /* auto-split won't be done on striped directory master object
+                * directly, because it's triggered when dirent count exceeds
+                * threshold, however dirent count of master object is its
+                * stripe count.
+                */
+               if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_STRIPE)
+                       GOTO(out, rc = -EINVAL);
+
+               lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+
+               /* save stripe to clear 'restriping' flag in the end to avoid
+                * trigger auto-split multiple times.
+                */
+               stripe = child;
+               child = NULL;
+
+               /* get master object FID from linkea */
+               rc = mdt_attr_get_pfid(info, stripe, &ma->ma_pfid);
+               if (rc)
+                       GOTO(out, rc);
+
+               child = mdt_object_find(env, mdt, &ma->ma_pfid);
+               if (IS_ERR(child))
+                       GOTO(out, rc = PTR_ERR(child));
+
+               spin_lock(&restriper->mdr_lock);
+               if (child->mot_restriping) {
+                       /* race? */
+                       spin_unlock(&restriper->mdr_lock);
+                       GOTO(out, rc = -EBUSY);
+               }
+               child->mot_restriping = 1;
+               spin_unlock(&restriper->mdr_lock);
+
+               /* skip if master object is remote, let the first stripe
+                * to start splitting because dir split needs to be done
+                * on where master object is.
+                */
+               if (mdt_object_remote(child))
+                       GOTO(restriping_clear, rc = -EREMOTE);
+       }
+
+       /* striped directory split adds mdr_auto_split_delta stripes */
+       lum_stripe_count = min_t(unsigned int,
+                               lmv_stripe_count +
+                                       mdt->mdt_restriper.mdr_dir_split_delta,
+                               atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+       if (lmv_stripe_count >= lum_stripe_count)
+               GOTO(restriping_clear, rc = -EALREADY);
+
+       /* get dir name and parent FID */
+       rc = mdt_attr_get_pfid_name(info, child, fid, lname);
+       if (rc)
+               GOTO(restriping_clear, rc);
+
+       /* copy name out because mti_linkea will be used later, and name should
+        * end with '\0'
+        */
+       memcpy(info->mti_filename, lname->ln_name, lname->ln_namelen);
+       info->mti_filename[lname->ln_namelen] = '\0';
+       lname->ln_name = info->mti_filename;
+       CDEBUG(D_INFO, "split "DFID"/"DNAME" to count %u (MDT count %d)\n",
+              PFID(fid), PNAME(lname), lum_stripe_count,
+              atomic_read(&mdt->mdt_mds_mds_conns) + 1);
+
+       parent = mdt_object_find(env, mdt, fid);
+       if (IS_ERR(parent))
+               GOTO(restriping_clear, rc = PTR_ERR(parent));
+
+       /* @fid is reused: from here on it holds the newly allocated FID */
+       rc = mdt_fid_alloc(env, mdt, fid, child, NULL);
+       if (rc < 0)
+               GOTO(restriping_clear, rc);
+
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lhp, LCK_PW, lname);
+       rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
+                                  true);
+       if (rc)
+               GOTO(restriping_clear, rc);
+
+       lhc = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(lhc, LCK_EX);
+       if (mdt_object_remote(parent)) {
+               /* enqueue object remote LOOKUP lock */
+               rc = mdt_remote_object_lock(info, parent, mdt_object_fid(child),
+                                           &lhc->mlh_rreg_lh,
+                                           lhc->mlh_rreg_mode,
+                                           MDS_INODELOCK_LOOKUP, false);
+               if (rc != ELDLM_OK)
+                       GOTO(unlock_parent, rc);
+       }
+
+       rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
+                                   true);
+       if (rc)
+               GOTO(unlock_child, rc);
+
+       mdt_auto_split_prep(info, spec, ma, lum_stripe_count);
+
+       rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
+       EXIT;
+
+unlock_child:
+       mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+unlock_parent:
+       mdt_object_unlock(info, parent, lhp, rc);
+restriping_clear:
+       child->mot_restriping = 0;
+       LASSERT(list_empty(&child->mot_restripe_linkage));
+out:
+       /* -EALREADY:   dir is split already.
+        * -EBUSY:      dir is opened, or is splitting by others.
+        * -EREMOTE:    dir is remote.
+        */
+       if (rc && rc != -EALREADY && rc != -EBUSY && rc != -EREMOTE) {
+               /* @child is NULL or an error pointer if the master object
+                * lookup failed; @stripe is always set in that case, so report
+                * the stripe FID instead of dereferencing @child.
+                */
+               struct mdt_object *failed;
+
+               failed = IS_ERR_OR_NULL(child) ? stripe : child;
+               CERROR("%s: split "DFID"/"DNAME" to count %u failed: rc = %d\n",
+                      mdt_obd_name(mdt), PFID(mdt_object_fid(failed)),
+                      PNAME(lname), lum_stripe_count, rc);
+       }
+
+       if (!IS_ERR_OR_NULL(child))
+               mdt_object_put(env, child);
+
+       if (stripe) {
+               LASSERT(stripe->mot_restriping);
+               LASSERT(list_empty(&stripe->mot_restripe_linkage));
+               stripe->mot_restriping = 0;
+               /* lock may not be taken, don't cache stripe LMV */
+               mo_invalidate(env, mdt_object_child(stripe));
+               mdt_object_put(env, stripe);
+       }
+
+       if (!IS_ERR_OR_NULL(parent))
+               mdt_object_put(env, parent);
+
+       return rc;
+}
+
+/*
+ * Sub-files under one stripe are migrated, clear MIGRATION flag in its LMV.
+ *
+ * The updated LMV is written back under an EX XATTR lock on @stripe.  Whether
+ * or not the write succeeds, @stripe is removed from the restripe list, its
+ * 'mot_restriping' flag is cleared and one reference on it is dropped.
+ *
+ * \param[in] info     restriper thread info
+ * \param[in] stripe   stripe object, must be linked on a restripe list
+ * \param[in,out] lmv  stripe LMV, its MIGRATION flag is cleared in place
+ *
+ * \retval 0           LMV updated
+ * \retval negative    errno from locking or setting the xattr
+ */
+static int mdt_restripe_migrate_finish(struct mdt_thread_info *info,
+                                      struct mdt_object *stripe,
+                                      struct lmv_mds_md_v1 *lmv)
+{
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct lu_buf buf;
+       struct mdt_lock_handle *lh;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE);
+       LASSERT(lmv_is_restriping(lmv));
+
+       lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
+       buf.lb_buf = lmv;
+       buf.lb_len = sizeof(*lmv);
+
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_reint_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR,
+                                  false);
+       if (!rc)
+               rc = mo_xattr_set(info->mti_env, mdt_object_child(stripe), &buf,
+                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
+       mdt_object_unlock(info, stripe, lh, rc);
+       if (rc)
+               CERROR("%s: update "DFID" LMV failed: rc = %d\n",
+                      mdt_obd_name(mdt), PFID(mdt_object_fid(stripe)), rc);
+
+       LASSERT(!list_empty(&stripe->mot_restripe_linkage));
+       LASSERT(stripe->mot_restriping);
+
+       /* use mdr_lock, which is what the other restripe paths in this file
+        * take when changing 'mot_restriping' and the restripe linkage
+        */
+       spin_lock(&restriper->mdr_lock);
+       stripe->mot_restriping = 0;
+       list_del_init(&stripe->mot_restripe_linkage);
+       spin_unlock(&restriper->mdr_lock);
+
+       mdt_object_put(info->mti_env, stripe);
+
+       RETURN(rc);
+}
+
+/*
+ * Fill the thread info with the arguments of a sub-file migration.
+ *
+ * Sets ctime/mtime to now, stores the FIDs and name in the reint record, and
+ * builds the target layout in the restriper's LMV buffer from the stripe LMV:
+ * the current count and hash type when splitting, the merge offset and merge
+ * hash when merging.
+ *
+ * \param[in] info     restriper thread info
+ * \param[in] fid1     stored as rr_fid1
+ * \param[in] fid2     stored as rr_fid2
+ * \param[in] lname    name of the sub file
+ * \param[in] type     file type of the sub file, directories are always
+ *                     migrated namespace-only
+ * \param[in] lmv      stripe LMV describing the restripe in progress
+ */
+static void mdt_restripe_migrate_prep(struct mdt_thread_info *info,
+                                     const struct lu_fid *fid1,
+                                     const struct lu_fid *fid2,
+                                     const struct lu_name *lname,
+                                     __u16 type,
+                                     const struct lmv_mds_md_v1 *lmv)
+{
+       struct lu_attr *attr = &info->mti_attr.ma_attr;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_op_spec *spec = &info->mti_spec;
+       struct lmv_user_md_v1 *lum;
+
+       attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
+       attr->la_valid = LA_CTIME | LA_MTIME;
+
+       rr->rr_fid1 = fid1;
+       rr->rr_fid2 = fid2;
+       rr->rr_name = *lname;
+
+       lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
+       lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+       lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
+       if (lmv_is_splitting(lmv)) {
+               lum->lum_stripe_count = lmv->lmv_stripe_count;
+               /* both sides are little-endian, so the CPU-order mask has to
+                * be converted with cpu_to_le32()
+                */
+               lum->lum_hash_type =
+                       lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK);
+       } else if (lmv_is_merging(lmv)) {
+               lum->lum_stripe_count = lmv->lmv_merge_offset;
+               lum->lum_hash_type = lmv->lmv_merge_hash;
+       }
+
+       spec->u.sp_ea.eadatalen = sizeof(*lum);
+       spec->u.sp_ea.eadata = lum;
+       spec->sp_cr_flags = MDS_OPEN_HAS_EA;
+       spec->no_create = 0;
+       spec->sp_migrate_close = 0;
+       /* if 'nsonly' is set, don't migrate inode */
+       if (S_ISDIR(type))
+               spec->sp_migrate_nsonly = 1;
+       else
+               spec->sp_migrate_nsonly =
+                       info->mti_mdt->mdt_dir_restripe_nsonly;
+}
+
+/*
+ * Migrate one sub-file of the first stripe queued on mdr_migrating, starting
+ * from the stripe's @mot_restripe_offset.
+ *
+ * The stripe index is parsed from the stripe name (the stripe FID string
+ * followed by a separator and a decimal index).  Stripes that need no
+ * migration, and stripes whose last dirent has been handled, are finished via
+ * mdt_restripe_migrate_finish().  Otherwise the first real dirent at or after
+ * the saved offset is migrated and the offset is advanced to the next dirent.
+ *
+ * On any error the stripe is removed from the list and its reference dropped.
+ *
+ * \param[in] info     restriper thread info
+ *
+ * \retval 0           nothing queued, or one step succeeded
+ * \retval negative    errno on failure
+ */
+static int mdt_restripe_migrate(struct mdt_thread_info *info)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct mdt_object *stripe = NULL;
+       struct mdt_object *master = NULL;
+       struct md_attr *ma = &info->mti_attr;
+       struct lmv_mds_md_v1 *lmv;
+       struct lu_name *lname = &info->mti_name;
+       struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
+       /* zeroed so that the error message is sane if the lookup below fails */
+       struct lu_fid fid1 = { 0 };
+       struct lu_fid fid2;
+       struct lu_dirpage *dp;
+       struct lu_dirent *ent;
+       const char *name = NULL;
+       int namelen = 0;
+       __u16 type = 0;
+       int idx = 0;
+       int len;
+       int rc;
+
+       ENTRY;
+
+       if (list_empty(&restriper->mdr_migrating))
+               RETURN(0);
+
+       stripe = list_entry(restriper->mdr_migrating.next, typeof(*stripe),
+                           mot_restripe_linkage);
+
+       /* @lname lives in the thread info, don't let the error message print
+        * a name left over from an earlier call
+        */
+       lname->ln_name = "";
+       lname->ln_namelen = 0;
+
+       /* get master object FID and stripe name */
+       rc = mdt_attr_get_pfid_name(info, stripe, &fid1, lname);
+       if (rc)
+               GOTO(out, rc);
+
+       /* skip the stripe FID and the separator, the rest is stripe index */
+       snprintf(info->mti_filename, sizeof(info->mti_filename), DFID,
+                PFID(mdt_object_fid(stripe)));
+       len = strlen(info->mti_filename) + 1;
+       if (len >= lname->ln_namelen)
+               GOTO(out, rc = -EBADF);
+
+       while (len < lname->ln_namelen) {
+               if (!isdigit(lname->ln_name[len]))
+                       GOTO(out, rc = -EBADF);
+
+               idx = idx * 10 + lname->ln_name[len++] - '0';
+       }
+
+       /* check whether stripe is newly created in split */
+       rc = mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(out, rc);
+
+       if (!(ma->ma_valid & MA_LMV))
+               GOTO(out, rc = -ENODATA);
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_STRIPE)
+               GOTO(out, rc = -EBADF);
+
+       if (!lmv_is_restriping(lmv))
+               GOTO(out, rc = -EINVAL);
+
+       if ((lmv_is_splitting(lmv) &&
+            idx >= le32_to_cpu(lmv->lmv_split_offset)) ||
+           (lmv_is_merging(lmv) &&
+            le32_to_cpu(lmv->lmv_hash_type) == LMV_HASH_TYPE_CRUSH &&
+            idx < le32_to_cpu(lmv->lmv_merge_offset))) {
+               /* new stripes doesn't need to migrate sub files in dir
+                * split, neither for target stripes in dir merge if hash type
+                * is CRUSH.
+                */
+               rc = mdt_restripe_migrate_finish(info, stripe, lmv);
+               RETURN(rc);
+       }
+
+       /* get sub file name @mot_restripe_offset.
+        * TODO: read one dirent instead of whole page.
+        */
+       rdpg->rp_hash = stripe->mot_restripe_offset;
+       rdpg->rp_count = PAGE_SIZE;
+       rdpg->rp_npages = 1;
+       rdpg->rp_attrs = LUDA_64BITHASH | LUDA_FID | LUDA_TYPE;
+       rdpg->rp_pages = &restriper->mdr_page;
+       rc = mo_readpage(env, mdt_object_child(stripe), rdpg);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       /* pick the first entry which is not a dummy record, "." or ".." */
+       dp = page_address(restriper->mdr_page);
+       for (ent = lu_dirent_start(dp); ent; ent = lu_dirent_next(ent)) {
+               LASSERT(le64_to_cpu(ent->lde_hash) >= rdpg->rp_hash);
+
+               if (unlikely(!(le32_to_cpu(ent->lde_attrs) & LUDA_TYPE)))
+                       GOTO(out, rc = -EINVAL);
+
+               namelen = le16_to_cpu(ent->lde_namelen);
+               if (!namelen)
+                       continue;
+
+               if (name_is_dot_or_dotdot(ent->lde_name, namelen))
+                       continue;
+
+               name = ent->lde_name;
+               type = lu_dirent_type_get(ent);
+               break;
+       }
+
+       if (!name) {
+               /* no entry left and the page ends the directory: all done */
+               if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
+                       rc = mdt_restripe_migrate_finish(info, stripe, lmv);
+                       RETURN(rc);
+               }
+
+               GOTO(out, rc = -EBADF);
+       }
+
+       /* copy name out because it should end with '\0' */
+       memcpy(info->mti_filename, name, namelen);
+       info->mti_filename[namelen] = '\0';
+       lname->ln_name = info->mti_filename;
+       lname->ln_namelen = namelen;
+
+       CDEBUG(D_INFO, "migrate "DFID"/"DNAME" type %ho\n",
+              PFID(&fid1), PNAME(lname), type);
+
+       master = mdt_object_find(env, mdt, &fid1);
+       if (IS_ERR(master))
+               GOTO(out, rc = PTR_ERR(master));
+
+       rc = mdt_fid_alloc(env, mdt, &fid2, master, lname);
+       mdt_object_put(env, master);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       mdt_restripe_migrate_prep(info, &fid1, &fid2, lname, type, lmv);
+
+       rc = mdt_reint_migrate(info, NULL);
+       /* mti_big_buf is allocated in XATTR migration */
+       if (unlikely(info->mti_big_buf.lb_buf))
+               lu_buf_free(&info->mti_big_buf);
+       if (rc == -EALREADY)
+               rc = 0;
+       if (rc)
+               GOTO(out, rc);
+
+       /* remember where to continue from next time */
+       LASSERT(ent);
+       do {
+               ent = lu_dirent_next(ent);
+               if (!ent)
+                       break;
+
+               namelen = le16_to_cpu(ent->lde_namelen);
+       } while (namelen == 0); /* Skip dummy record */
+
+       if (ent)
+               stripe->mot_restripe_offset = le64_to_cpu(ent->lde_hash);
+       else
+               stripe->mot_restripe_offset = le64_to_cpu(dp->ldp_hash_end);
+
+       EXIT;
+out:
+       if (rc) {
+               /* -EBUSY: file is opened by others */
+               if (rc != -EBUSY)
+                       CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
+                              mdt_obd_name(mdt), PFID(&fid1), PNAME(lname),
+                              rc);
+
+               /* use mdr_lock, which is what the other restripe paths in
+                * this file take when changing 'mot_restriping' and the
+                * restripe linkage
+                */
+               spin_lock(&restriper->mdr_lock);
+               stripe->mot_restriping = 0;
+               list_del_init(&stripe->mot_restripe_linkage);
+               spin_unlock(&restriper->mdr_lock);
+
+               mdt_object_put(env, stripe);
+       }
+
+       return rc;
+}
+
+/* Tell whether a layout update is queued and its retry time has passed. */
+static inline bool mdt_restripe_update_pending(struct mdt_thread_info *info)
+{
+       struct mdt_dir_restriper *restriper = &info->mti_mdt->mdt_restriper;
+
+       /* nothing queued on mdr_updating */
+       if (list_empty(&restriper->mdr_updating))
+               return false;
+
+       /* due only once the wall clock is past mdr_update_time */
+       return ktime_get_real_seconds() > restriper->mdr_update_time;
+}
+
+/*
+ * Fill the thread info with the arguments of a REINT_SETXATTR of
+ * XATTR_NAME_LMV on @fid.
+ *
+ * Sets ctime/mtime to now and builds the final layout in the restriper's LMV
+ * buffer from the master LMV: the current count and hash type when splitting,
+ * the merge offset and merge hash when merging.
+ *
+ * \param[in] info     restriper thread info
+ * \param[in] fid      FID of the directory to update, stored as rr_fid1
+ * \param[in] lmv      master LMV describing the restripe in progress
+ */
+static void mdt_restripe_layout_update_prep(struct mdt_thread_info *info,
+                                           const struct lu_fid *fid,
+                                           const struct lmv_mds_md_v1 *lmv)
+{
+       struct lu_attr *attr = &info->mti_attr.ma_attr;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct lmv_user_md_v1 *lum;
+
+       attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
+       attr->la_valid = LA_CTIME | LA_MTIME;
+
+       /* snprintf() always terminates the string, strncpy() may not */
+       snprintf(info->mti_filename, sizeof(info->mti_filename), "%s",
+                XATTR_NAME_LMV);
+
+       lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
+       lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+       lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
+       if (lmv_is_splitting(lmv)) {
+               lum->lum_stripe_count = lmv->lmv_stripe_count;
+               /* both sides are little-endian, so the CPU-order mask has to
+                * be converted with cpu_to_le32()
+                */
+               lum->lum_hash_type =
+                       lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK);
+       } else if (lmv_is_merging(lmv)) {
+               lum->lum_stripe_count = lmv->lmv_merge_offset;
+               lum->lum_hash_type = lmv->lmv_merge_hash;
+       }
+
+       rr->rr_opcode = REINT_SETXATTR;
+       rr->rr_fid1 = fid;
+       rr->rr_name.ln_name = info->mti_filename;
+       rr->rr_name.ln_namelen = strlen(info->mti_filename);
+       rr->rr_eadata = lum;
+       rr->rr_eadatalen = sizeof(*lum);
+}
+
+/*
+ * Finish restriping of the first directory queued on mdr_updating.
+ *
+ * The master LMV is read, then the LMV of every stripe is checked.  If any
+ * stripe is still restriping, -EINPROGRESS is returned, the directory stays
+ * queued and the next attempt is deferred by 5 seconds.  Otherwise the
+ * directory layout is updated.  In every case other than -EINPROGRESS the
+ * directory is removed from the list and its reference dropped.
+ *
+ * \param[in] info     restriper thread info
+ *
+ * \retval 0           nothing queued, or layout updated
+ * \retval -EINPROGRESS        some stripe hasn't finished migration yet
+ * \retval negative    other errno on failure
+ */
+static int mdt_restripe_layout_update(struct mdt_thread_info *info)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct md_attr *ma = &info->mti_attr;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_object *master;
+       struct mdt_object *stripe;
+       struct lmv_mds_md_v1 *lmv;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       if (list_empty(&restriper->mdr_updating))
+               RETURN(0);
+
+       master = list_entry(restriper->mdr_updating.next, typeof(*master),
+                           mot_restripe_linkage);
+
+       rc = mdt_stripe_get(info, master, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(out, rc);
+
+       if (!(ma->ma_valid & MA_LMV))
+               GOTO(out, rc = -ENODATA);
+
+       lmv = &ma->ma_lmv->lmv_md_v1;
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+               GOTO(out, rc = -EBADF);
+
+       if (!lmv_is_restriping(lmv))
+               GOTO(out, rc = -EINVAL);
+
+       /* use different buffer to store stripe LMV, @lmv keeps pointing at
+        * the master LMV read above
+        */
+       ma->ma_lmv = &restriper->mdr_lmv;
+       ma->ma_lmv_size = sizeof(restriper->mdr_lmv);
+       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
+               stripe = mdt_object_find(env, mdt, fid);
+               if (IS_ERR(stripe))
+                       GOTO(out, rc = PTR_ERR(stripe));
+
+               ma->ma_valid = 0;
+               rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
+               /* LMV is checked without lock, don't cache it */
+               mo_invalidate(env, mdt_object_child(stripe));
+               mdt_object_put(env, stripe);
+               if (rc)
+                       GOTO(out, rc);
+
+               if (!(ma->ma_valid & MA_LMV))
+                       GOTO(out, rc = -ENODATA);
+
+               /* check MIGRATION flag cleared on all stripes */
+               if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
+                       GOTO(out, rc = -EINPROGRESS);
+       }
+
+       mdt_restripe_layout_update_prep(info, mdt_object_fid(master), lmv);
+
+       rc = mdt_dir_layout_update(info);
+       if (rc) {
+               /* prefix with device name like the other messages here */
+               CERROR("%s: update "DFID" layout failed: rc = %d\n",
+                      mdt_obd_name(mdt), PFID(mdt_object_fid(master)), rc);
+               GOTO(out, rc);
+       }
+
+out:
+       LASSERT(!list_empty(&master->mot_restripe_linkage));
+       if (rc == -EINPROGRESS) {
+               /* keep it queued and retry no sooner than 5 seconds later */
+               restriper->mdr_update_time = ktime_get_real_seconds() + 5;
+       } else {
+               spin_lock(&restriper->mdr_lock);
+               master->mot_restriping = 0;
+               list_del_init(&master->mot_restripe_linkage);
+               spin_unlock(&restriper->mdr_lock);
+
+               mdt_object_put(env, master);
+       }
+
+       return rc;
+}
+
+/*
+ * Body of the directory restriper thread.
+ *
+ * Until asked to stop, handle one item per iteration, in this order of
+ * preference: a queued auto-split, a due layout update, a queued sub-file
+ * migration.  Sleep in TASK_IDLE when none of them has work.
+ */
+static int mdt_restriper_main(void *arg)
+{
+       struct mdt_thread_info *info = arg;
+       struct mdt_dir_restriper *restriper = &info->mti_mdt->mdt_restriper;
+
+       ENTRY;
+
+       for (;;) {
+               /* the task state is set before the conditions are tested */
+               set_current_state(TASK_IDLE);
+               if (kthread_should_stop())
+                       break;
+
+               if (!list_empty(&restriper->mdr_auto_splitting)) {
+                       __set_current_state(TASK_RUNNING);
+                       mdt_auto_split(info);
+                       cond_resched();
+               } else if (mdt_restripe_update_pending(info)) {
+                       __set_current_state(TASK_RUNNING);
+                       mdt_restripe_layout_update(info);
+                       cond_resched();
+               } else if (!list_empty(&restriper->mdr_migrating)) {
+                       __set_current_state(TASK_RUNNING);
+                       mdt_restripe_migrate(info);
+                       cond_resched();
+               } else {
+                       schedule();
+               }
+       }
+       __set_current_state(TASK_RUNNING);
+
+       RETURN(0);
+}
+
+/*
+ * Set up the directory restriper of @mdt and start its thread.
+ *
+ * Initializes the lock, the three work lists and the split defaults,
+ * allocates the page used for reading dirents, creates a private env and
+ * session with root credentials, then creates and wakes the thread.
+ *
+ * \retval 0           thread started
+ * \retval negative    errno, everything set up here is released again
+ */
+int mdt_restriper_start(struct mdt_device *mdt)
+{
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct lu_env *env = &restriper->mdr_env;
+       struct lu_context *ses = &restriper->mdr_session;
+       struct mdt_thread_info *info;
+       struct task_struct *task;
+       struct lu_ucred *uc;
+       int rc;
+
+       ENTRY;
+
+       spin_lock_init(&restriper->mdr_lock);
+       INIT_LIST_HEAD(&restriper->mdr_auto_splitting);
+       INIT_LIST_HEAD(&restriper->mdr_migrating);
+       INIT_LIST_HEAD(&restriper->mdr_updating);
+       restriper->mdr_dir_split_count = DIR_SPLIT_COUNT_DEFAULT;
+       restriper->mdr_dir_split_delta = DIR_SPLIT_DELTA_DEFAULT;
+
+       restriper->mdr_page = alloc_page(GFP_KERNEL);
+       if (!restriper->mdr_page)
+               RETURN(-ENOMEM);
+
+       rc = lu_env_init(env, LCT_MD_THREAD);
+       if (rc)
+               GOTO(out_page, rc);
+
+       rc = lu_context_init(ses, LCT_SERVER_SESSION);
+       if (rc)
+               GOTO(out_env, rc);
+
+       lu_context_enter(ses);
+       env->le_ses = ses;
+
+       /* thread info which is handed to the restriper thread */
+       info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+       info->mti_env = env;
+       info->mti_mdt = mdt;
+       info->mti_pill = NULL;
+       info->mti_dlm_req = NULL;
+
+       /* the thread runs with uid/gid 0 and no identity */
+       uc = mdt_ucred(info);
+       uc->uc_valid = UCRED_OLD;
+       uc->uc_uid = 0;
+       uc->uc_gid = 0;
+       uc->uc_fsuid = 0;
+       uc->uc_fsgid = 0;
+       uc->uc_o_uid = 0;
+       uc->uc_o_gid = 0;
+       uc->uc_o_fsuid = 0;
+       uc->uc_o_fsgid = 0;
+       uc->uc_suppgids[0] = -1;
+       uc->uc_suppgids[1] = -1;
+       uc->uc_cap = CFS_CAP_FS_MASK;
+       uc->uc_umask = 0644;
+       uc->uc_ginfo = NULL;
+       uc->uc_identity = NULL;
+
+       task = kthread_create(mdt_restriper_main, info, "mdt_restriper_%03d",
+                             mdt_seq_site(mdt)->ss_node_id);
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("%s: Can't start directory restripe thread: rc %d\n",
+                      mdt_obd_name(mdt), rc);
+               GOTO(out_ses, rc);
+       }
+
+       /* publish the task before it starts running */
+       restriper->mdr_task = task;
+       wake_up_process(task);
+
+       RETURN(0);
+
+out_ses:
+       lu_context_exit(env->le_ses);
+       lu_context_fini(env->le_ses);
+out_env:
+       lu_env_fini(env);
+out_page:
+       __free_page(restriper->mdr_page);
+
+       return rc;
+}
+
+/*
+ * Stop the directory restriper thread of @mdt and release its resources.
+ *
+ * Does nothing if the thread was never started.  Objects still linked on any
+ * of the three work lists are unlinked and their reference is dropped, then
+ * the dirent page, the session and the env are released.
+ */
+void mdt_restriper_stop(struct mdt_device *mdt)
+{
+       struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
+       struct lu_env *env = &restriper->mdr_env;
+       struct list_head *queues[] = {
+               &restriper->mdr_auto_splitting,
+               &restriper->mdr_migrating,
+               &restriper->mdr_updating,
+       };
+       struct mdt_object *obj;
+       struct mdt_object *tmp;
+       int q;
+
+       if (!restriper->mdr_task)
+               return;
+
+       kthread_stop(restriper->mdr_task);
+       restriper->mdr_task = NULL;
+
+       /* drain the work lists in the same order as before */
+       for (q = 0; q < ARRAY_SIZE(queues); q++) {
+               list_for_each_entry_safe(obj, tmp, queues[q],
+                                        mot_restripe_linkage) {
+                       list_del_init(&obj->mot_restripe_linkage);
+                       mdt_object_put(env, obj);
+               }
+       }
+
+       __free_page(restriper->mdr_page);
+
+       lu_context_exit(env->le_ses);
+       lu_context_fini(env->le_ses);
+       lu_env_fini(env);
+}
index 2f0fe84..91f5fdc 100644 (file)
@@ -318,7 +318,7 @@ out:
 }
 
 /* update dir layout after migration/restripe */
-static int mdt_dir_layout_update(struct mdt_thread_info *info)
+int mdt_dir_layout_update(struct mdt_thread_info *info)
 {
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
@@ -347,14 +347,6 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
            mdt->mdt_enable_remote_dir_gid != -1)
                RETURN(-EPERM);
 
-       /* mti_big_lmm is used to save LMV, but it may be uninitialized. */
-       if (unlikely(!info->mti_big_lmm)) {
-               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
-               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
-               if (!info->mti_big_lmm)
-                       RETURN(-ENOMEM);
-       }
-
        obj = mdt_object_find(env, mdt, rr->rr_fid1);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));
@@ -398,9 +390,6 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
        if (rc)
                GOTO(unlock_pobj, rc);
 
-       ma->ma_lmv = info->mti_big_lmm;
-       ma->ma_lmv_size = info->mti_big_lmmsize;
-       ma->ma_valid = 0;
        rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
        if (rc)
                GOTO(unlock_obj, rc);
@@ -439,7 +428,7 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
 
                if (lum_stripe_count > 1 && lmu->lum_hash_type &&
                    lmu->lum_hash_type !=
-                   (lmv->lmv_merge_hash & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
+                   (lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
                        CERROR("%s: "DFID" migrate mdt hash mismatch %u != %u\n",
                                mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
                                lmv->lmv_hash_type, lmu->lum_hash_type);
@@ -479,7 +468,7 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
                }
 
                if (lmu->lum_stripe_offset != LMV_OFFSET_DEFAULT) {
-                       CERROR("%s: "DFID" dir split offset %u != -1\n",
+                       CERROR("%s: "DFID" dir merge offset %u != -1\n",
                                mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
                                lmu->lum_stripe_offset);
                        GOTO(unlock_obj, rc = -EINVAL);
@@ -488,7 +477,7 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
                if (lmu->lum_hash_type &&
                    lmu->lum_hash_type !=
                    (lmv->lmv_merge_hash & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
-                       CERROR("%s: "DFID" split hash mismatch %u != %u\n",
+                       CERROR("%s: "DFID" merge hash mismatch %u != %u\n",
                                mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
                                lmv->lmv_merge_hash, lmu->lum_hash_type);
                        GOTO(unlock_obj, rc = -EINVAL);
index 1fde0e9..1e10a42 100755 (executable)
@@ -17729,7 +17729,7 @@ run_test 230i "lfs migrate -m tolerates trailing slashes"
 
 test_230j() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs"
-       [ $MDS1_VERSION -lt $(version_code 2.11.52) ] &&
+       [ $MDS1_VERSION -lt $(version_code 2.13.52) ] &&
                skip "Need MDS version at least 2.11.52"
 
        $LFS mkdir -m 0 -c 1 $DIR/$tdir || error "mkdir $tdir failed"
@@ -17737,7 +17737,7 @@ test_230j() {
                error "create $tfile failed"
        cat /etc/passwd > $DIR/$tdir/$tfile
 
-       $LFS migrate -m 1 $DIR/$tdir
+       $LFS migrate -m 1 $DIR/$tdir || error "migrate failed"
 
        cmp /etc/passwd $DIR/$tdir/$tfile ||
                error "DoM file mismatch after migration"
@@ -17876,6 +17876,156 @@ test_230n() {
 }
 run_test 230n "Dir migration with mirrored file"
 
+test_230o() {
+       [ $MDSCOUNT -ge 2 ] || skip "needs >= 2 MDTs"
+       [ $MDS1_VERSION -ge $(version_code 2.13.52) ] ||
+               skip "Need MDS version at least 2.13.52"
+
+       local mdts=$(comma_list $(mdts_nodes))
+
+       local restripe_status
+       local delta
+       local i
+       local j
+
+       # in case "crush" hash type is not set
+       do_nodes $mdts "$LCTL set_param lod.*.mdt_hash=crush"
+
+       restripe_status=$(do_facet mds1 $LCTL get_param -n \
+                          mdt.*MDT0000.enable_dir_restripe)
+       do_nodes $mdts "$LCTL set_param mdt.*.enable_dir_restripe=1"
+       stack_trap "do_nodes $mdts $LCTL set_param \
+                   mdt.*.enable_dir_restripe=$restripe_status"
+
+       mkdir $DIR/$tdir
+       createmany -m $DIR/$tdir/f 100 ||
+               error "create files under remote dir failed"
+       createmany -d $DIR/$tdir/d 100 ||
+               error "create dirs under remote dir failed"
+
+       for i in $(seq 2 $MDSCOUNT); do
+               do_nodes $mdts "$LCTL set_param mdt.*.md_stats=clear > /dev/null"
+               $LFS setdirstripe -c $i $DIR/$tdir ||
+                       error "split -c $i $tdir failed"
+               wait_update $HOSTNAME \
+                       "$LFS getdirstripe -H $DIR/$tdir" "crush" 100 ||
+                       error "dir split not finished"
+               delta=$(do_nodes $mdts "lctl get_param -n mdt.*MDT*.md_stats" |
+                       awk '/migrate/ {sum += $2} END { print sum }')
+               echo "$delta files migrated when dir split from $((i - 1)) to $i stripes"
+               # delta is around total_files/stripe_count
+               [ $delta -lt $((200 /(i - 1))) ] ||
+                       error "$delta files migrated"
+       done
+}
+run_test 230o "dir split"
+
+test_230p() {
+       [ $MDSCOUNT -ge 2 ] || skip "needs >= 2 MDTs"
+       [ $MDS1_VERSION -ge $(version_code 2.13.52) ] ||
+               skip "Need MDS version at least 2.13.52"
+
+       local mdts=$(comma_list $(mdts_nodes))
+
+       local restripe_status
+       local delta
+       local i
+       local j
+
+       do_nodes $mdts "$LCTL set_param lod.*.mdt_hash=crush"
+
+       restripe_status=$(do_facet mds1 $LCTL get_param -n \
+                          mdt.*MDT0000.enable_dir_restripe)
+       do_nodes $mdts "$LCTL set_param mdt.*.enable_dir_restripe=1"
+       stack_trap "do_nodes $mdts $LCTL set_param \
+                   mdt.*.enable_dir_restripe=$restripe_status"
+
+       test_mkdir -c $MDSCOUNT -H crush $DIR/$tdir
+       createmany -m $DIR/$tdir/f 100 ||
+               error "create files under remote dir failed"
+       createmany -d $DIR/$tdir/d 100 ||
+               error "create dirs under remote dir failed"
+
+       for i in $(seq $((MDSCOUNT - 1)) -1 1); do
+               local mdt_hash="crush"
+
+               do_nodes $mdts "$LCTL set_param mdt.*.md_stats=clear > /dev/null"
+               $LFS setdirstripe -c $i $DIR/$tdir ||
+                       error "merge -c $i $tdir failed"
+               [ $i -eq 1 ] && mdt_hash="none"
+               wait_update $HOSTNAME \
+                       "$LFS getdirstripe -H $DIR/$tdir" $mdt_hash 100 ||
+                       error "dir merge not finished"
+               delta=$(do_nodes $mdts "lctl get_param -n mdt.*MDT*.md_stats" |
+                       awk '/migrate/ {sum += $2} END { print sum }')
+               echo "$delta files migrated when dir merge from $((i + 1)) to $i stripes"
+               # delta is around total_files/stripe_count
+               [ $delta -lt $((200 / i)) ] ||
+                       error "$delta files migrated"
+       done
+}
+run_test 230p "dir merge"
+
+test_230q() {
+       [ $MDSCOUNT -ge 2 ] || skip "needs >= 2 MDTs"
+       [ $MDS1_VERSION -ge $(version_code 2.13.52) ] ||
+               skip "Need MDS version at least 2.13.52"
+
+       local mdts=$(comma_list $(mdts_nodes))
+       local saved_threshold=$(do_facet mds1 \
+                       $LCTL get_param -n mdt.*-MDT0000.dir_split_count)
+       local saved_delta=$(do_facet mds1 \
+                       $LCTL get_param -n mdt.*-MDT0000.dir_split_delta)
+       local threshold=100
+       local delta=2
+       local total=0
+       local stripe_count=0
+       local stripe_index
+       local nr_files
+
+       stack_trap "do_nodes $mdts $LCTL set_param \
+                   mdt.*.dir_split_count=$saved_threshold"
+       stack_trap "do_nodes $mdts $LCTL set_param \
+                   mdt.*.dir_split_delta=$saved_delta"
+       stack_trap "do_nodes $mdts $LCTL set_param mdt.*.dir_restripe_nsonly=1"
+       do_nodes $mdts "$LCTL set_param mdt.*.enable_dir_auto_split=1"
+       do_nodes $mdts "$LCTL set_param mdt.*.dir_split_count=$threshold"
+       do_nodes $mdts "$LCTL set_param mdt.*.dir_split_delta=$delta"
+       do_nodes $mdts "$LCTL set_param mdt.*.dir_restripe_nsonly=0"
+       do_nodes $mdts "$LCTL set_param lod.*.mdt_hash=crush"
+
+       $LFS mkdir -i -1 -c 1 $DIR/$tdir || error "mkdir $tdir failed"
+       stripe_index=$($LFS getdirstripe -i $DIR/$tdir)
+
+       while [ $stripe_count -lt $MDSCOUNT ]; do
+               createmany -m $DIR/$tdir/f $total $((threshold * 3 / 2)) ||
+                       error "create sub files failed"
+               stat $DIR/$tdir > /dev/null
+               total=$((total + threshold * 3 / 2))
+               stripe_count=$((stripe_count + delta))
+               [ $stripe_count -gt $MDSCOUNT ] && stripe_count=$MDSCOUNT
+
+               wait_update $HOSTNAME \
+                       "$LFS getdirstripe -c $DIR/$tdir" "$stripe_count" 40 ||
+                       error "stripe count $($LFS getdirstripe -c $DIR/$tdir) != $stripe_count"
+
+               wait_update $HOSTNAME \
+                       "$LFS getdirstripe -H $DIR/$tdir" "crush" 200 ||
+                       error "stripe hash $($LFS getdirstripe -H $DIR/$tdir) != crush"
+
+               nr_files=$($LFS getstripe -m $DIR/$tdir/* |
+                          grep -w $stripe_index | wc -l)
+               echo "$nr_files files on MDT$stripe_index after split"
+               [ $nr_files -lt $((total / (stripe_count - 1))) ] ||
+                       error "$nr_files files on MDT$stripe_index after split"
+
+               nr_files=$(ls $DIR/$tdir | wc -w)
+               [ $nr_files -eq $total ] ||
+                       error "total sub files $nr_files != $total"
+       done
+}
+run_test 230q "dir auto split"
+
 test_231a()
 {
        # For simplicity this test assumes that max_pages_per_rpc