Whamcloud - gitweb
LU-11025 dne: support directory restripe 98/36898/14
authorLai Siyao <lai.siyao@whamcloud.com>
Sat, 10 Aug 2019 05:00:01 +0000 (13:00 +0800)
committerOleg Drokin <green@whamcloud.com>
Wed, 20 May 2020 08:22:39 +0000 (08:22 +0000)
This patch adds directory restripe support:
* 'lfs setdirstripe -m -1 -c <stripe_count>' on an existing directory
  changes the layout of that directory: if 'stripe_count' is larger
  than the current count, new stripes are allocated after the current
  stripes; otherwise the stripes of this directory are merged. NB: if
  the stripe count is unchanged but the hash type changes, it is
  treated as a merge, although it is actually a rehash.
* mdt_restripe() is added to restripe a directory.
* mdd_dir_declare_layout_split() is added to split directory, which
  handles both plain and striped directory split.
* lod_dir_declare_layout_split() handles the internals of directory
  split.
* directory merge is simple compared to split: it just records the
  target stripe count in the LMV and updates it.

NB: this patch only restripes the directory; it doesn't add the code to
migrate sub files, which will be implemented in the following patch.

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I526f7423b909eb83cf8723e65981d713b3e42499
Reviewed-on: https://review.whamcloud.com/36898
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Yingjin Qian <qian@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
20 files changed:
lustre/include/lustre_lmv.h
lustre/include/md_object.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/lfsck/lfsck_striped_dir.c
lustre/lmv/lmv_obd.c
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/ptlrpc/wiretest.c
lustre/utils/liblustreapi.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 002f07f..e33cdc1 100644 (file)
@@ -67,7 +67,7 @@ static inline bool lmv_dir_foreign(const struct lmv_stripe_md *lsm)
 static inline bool lmv_dir_layout_changing(const struct lmv_stripe_md *lsm)
 {
        return lmv_dir_striped(lsm) &&
-              (lsm->lsm_md_hash_type & LMV_HASH_FLAG_LAYOUT_CHANGE);
+              lmv_hash_is_layout_changing(lsm->lsm_md_hash_type);
 }
 
 static inline bool lmv_dir_bad_hash(const struct lmv_stripe_md *lsm)
@@ -275,6 +275,15 @@ lmv_hash_crush(unsigned int count, const char *name, int namelen)
        return idx;
 }
 
+/* directory layout may change in three ways:
+ * 1. directory migration, in its LMV source stripes are appended after
+ *    target stripes, \a migrate_hash is source hash type, \a migrate_offset is
+ *    target stripe count,
+ * 2. directory split, \a migrate_hash is hash type before split,
+ *    \a migrate_offset is stripe count before split.
+ * 3. directory merge, \a migrate_hash is hash type after merge,
+ *    \a migrate_offset is stripe count after merge.
+ */
 static inline int
 __lmv_name_to_stripe_index(__u32 hash_type, __u32 stripe_count,
                           __u32 migrate_hash, __u32 migrate_offset,
@@ -287,7 +296,17 @@ __lmv_name_to_stripe_index(__u32 hash_type, __u32 stripe_count,
        LASSERT(namelen > 0);
        LASSERT(stripe_count > 0);
 
-       if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+       if (lmv_hash_is_splitting(hash_type)) {
+               if (!new_layout) {
+                       hash_type = migrate_hash;
+                       stripe_count = migrate_offset;
+               }
+       } else if (lmv_hash_is_merging(hash_type)) {
+               if (new_layout) {
+                       hash_type = migrate_hash;
+                       stripe_count = migrate_offset;
+               }
+       } else if (lmv_hash_is_migrating(hash_type)) {
                if (new_layout) {
                        stripe_count = migrate_offset;
                } else {
@@ -317,12 +336,12 @@ __lmv_name_to_stripe_index(__u32 hash_type, __u32 stripe_count,
 
        LASSERT(stripe_index < stripe_count);
 
-       if ((saved_hash & LMV_HASH_FLAG_MIGRATION) && !new_layout)
+       if (!new_layout && lmv_hash_is_migrating(saved_hash))
                stripe_index += migrate_offset;
 
        LASSERT(stripe_index < saved_count);
 
-       CDEBUG(D_INFO, "name %.*s hash %#x/%#x idx %d/%u/%u under %s layout\n",
+       CDEBUG(D_INFO, "name %.*s hash=%#x/%#x idx=%d/%u/%u under %s layout\n",
               namelen, name, saved_hash, migrate_hash, stripe_index,
               saved_count, migrate_offset, new_layout ? "new" : "old");
 
@@ -380,15 +399,19 @@ static inline bool lmv_user_magic_supported(__u32 lum_magic)
               lum_magic == LMV_MAGIC_FOREIGN;
 }
 
+/* master LMV is sane */
 static inline bool lmv_is_sane(const struct lmv_mds_md_v1 *lmv)
 {
+       if (!lmv)
+               return false;
+
        if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
                goto insane;
 
        if (le32_to_cpu(lmv->lmv_stripe_count) == 0)
                goto insane;
 
-       if (!lmv_is_known_hash_type(lmv->lmv_hash_type))
+       if (!lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_hash_type)))
                goto insane;
 
        return true;
@@ -397,4 +420,59 @@ insane:
        return false;
 }
 
+/* LMV can be either master or stripe LMV */
+static inline bool lmv_is_sane2(const struct lmv_mds_md_v1 *lmv)
+{
+       if (!lmv)
+               return false;
+
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1 &&
+           le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_STRIPE)
+               goto insane;
+
+       if (le32_to_cpu(lmv->lmv_stripe_count) == 0)
+               goto insane;
+
+       if (!lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_hash_type)))
+               goto insane;
+
+       return true;
+insane:
+       LMV_DEBUG(D_ERROR, lmv, "insane");
+       return false;
+}
+
+static inline bool lmv_is_splitting(const struct lmv_mds_md_v1 *lmv)
+{
+       LASSERT(lmv_is_sane2(lmv));
+       return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type));
+}
+
+static inline bool lmv_is_merging(const struct lmv_mds_md_v1 *lmv)
+{
+       LASSERT(lmv_is_sane2(lmv));
+       return lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type));
+}
+
+static inline bool lmv_is_migrating(const struct lmv_mds_md_v1 *lmv)
+{
+       LASSERT(lmv_is_sane(lmv));
+       return lmv_hash_is_migrating(cpu_to_le32(lmv->lmv_hash_type));
+}
+
+static inline bool lmv_is_restriping(const struct lmv_mds_md_v1 *lmv)
+{
+       LASSERT(lmv_is_sane2(lmv));
+       return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type)) ||
+              lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type));
+}
+
+static inline bool lmv_is_layout_changing(const struct lmv_mds_md_v1 *lmv)
+{
+       LASSERT(lmv_is_sane2(lmv));
+       return lmv_hash_is_splitting(cpu_to_le32(lmv->lmv_hash_type)) ||
+              lmv_hash_is_merging(cpu_to_le32(lmv->lmv_hash_type)) ||
+              lmv_hash_is_migrating(cpu_to_le32(lmv->lmv_hash_type));
+}
+
 #endif
index 263c55d..c6b02c7 100644 (file)
@@ -168,7 +168,8 @@ struct md_op_spec {
                     sp_cr_lookup:1, /* do lookup sanity check or not. */
                     sp_rm_entry:1,  /* only remove name entry */
                     sp_permitted:1, /* do not check permission */
-                    sp_migrate_close:1; /* close the file during migrate */
+                    sp_migrate_close:1, /* close the file during migrate */
+                    sp_migrate_nsonly:1; /* migrate dirent only */
        /** Current lock mode for parent dir where create is performing. */
        mdl_mode_t sp_cr_mode;
 
@@ -181,9 +182,10 @@ enum md_layout_opc {
        MD_LAYOUT_WRITE,        /* FLR: write the file */
        MD_LAYOUT_RESYNC,       /* FLR: resync starts */
        MD_LAYOUT_RESYNC_DONE,  /* FLR: resync done */
-       MD_LAYOUT_ATTACH,       /* attach stripes to target dir */
-       MD_LAYOUT_DETACH,       /* detach stripes from dir */
-       MD_LAYOUT_SHRINK,       /* shrink stripes (check empty and destroy) */
+       MD_LAYOUT_ATTACH,       /* attach stripes */
+       MD_LAYOUT_DETACH,       /* detach stripes */
+       MD_LAYOUT_SHRINK,       /* shrink striped directory (destroy stripes) */
+       MD_LAYOUT_SPLIT,        /* split directory (allocate new stripes) */
        MD_LAYOUT_MAX,
 };
 
@@ -191,13 +193,24 @@ enum md_layout_opc {
  * Parameters for layout change API.
  */
 struct md_layout_change {
-       enum md_layout_opc       mlc_opc;
-       __u16                    mlc_mirror_id;
-       struct layout_intent    *mlc_intent;
-       struct lu_buf            mlc_buf;
-       struct lustre_som_attrs  mlc_som;
-       size_t                   mlc_resync_count;
-       __u32                   *mlc_resync_ids;
+       enum md_layout_opc                       mlc_opc;
+       struct lu_buf                            mlc_buf;
+       union {
+               struct {
+                       __u16                    mlc_mirror_id;
+                       struct layout_intent    *mlc_intent;
+                       struct lustre_som_attrs  mlc_som;
+                       size_t                   mlc_resync_count;
+                       __u32                   *mlc_resync_ids;
+               }; /* file */
+               struct {
+                       struct md_object        *mlc_parent;    /* parent obj in plain dir split */
+                       struct md_object        *mlc_target;    /* target obj in plain dir split */
+                       struct lu_attr          *mlc_attr;      /* target attr in plain dir split */
+                       const struct lu_name    *mlc_name;      /* target name in plain dir split */
+                       struct md_op_spec       *mlc_spec;      /* dir split spec */
+               }; /* dir */
+       };
 };
 
 union ldlm_policy_data;
index 9b5eb9f..8c992d4 100644 (file)
@@ -2171,12 +2171,19 @@ struct lmv_mds_md_v1 {
 };
 
 #define LMV_DEBUG(mask, lmv, msg)                                      \
-       CDEBUG(mask, "%s LMV: magic %#x count %u index %u hash %#x version %u migrate offset %u migrate hash %u.\n",    \
+       CDEBUG(mask, "%s LMV: magic=%#x count=%u index=%u hash=%#x version=%u migrate offset=%u migrate hash=%u.\n",    \
               msg, (lmv)->lmv_magic, (lmv)->lmv_stripe_count,          \
               (lmv)->lmv_master_mdt_index, (lmv)->lmv_hash_type,       \
               (lmv)->lmv_layout_version, (lmv)->lmv_migrate_offset,    \
               (lmv)->lmv_migrate_hash)
 
+/* stripe count before directory split */
+#define lmv_split_offset       lmv_migrate_offset
+/* stripe count after directory merge */
+#define lmv_merge_offset       lmv_migrate_offset
+/* directory hash type after merge */
+#define lmv_merge_hash         lmv_migrate_hash
+
 /* foreign LMV EA */
 struct lmv_foreign_md {
        __u32 lfm_magic;        /* magic number = LMV_MAGIC_FOREIGN */
index a837ef1..3ca4e56 100644 (file)
@@ -1014,6 +1014,9 @@ static inline bool lmv_is_known_hash_type(__u32 type)
               (type & LMV_HASH_TYPE_MASK) == LMV_HASH_TYPE_CRUSH;
 }
 
+#define LMV_HASH_FLAG_MERGE            0x04000000
+#define LMV_HASH_FLAG_SPLIT            0x08000000
+
 /* The striped directory has ever lost its master LMV EA, then LFSCK
  * re-generated it. This flag is used to indicate such case. It is an
  * on-disk flag. */
@@ -1022,7 +1025,39 @@ static inline bool lmv_is_known_hash_type(__u32 type)
 #define LMV_HASH_FLAG_BAD_TYPE         0x20000000
 #define LMV_HASH_FLAG_MIGRATION                0x80000000
 
-#define LMV_HASH_FLAG_LAYOUT_CHANGE    LMV_HASH_FLAG_MIGRATION
+#define LMV_HASH_FLAG_LAYOUT_CHANGE    \
+       (LMV_HASH_FLAG_MIGRATION | LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MERGE)
+
+/* both SPLIT and MIGRATION are set for directory split */
+static inline bool lmv_hash_is_splitting(__u32 hash)
+{
+       return (hash & LMV_HASH_FLAG_LAYOUT_CHANGE) ==
+              (LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION);
+}
+
+/* both MERGE and MIGRATION are set for directory merge */
+static inline bool lmv_hash_is_merging(__u32 hash)
+{
+       return (hash & LMV_HASH_FLAG_LAYOUT_CHANGE) ==
+              (LMV_HASH_FLAG_MERGE | LMV_HASH_FLAG_MIGRATION);
+}
+
+/* only MIGRATION is set for directory migration */
+static inline bool lmv_hash_is_migrating(__u32 hash)
+{
+       return (hash & LMV_HASH_FLAG_LAYOUT_CHANGE) == LMV_HASH_FLAG_MIGRATION;
+}
+
+static inline bool lmv_hash_is_restriping(__u32 hash)
+{
+       return lmv_hash_is_splitting(hash) || lmv_hash_is_merging(hash);
+}
+
+static inline bool lmv_hash_is_layout_changing(__u32 hash)
+{
+       return lmv_hash_is_splitting(hash) || lmv_hash_is_merging(hash) ||
+              lmv_hash_is_migrating(hash);
+}
 
 extern char *mdt_hash_name[LMV_HASH_TYPE_MAX];
 
index 2e7bef7..854504d 100644 (file)
@@ -979,7 +979,7 @@ static inline bool lfsck_name_hash_match(struct lmv_mds_md_v1 *lmv,
        if (idx == lmv->lmv_master_mdt_index)
                return true;
 
-       if (!(lmv->lmv_hash_type & LMV_HASH_FLAG_LAYOUT_CHANGE))
+       if (!lmv_hash_is_layout_changing(lmv->lmv_hash_type))
                return false;
 
        idx = lmv_name_to_stripe_index(lmv, name, namelen);
index 421a9b7..37d341d 100644 (file)
@@ -1768,6 +1768,7 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
        struct obd_device *obd = exp->exp_obd;
        struct lmv_obd *lmv = &obd->u.lmv;
        struct lmv_tgt_desc *tgt;
+       struct mdt_body *repbody;
        int rc;
 
        ENTRY;
@@ -1794,19 +1795,7 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
-       if (lmv_op_qos_mkdir(op_data)) {
-               tgt = lmv_locate_tgt_qos(lmv, &op_data->op_mds);
-               if (tgt == ERR_PTR(-EAGAIN))
-                       tgt = lmv_locate_tgt_rr(lmv, &op_data->op_mds);
-               /*
-                * only update statfs after QoS mkdir, this means the cached
-                * statfs may be stale, and current mkdir may not follow QoS
-                * accurately, but it's not serious, and avoids periodic statfs
-                * when client doesn't mkdir by QoS.
-                */
-               if (!IS_ERR(tgt))
-                       lmv_statfs_check_update(obd, tgt);
-       } else if (lmv_op_user_specific_mkdir(op_data)) {
+       if (lmv_op_user_specific_mkdir(op_data)) {
                struct lmv_user_md *lum = op_data->op_data;
 
                op_data->op_mds = le32_to_cpu(lum->lum_stripe_offset);
@@ -1819,11 +1808,22 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
                tgt = lmv_tgt(lmv, op_data->op_mds);
                if (!tgt)
                        RETURN(-ENODEV);
+       } else if (lmv_op_qos_mkdir(op_data)) {
+               tgt = lmv_locate_tgt_qos(lmv, &op_data->op_mds);
+               if (tgt == ERR_PTR(-EAGAIN))
+                       tgt = lmv_locate_tgt_rr(lmv, &op_data->op_mds);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+               /*
+                * only update statfs after QoS mkdir, this means the cached
+                * statfs may be stale, and current mkdir may not follow QoS
+                * accurately, but it's not serious, and avoids periodic statfs
+                * when client doesn't mkdir by QoS.
+                */
+               lmv_statfs_check_update(obd, tgt);
        }
 
-       if (IS_ERR(tgt))
-               RETURN(PTR_ERR(tgt));
-
+retry:
        rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
        if (rc)
                RETURN(rc);
@@ -1841,7 +1841,30 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
                        RETURN(rc);
                CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
        }
-       RETURN(rc);
+
+       /* dir restripe needs to send to MDT where dir is located */
+       if (rc != -EREMOTE ||
+           !(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH))
+               RETURN(rc);
+
+       repbody = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
+       if (repbody == NULL)
+               RETURN(-EPROTO);
+
+       /* Not cross-ref case, just get out of here. */
+       if (likely(!(repbody->mbo_valid & OBD_MD_MDS)))
+               RETURN(rc);
+
+       op_data->op_fid2 = repbody->mbo_fid1;
+       ptlrpc_req_finished(*request);
+       *request = NULL;
+
+       tgt = lmv_fid2tgt(lmv, &op_data->op_fid2);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+       op_data->op_mds = tgt->ltd_index;
+       goto retry;
 }
 
 static int
index df3eeba..8e36b08 100644 (file)
@@ -266,6 +266,9 @@ struct lod_object {
        };
 };
 
+#define ldo_dir_split_offset   ldo_dir_migrate_offset
+#define ldo_dir_split_hash     ldo_dir_migrate_hash
+
 #define lod_foreach_mirror_comp(comp, lo, mirror_idx)                      \
 for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start];  \
      comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end];   \
@@ -279,6 +282,21 @@ static inline bool lod_is_flr(const struct lod_object *lo)
        return (lo->ldo_flr_state & LCM_FL_FLR_MASK) != LCM_FL_NONE;
 }
 
+static inline bool lod_is_splitting(const struct lod_object *lo)
+{
+       return lmv_hash_is_splitting(lo->ldo_dir_hash_type);
+}
+
+static inline bool lod_is_migrating(const struct lod_object *lo)
+{
+       return lmv_hash_is_migrating(lo->ldo_dir_hash_type);
+}
+
+static inline bool lod_is_layout_changing(const struct lod_object *lo)
+{
+       return lmv_hash_is_layout_changing(lo->ldo_dir_hash_type);
+}
+
 static inline int lod_set_pool(char **pool, const char *new_pool)
 {
        int len;
index bef2d6d..316d7ff 100644 (file)
@@ -1560,13 +1560,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
                        /* The on-disk LMV EA only contains header, but the
                         * returned LMV EA size should contain the space for
                         * the FIDs of all shards of the striped directory. */
-                       if (lmv_is_sane(lmv1))
+                       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1)
                                rc = lmv_mds_md_size(
-                                       le32_to_cpu(lmv1->lmv_stripe_count),
-                                       le32_to_cpu(lmv1->lmv_magic));
+                                               le32_to_cpu(lmv1->lmv_stripe_count),
+                                               le32_to_cpu(lmv1->lmv_magic));
                } else {
-                       lfm = buf->lb_buf;
-                       if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN)
+                       lmv1 = buf->lb_buf;
+                       if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                                RETURN(rc);
 
                        if (rc != sizeof(*lmv1))
@@ -1712,7 +1712,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
        lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
        lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
-       if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+       lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version);
+       if (lod_is_layout_changing(lo)) {
                lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
                lmm1->lmv_migrate_offset =
                        cpu_to_le32(lo->ldo_dir_migrate_offset);
@@ -1818,6 +1819,8 @@ out:
        lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash);
        lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type);
        if (rc != 0)
                lod_striping_free_nolock(env, lo);
@@ -1889,31 +1892,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                if (!dto)
                        continue;
 
-               rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+               /* directory split skip create for existing stripes */
+               if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       rc = lod_sub_declare_create(env, dto, attr, NULL, dof,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out, rc = -EINVAL);
+                       if (!dt_try_as_dir(env, dto))
+                               GOTO(out, rc = -EINVAL);
 
-               rc = lod_sub_declare_ref_add(env, dto, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rc = lod_sub_declare_ref_add(env, dto, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                   (const struct dt_rec *)rec,
+                                                   (const struct dt_key *)dot,
+                                                   th);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               /* master stripe FID will be put to .. */
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       /* master stripe FID will be put to .. */
+                       rec->rec_fid = lu_object_fid(&dt->do_lu);
+                       rc = lod_sub_declare_insert(env, dto,
+                                                 (const struct dt_rec *)rec,
+                                                 (const struct dt_key *)dotdot,
+                                                 th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
+                           cfs_fail_val == i)
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)),
+                                        i + 1);
+                       else
+                               snprintf(stripe_name, sizeof(info->lti_key),
+                                        DFID":%u",
+                                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+                       sname = lod_name_get(env, stripe_name,
+                                            strlen(stripe_name));
+                       rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                             sname, lu_object_fid(&dt->do_lu));
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+                       linkea_buf.lb_len = ldata.ld_leh->leh_len;
+                       rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+                                                      XATTR_NAME_LINK, 0, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_declare_insert(env, dt_object_child(dt),
+                                       (const struct dt_rec *)rec,
+                                       (const struct dt_key *)stripe_name, th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+
+                       rc = lod_sub_declare_ref_add(env, dt_object_child(dt),
+                                                    th);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
 
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
@@ -1929,39 +1975,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                        if (rc != 0)
                                GOTO(out, rc);
                }
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
-                   cfs_fail_val == i)
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i + 1);
-               else
-                       snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                               PFID(lu_object_fid(&dto->do_lu)), i);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
-                                     sname, lu_object_fid(&dt->do_lu));
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
-                                              XATTR_NAME_LINK, 0, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_declare_insert(env, dt_object_child(dt),
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)stripe_name,
-                                           th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th);
-               if (rc != 0)
-                       GOTO(out, rc);
        }
 
        rc = lod_sub_declare_xattr_set(env, dt_object_child(dt),
@@ -2335,6 +2348,7 @@ static int lod_dir_layout_set(const struct lu_env *env,
 {
        struct dt_object *next = dt_object_child(dt);
        struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
        struct lmv_mds_md_v1 *lmv = buf->lb_buf;
        struct lmv_mds_md_v1 *slave_lmv;
        struct lu_buf slave_buf;
@@ -2343,10 +2357,29 @@ static int lod_dir_layout_set(const struct lu_env *env,
 
        ENTRY;
 
+       if (!lmv_is_sane2(lmv))
+               RETURN(-EINVAL);
+
+       /* adjust hash for dir merge, which may not be set in user command */
+       if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash)
+               lmv->lmv_merge_hash =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+
+       LMV_DEBUG(D_INFO, lmv, "set");
+
        rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th);
        if (rc)
                RETURN(rc);
 
+       /* directory restripe may update stripe LMV directly */
+       if (!lo->ldo_dir_stripe_count)
+               RETURN(0);
+
+       lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type);
+       lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset);
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash);
+       lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version);
+
        OBD_ALLOC_PTR(slave_lmv);
        if (!slave_lmv)
                RETURN(-ENOMEM);
@@ -2368,7 +2401,6 @@ static int lod_dir_layout_set(const struct lu_env *env,
                        break;
        }
 
-       lod_striping_free(env, lod_dt_obj(dt));
        OBD_FREE_PTR(slave_lmv);
 
        RETURN(rc);
@@ -3743,9 +3775,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE))
                        continue;
 
-               /* if it's source stripe of migrating directory, don't create */
-               if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
-                     i >= lo->ldo_dir_migrate_offset)) {
+               /* don't create stripe if:
+                * 1. it's source stripe of migrating directory
+                * 2. it's existed stripe of splitting directory
+                */
+               if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) ||
+                   (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) {
+                       if (!dt_object_exists(dto))
+                               GOTO(out, rc = -EINVAL);
+               } else {
                        dt_write_lock(env, dto, DT_TGT_CHILD);
                        rc = lod_sub_create(env, dto, attr, NULL, dof, th);
                        if (rc != 0) {
@@ -3766,12 +3804,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
-               rec->rec_fid = lu_object_fid(&dt->do_lu);
-               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
-                                   (const struct dt_key *)dotdot, th);
-               if (rc != 0)
-                       GOTO(out, rc);
-
                if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) ||
                    cfs_fail_val != i) {
                        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) &&
@@ -3788,6 +3820,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                GOTO(out, rc);
                }
 
+               /* don't insert stripe if it's existed stripe of splitting
+                * directory (this directory is striped).
+                * NB, plain directory will insert itself as the first
+                * stripe in target.
+                */
+               if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 &&
+                   lo->ldo_dir_split_offset > i)
+                       continue;
+
+               rec->rec_fid = lu_object_fid(&dt->do_lu);
+               rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
+                                   (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(out, rc);
+
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
                    cfs_fail_val == i)
                        snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
@@ -7727,7 +7774,12 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env,
        lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
        lo->ldo_dir_stripe_count += stripe_count;
        lo->ldo_dir_stripes_allocated += stripe_count;
-       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+       /* plain directory split creates target as a plain directory, while
+        * after source attached as the first stripe, it becomes a striped
+        * directory, set correct do_index_ops, otherwise it can't be unlinked.
+        */
+       dt->do_index_ops = &lod_striped_index_ops;
 
        RETURN(0);
 out:
@@ -7906,6 +7958,86 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env,
        return rc;
 }
 
+/**
+ * Allocate stripes for split directory.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] mlc      layout change data
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             negative if failed
+ */
+static int lod_dir_declare_layout_split(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       const struct md_layout_change *mlc,
+                                       struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object_format *dof = &info->lti_format;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct dt_object **stripes;
+       u32 stripe_count;
+       u32 saved_count;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+       LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT);
+
+       saved_count = lo->ldo_dir_stripes_allocated;
+       stripe_count = le32_to_cpu(lum->lum_stripe_count);
+       if (stripe_count <= saved_count)
+               RETURN(-EINVAL);
+
+       dof->dof_type = DFT_DIR;
+
+       OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count);
+       if (!stripes)
+               RETURN(-ENOMEM);
+
+       for (i = 0; i < lo->ldo_dir_stripes_allocated; i++)
+               stripes[i] = lo->ldo_stripe[i];
+
+       lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs);
+       rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count);
+       if (rc == -EAGAIN)
+               rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count,
+                                     stripe_count);
+       if (rc < 0) {
+               OBD_FREE(stripes, sizeof(*stripes) * stripe_count);
+               RETURN(rc);
+       }
+
+       LASSERT(rc > saved_count);
+       OBD_FREE(lo->ldo_stripe,
+                sizeof(*stripes) * lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = stripes;
+       lo->ldo_dir_striped = 1;
+       lo->ldo_dir_stripe_count = rc;
+       lo->ldo_dir_stripes_allocated = stripe_count;
+       lo->ldo_dir_split_hash = lo->ldo_dir_hash_type;
+       lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type);
+       if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type))
+               lo->ldo_dir_hash_type =
+                       lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern;
+       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION;
+       lo->ldo_dir_split_offset = saved_count;
+       lo->ldo_dir_layout_version++;
+       lo->ldo_dir_stripe_loaded = 1;
+
+       rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th);
+       if (rc)
+               lod_striping_free(env, lo);
+
+       RETURN(rc);
+}
+
 /*
  * detach all stripes from dir master object, NB, stripes are not destroyed, but
  * deleted from it's parent namespace, this function is called in two places:
@@ -8093,6 +8225,7 @@ static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = {
        [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach,
        [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach,
        [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink,
+       [MD_LAYOUT_SPLIT]  = lod_dir_declare_layout_split,
 };
 
 static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = {
index c978a46..7ebf57d 100644 (file)
@@ -168,9 +168,9 @@ static int __mdd_links_read(const struct lu_env *env,
        return linkea_init(ldata);
 }
 
-static int mdd_links_read(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct linkea_data *ldata)
+int mdd_links_read(const struct lu_env *env,
+                  struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata)
 {
        int rc;
 
@@ -4040,6 +4040,10 @@ static int mdd_migrate_create(const struct lu_env *env,
        RETURN(rc);
 }
 
+/* NB: if user issued different migrate command, we can't adjust it silently
+ * here, because this command will decide target MDT in subdir migration in
+ * LMV.
+ */
 static int mdd_migrate_cmd_check(struct mdd_device *mdd,
                                 const struct lmv_mds_md_v1 *lmv,
                                 const struct lmv_user_md_v1 *lum,
@@ -4136,12 +4140,14 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
 
        lmv = pbuf.lb_buf;
        if (lmv) {
-               __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
                int index;
 
+               if (!lmv_is_sane(lmv))
+                       GOTO(out, rc = -EBADF);
+
                /* locate target parent stripe */
                /* fail check here to make sure top dir migration succeed. */
-               if ((hash_type & LMV_HASH_FLAG_MIGRATION) &&
+               if (lmv_is_migrating(lmv) &&
                    OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
                        GOTO(out, rc = -EIO);
 
@@ -4156,7 +4162,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc = PTR_ERR(tpobj));
 
                /* locate source parent stripe */
-               if (hash_type & LMV_HASH_FLAG_LAYOUT_CHANGE) {
+               if (lmv_is_layout_changing(lmv)) {
                        index = lmv_name_to_stripe_index_old(lmv,
                                                             lname->ln_name,
                                                             lname->ln_namelen);
@@ -4167,6 +4173,15 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        spobj = mdd_object_find(env, mdd, fid);
                        if (IS_ERR(spobj))
                                GOTO(out, rc = PTR_ERR(spobj));
+
+                       /* parent stripe unchanged */
+                       if (spobj == tpobj) {
+                               if (!lmv_is_restriping(lmv))
+                                       GOTO(out, rc = -EINVAL);
+                               GOTO(out, rc = -EALREADY);
+                       }
+                       if (S_ISDIR(attr->la_mode))
+                               nsonly = spec->sp_migrate_nsonly;
                } else {
                        spobj = tpobj;
                        mdd_object_get(spobj);
@@ -4186,7 +4201,7 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
        if (rc)
                GOTO(out, rc);
 
-       if (S_ISDIR(attr->la_mode)) {
+       if (S_ISDIR(attr->la_mode) && !nsonly) {
                struct lmv_user_md_v1 *lum = spec->u.sp_ea.eadata;
 
                LASSERT(lum);
@@ -4202,13 +4217,16 @@ static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
                        GOTO(out, rc);
 
                lmv = sbuf.lb_buf;
-               if (lmv &&
-                   (le32_to_cpu(lmv->lmv_hash_type) &
-                    LMV_HASH_FLAG_MIGRATION)) {
-                       rc = mdd_migrate_cmd_check(mdd, lmv, lum, lname);
-                       GOTO(out, rc);
+               if (lmv) {
+                       if (!lmv_is_sane(lmv))
+                               GOTO(out, rc = -EBADF);
+                       if (lmv_is_migrating(lmv)) {
+                               rc = mdd_migrate_cmd_check(mdd, lmv, lum,
+                                                          lname);
+                               GOTO(out, rc);
+                       }
                }
-       } else {
+       } else if (!S_ISDIR(attr->la_mode)) {
                if (spobj == tpobj)
                        GOTO(out, rc = -EALREADY);
 
@@ -4476,20 +4494,23 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
                RETURN(rc);
 
        lmv = lmv_buf.lb_buf;
+       if (!lmv_is_sane(lmv))
+               RETURN(-EBADF);
+
        lmu = mlc->mlc_buf.lb_buf;
 
        /* adjust the default value '0' to '1' */
        if (lmu->lum_stripe_count == 0)
                lmu->lum_stripe_count = cpu_to_le32(1);
 
-       /* this was checked in MDT */
+       /* these were checked in MDT */
        LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
                le32_to_cpu(lmv->lmv_stripe_count));
+       LASSERT(!lmv_is_splitting(lmv));
+       LASSERT(lmv_is_migrating(lmv) || lmv_is_merging(lmv));
 
-       /*
-        * if obj stripe count will be shrunk to 1, we need to convert it to a
-        * normal dir, which will change its fid and update parent namespace,
-        * get obj name and parent fid from linkea.
+       /* if dir stripe count will be shrunk to 1, it needs to be transformed
+        * to a plain dir, which will cause FID change and namespace update.
         */
        if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
                struct linkea_data *ldata = &info->mti_link_data;
@@ -4585,6 +4606,257 @@ out:
        return rc;
 }
 
+static int mdd_dir_declare_split_plain(const struct lu_env *env,
+                                       struct mdd_device *mdd,
+                                       struct mdd_object *pobj,
+                                       struct mdd_object *obj,
+                                       struct mdd_object *tobj,
+                                       struct md_layout_change *mlc,
+                                       struct dt_allocation_hint *hint,
+                                       struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_name *lname = mlc->mlc_name;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct lmv_mds_md_v1 *lmv;
+       __u32 count;
+       int rc;
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
+       if (rc)
+               return rc;
+
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, obj, NULL, NULL, mdd_object_fid(pobj),
+                               lname, 1, 0, ldata);
+       if (rc)
+               return rc;
+
+       count = lum->lum_stripe_count;
+       lum->lum_stripe_count = 0;
+       mdd_object_make_hint(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                            hint);
+       rc = mdd_declare_create(env, mdo2mdd(&pobj->mod_obj), pobj, tobj,
+                               lname, mlc->mlc_attr, handle, mlc->mlc_spec,
+                               ldata, NULL, NULL, NULL, hint);
+       if (rc)
+               return rc;
+
+       /* tobj mode will be used in lod_declare_xattr_set(), but it's not
+        * created yet.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |= S_IFDIR;
+
+       lmv = (typeof(lmv))info->mti_key;
+       memset(lmv, 0, sizeof(*lmv));
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+       lmv->lmv_stripe_count = cpu_to_le32(1);
+       lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
+       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdd_object_fid(obj));
+
+       mlc->mlc_opc = MD_LAYOUT_ATTACH;
+       mlc->mlc_buf.lb_buf = lmv;
+       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
+               return rc;
+
+       lum->lum_stripe_count = count;
+       mlc->mlc_opc = MD_LAYOUT_SPLIT;
+       rc = mdo_declare_layout_change(env, tobj, mlc, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_index_insert(env, pobj, mdd_object_fid(tobj),
+                                     S_IFDIR, lname->ln_name, handle);
+       if (rc)
+               return rc;
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, obj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, pobj, la, handle);
+       if (rc)
+               return rc;
+
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       return rc;
+}
+
+/**
+ * plain directory split:
+ * 1. create \a tobj as plain directory.
+ * 2. append \a obj as first stripe of \a tobj.
+ * 3. migrate xattrs from \a obj to \a tobj.
+ * 4. split \a tobj to specific stripe count.
+ */
+static int mdd_dir_split_plain(const struct lu_env *env,
+                               struct mdd_device *mdd,
+                               struct mdd_object *pobj,
+                               struct mdd_object *obj,
+                               struct mdd_object *tobj,
+                               struct md_layout_change *mlc,
+                               struct dt_allocation_hint *hint,
+                               struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lu_attr *pattr = &info->mti_pattr;
+       struct lu_attr *la = &info->mti_la_for_fix;
+       const struct lu_name *lname = mlc->mlc_name;
+       struct linkea_data *ldata = &info->mti_link_data;
+       int rc;
+
+       ENTRY;
+
+       /* copy linkea out and set on target later */
+       rc = mdd_links_read(env, obj, ldata);
+       if (rc)
+               RETURN(rc);
+
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       if (rc)
+               RETURN(rc);
+
+       /* don't set nlink from obj */
+       mlc->mlc_attr->la_valid &= ~LA_NLINK;
+
+       rc = mdd_create_object(env, pobj, tobj, mlc->mlc_attr, mlc->mlc_spec,
+                              NULL, NULL, NULL, hint, handle, false);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_iterate_xattrs(env, obj, tobj, true, handle, mdo_xattr_set);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_links_write(env, tobj, ldata, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_delete(env, pobj, lname->ln_name, true, handle);
+       if (rc)
+               RETURN(rc);
+
+       rc = __mdd_index_insert(env, pobj, mdd_object_fid(tobj), S_IFDIR,
+                               lname->ln_name, handle);
+       if (rc)
+               RETURN(rc);
+
+       la->la_ctime = la->la_mtime = mlc->mlc_attr->la_mtime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, obj, DT_SRC_CHILD);
+       rc = mdd_update_time(env, tobj, mlc->mlc_attr, la, handle);
+       mdd_write_unlock(env, obj);
+       if (rc)
+               RETURN(rc);
+
+       rc = mdd_la_get(env, pobj, pattr);
+       if (rc)
+               RETURN(rc);
+
+       la->la_valid = LA_CTIME | LA_MTIME;
+
+       mdd_write_lock(env, pobj, DT_SRC_PARENT);
+       rc = mdd_update_time(env, pobj, pattr, la, handle);
+       mdd_write_unlock(env, pobj);
+       if (rc)
+               RETURN(rc);
+
+       /* FID changes, record it as CL_MIGRATE */
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdd_object_fid(pobj), mdd_object_fid(obj),
+                                   mdd_object_fid(pobj), lname, lname, handle);
+       RETURN(rc);
+}
+
+int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
+                        struct md_layout_change *mlc)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_device *mdd = mdo2mdd(o);
+       struct mdd_object *obj = md2mdd_obj(o);
+       struct mdd_object *pobj = md2mdd_obj(mlc->mlc_parent);
+       struct mdd_object *tobj = md2mdd_obj(mlc->mlc_target);
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       bool is_plain = false;
+       struct thandle *handle;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(S_ISDIR(mdd_object_type(obj)));
+
+       rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LMV);
+       if (rc == -ENODATA)
+               is_plain = true;
+       else if (rc < 0)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       if (is_plain) {
+               rc = mdd_dir_declare_split_plain(env, mdd, pobj, obj, tobj, mlc,
+                                                hint, handle);
+       } else {
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               rc = mdo_declare_layout_change(env, obj, mlc, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL,
+                                                NULL, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       if (is_plain) {
+               rc = mdd_dir_split_plain(env, mdd, pobj, obj, tobj, mlc, hint,
+                                        handle);
+       } else {
+               mdd_write_lock(env, obj, DT_TGT_CHILD);
+               rc = mdo_xattr_set(env, obj, NULL, XATTR_NAME_LMV,
+                                  LU_XATTR_CREATE, handle);
+               mdd_write_unlock(env, obj);
+               if (rc)
+                       GOTO(stop_trans, rc);
+
+               rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
+                                                   XATTR_NAME_LMV, handle);
+       }
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       EXIT;
+
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+
+       return rc;
+}
+
 const struct md_dir_operations mdd_dir_ops = {
        .mdo_is_subdir     = mdd_is_subdir,
        .mdo_lookup        = mdd_lookup,
index 5e5c727..5080b83 100644 (file)
@@ -267,6 +267,9 @@ int mdd_lookup(const struct lu_env *env,
                struct lu_fid* fid, struct md_op_spec *spec);
 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
                    struct linkea_data *ldata, struct thandle *handle);
+int mdd_links_read(const struct lu_env *env,
+                  struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata);
 struct lu_buf *mdd_links_get(const struct lu_env *env,
                              struct mdd_object *mdd_obj);
 int mdd_links_rename(const struct lu_env *env,
@@ -281,6 +284,8 @@ int mdd_links_rename(const struct lu_env *env,
 int mdd_dir_layout_shrink(const struct lu_env *env,
                          struct md_object *md_obj,
                          struct md_layout_change *mlc);
+int mdd_dir_layout_split(const struct lu_env *env, struct md_object *o,
+                        struct md_layout_change *mlc);
 
 int mdd_changelog_write_rec(const struct lu_env *env,
                            struct llog_handle *loghandle,
index beab56b..d32da73 100644 (file)
@@ -2949,6 +2949,9 @@ mdd_layout_change(const struct lu_env *env, struct md_object *o,
                case MD_LAYOUT_SHRINK:
                        rc = mdd_dir_layout_shrink(env, o, mlc);
                        break;
+               case MD_LAYOUT_SPLIT:
+                       rc = mdd_dir_layout_split(env, o, mlc);
+                       break;
                default:
                        LBUG();
                }
index 0891ba5..235cd89 100644 (file)
@@ -5510,6 +5510,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        m->mdt_enable_remote_dir = 1;
        m->mdt_enable_striped_dir = 1;
        m->mdt_enable_dir_migration = 1;
+       m->mdt_enable_dir_restripe = 1;
        m->mdt_enable_remote_dir_gid = 0;
        m->mdt_enable_chprojid_gid = 0;
        m->mdt_enable_remote_rename = 1;
index 3e868bd..6f7cedb 100644 (file)
@@ -255,6 +255,7 @@ struct mdt_device {
                                   mdt_enable_remote_dir:1,
                                   mdt_enable_striped_dir:1,
                                   mdt_enable_dir_migration:1,
+                                  mdt_enable_dir_restripe:1,
                                   mdt_enable_remote_rename:1,
                                   mdt_skip_lfsck:1,
                                   mdt_readonly:1;
index 81f80f4..6e63b64 100644 (file)
@@ -765,6 +765,35 @@ static ssize_t enable_dir_migration_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(enable_dir_migration);
 
+static ssize_t enable_dir_restripe_show(struct kobject *kobj,
+                                       struct attribute *attr, char *buf)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       return scnprintf(buf, PAGE_SIZE, "%u\n", mdt->mdt_enable_dir_restripe);
+}
+
+static ssize_t enable_dir_restripe_store(struct kobject *kobj,
+                                        struct attribute *attr,
+                                        const char *buffer, size_t count)
+{
+       struct obd_device *obd = container_of(kobj, struct obd_device,
+                                             obd_kset.kobj);
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool val;
+       int rc;
+
+       rc = kstrtobool(buffer, &val);
+       if (rc)
+               return rc;
+
+       mdt->mdt_enable_dir_restripe = val;
+       return count;
+}
+LUSTRE_RW_ATTR(enable_dir_restripe);
+
 /**
  * Show MDT async commit count.
  *
@@ -1111,6 +1140,7 @@ static struct attribute *mdt_attrs[] = {
        &lustre_attr_enable_chprojid_gid.attr,
        &lustre_attr_enable_striped_dir.attr,
        &lustre_attr_enable_dir_migration.attr,
+       &lustre_attr_enable_dir_restripe.attr,
        &lustre_attr_enable_remote_rename.attr,
        &lustre_attr_commit_on_sharing.attr,
        &lustre_attr_local_recovery.attr,
index 458b6b7..1b65208 100644 (file)
@@ -342,6 +342,227 @@ void mdt_reint_striped_unlock(struct mdt_thread_info *info,
        mdt_object_unlock(info, o, lh, decref);
 }
 
+static int mdt_restripe(struct mdt_thread_info *info,
+                       struct mdt_object *pobj,
+                       const struct lu_name *lname,
+                       const struct lu_fid *tfid,
+                       struct md_op_spec *spec,
+                       struct md_attr *ma)
+{
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_fid *cfid = &info->mti_tmp_fid2;
+       struct lmv_user_md *lum = spec->u.sp_ea.eadata;
+       struct md_layout_change *mlc = &info->mti_mlc;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
+       struct lmv_mds_md_v1 *lmv;
+       struct mdt_object *child;
+       struct mdt_object *tobj = NULL;
+       struct mdt_lock_handle *lhp = NULL;
+       struct mdt_lock_handle *lhc;
+       struct mdt_body *repbody;
+       u32 lmv_stripe_count = 0;
+       int rc;
+
+       ENTRY;
+
+       if (!mdt->mdt_enable_dir_restripe)
+               RETURN(-EPERM);
+
+       /* mti_big_lmm is used to save LMV, but it may be uninitialized. */
+       if (unlikely(!info->mti_big_lmm)) {
+               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+               if (!info->mti_big_lmm)
+                       RETURN(-ENOMEM);
+       }
+
+       rc = mdt_version_get_check_save(info, pobj, 0);
+       if (rc)
+               RETURN(rc);
+
+       ma->ma_lmv = info->mti_big_lmm;
+       ma->ma_lmv_size = info->mti_big_lmmsize;
+       ma->ma_valid = 0;
+       rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+       if (rc)
+               RETURN(rc);
+
+       if (ma->ma_valid & MA_LMV) {
+               /* don't allow restripe if parent dir layout is changing */
+               lmv = &ma->ma_lmv->lmv_md_v1;
+               if (!lmv_is_sane(lmv))
+                       RETURN(-EBADF);
+
+               if (lmv_is_layout_changing(lmv))
+                       RETURN(-EBUSY);
+       }
+
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_pdo_init(lhp, LCK_PW, lname);
+       rc = mdt_reint_object_lock(info, pobj, lhp, MDS_INODELOCK_UPDATE, true);
+       if (rc)
+               RETURN(rc);
+
+       fid_zero(cfid);
+       rc = mdt_lookup_version_check(info, pobj, lname, cfid, 1);
+       if (rc)
+               GOTO(unlock_parent, rc);
+
+       child = mdt_object_find(info->mti_env, mdt, cfid);
+       if (IS_ERR(child))
+               GOTO(unlock_parent, rc = PTR_ERR(child));
+
+       if (!mdt_object_exists(child))
+               GOTO(out_child, rc = -ENOENT);
+
+       if (mdt_object_remote(child)) {
+               struct mdt_body *repbody;
+
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               if (!repbody)
+                       GOTO(out_child, rc = -EPROTO);
+
+               repbody->mbo_fid1 = *cfid;
+               repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
+               GOTO(out_child, rc = -EREMOTE);
+       }
+
+       /* lock object */
+       lhc = &info->mti_lh[MDT_LH_CHILD];
+       mdt_lock_reg_init(lhc, LCK_EX);
+
+       /* enqueue object remote LOOKUP lock */
+       if (mdt_object_remote(pobj)) {
+               rc = mdt_remote_object_lock(info, pobj, cfid, &lhc->mlh_rreg_lh,
+                                           lhc->mlh_rreg_mode,
+                                           MDS_INODELOCK_LOOKUP, false);
+               if (rc != ELDLM_OK)
+                       GOTO(out_child, rc);
+       }
+
+       rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
+                                   true);
+       if (rc)
+               GOTO(unlock_child, rc);
+
+       tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
+       rc = mdt_version_get_check_save(info, child, 1);
+       if (rc)
+               GOTO(unlock_child, rc);
+
+       ma->ma_valid = 0;
+       rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(unlock_child, rc);
+
+       if (ma->ma_valid & MA_LMV) {
+               lmv = &ma->ma_lmv->lmv_md_v1;
+               if (!lmv_is_sane(lmv))
+                       GOTO(unlock_child, rc = -EBADF);
+
+               /* don't allow restripe if dir layout is changing */
+               if (lmv_is_layout_changing(lmv))
+                       GOTO(unlock_child, rc = -EBUSY);
+
+               /* check whether stripe count and hash unchanged */
+               if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
+                   lum->lum_hash_type == lmv->lmv_hash_type)
+                       GOTO(unlock_child, rc = -EALREADY);
+
+               lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+       } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
+               /* stripe count unchanged for plain directory */
+               GOTO(unlock_child, rc = -EALREADY);
+       }
+
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       if (!repbody)
+               GOTO(unlock_child, rc = -EPROTO);
+
+       if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
+               /* split */
+               ma->ma_need = MA_INODE;
+               ma->ma_valid = 0;
+               rc = mdt_attr_get_complex(info, child, ma);
+               if (rc)
+                       GOTO(unlock_child, rc);
+
+               if (!(ma->ma_valid & MA_INODE))
+                       GOTO(unlock_child, rc = -EBADF);
+
+               if (!lmv_stripe_count) {
+                       /* if child is plain directory, allocate @tobj as the
+                        * master object, and make child the first stripe of
+                        * @tobj.
+                        */
+                       tobj = mdt_object_new(info->mti_env, mdt, tfid);
+                       if (unlikely(IS_ERR(tobj)))
+                               GOTO(unlock_child, rc = PTR_ERR(tobj));
+               }
+
+               mlc->mlc_opc = MD_LAYOUT_SPLIT;
+               mlc->mlc_parent = mdt_object_child(pobj);
+               mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
+               mlc->mlc_attr = &ma->ma_attr;
+               mlc->mlc_name = lname;
+               mlc->mlc_spec = spec;
+               rc = mo_layout_change(env, mdt_object_child(child), mlc);
+               if (rc)
+                       GOTO(out_tobj, rc);
+       } else {
+               /* merge only needs to override LMV */
+               struct lu_buf *buf = &info->mti_buf;
+               __u32 version;
+
+               LASSERT(ma->ma_valid & MA_LMV);
+               lmv = &ma->ma_lmv->lmv_md_v1;
+               version = cpu_to_le32(lmv->lmv_layout_version);
+
+               /* adjust 0 to 1 */
+               if (lum->lum_stripe_count == 0)
+                       lum->lum_stripe_count = cpu_to_le32(1);
+
+               lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
+                                                 LMV_HASH_FLAG_MIGRATION);
+               lmv->lmv_merge_offset = lum->lum_stripe_count;
+               lmv->lmv_merge_hash = lum->lum_hash_type;
+               lmv->lmv_layout_version = cpu_to_le32(++version);
+
+               buf->lb_buf = lmv;
+               buf->lb_len = sizeof(*lmv);
+               rc = mo_xattr_set(env, mdt_object_child(child), buf,
+                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
+               if (rc)
+                       GOTO(unlock_child, rc);
+       }
+
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
+       rc = mdt_attr_get_complex(info, tobj ? tobj : child, ma);
+       if (rc)
+               GOTO(out_tobj, rc);
+
+       if (!(ma->ma_valid & MA_INODE))
+               GOTO(out_tobj, rc = -EBADF);
+
+       mdt_pack_attr2body(info, repbody, &ma->ma_attr,
+                          mdt_object_fid(tobj ? tobj : child));
+       EXIT;
+
+out_tobj:
+       if (tobj)
+               mdt_object_put(env, tobj);
+unlock_child:
+       mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
+out_child:
+       mdt_object_put(env, child);
+unlock_parent:
+       mdt_object_unlock(info, pobj, lhp, rc);
+
+       return rc;
+}
+
 /*
  * VBR: we save three versions in reply:
  * 0 - parent. Check that parent version is the same during replay.
@@ -352,14 +573,15 @@ void mdt_reint_striped_unlock(struct mdt_thread_info *info,
  */
 static int mdt_create(struct mdt_thread_info *info)
 {
-       struct mdt_device       *mdt = info->mti_mdt;
-       struct mdt_object       *parent;
-       struct mdt_object       *child;
-       struct mdt_lock_handle  *lh;
-       struct mdt_body         *repbody;
-       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_object *parent;
+       struct mdt_object *child;
+       struct mdt_lock_handle *lh;
+       struct mdt_body *repbody;
+       struct md_attr *ma = &info->mti_attr;
        struct mdt_reint_record *rr = &info->mti_rr;
-       struct md_op_spec       *spec = &info->mti_spec;
+       struct md_op_spec *spec = &info->mti_spec;
+       bool restripe = false;
        int rc;
        ENTRY;
 
@@ -400,6 +622,10 @@ static int mdt_create(struct mdt_thread_info *info)
                    uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
                    mdt->mdt_enable_remote_dir_gid != -1)
                        RETURN(-EPERM);
+
+               /* restripe later if the dir is found to exist */
+               if (le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT)
+                       restripe = true;
        }
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
@@ -417,8 +643,13 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
                                      &info->mti_tmp_fid1, 1);
-       if (rc == 0)
-               GOTO(put_parent, rc = -EEXIST);
+       if (rc == 0) {
+               if (!restripe)
+                       GOTO(put_parent, rc = -EEXIST);
+
+               rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
+                                 ma);
+       }
 
        /* -ENOENT is expected here */
        if (rc != -ENOENT)
@@ -1826,6 +2057,9 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                /* if parent is striped, lookup on corresponding stripe */
                struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
 
+               if (!lmv_is_sane(lmv))
+                       return -EBADF;
+
                rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
                                                  lname->ln_namelen);
                if (rc < 0)
@@ -1840,14 +2074,13 @@ static int mdt_migrate_lookup(struct mdt_thread_info *info,
                fid_zero(fid);
                rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
                                &info->mti_spec);
-               if (rc == -ENOENT &&
-                   (cpu_to_le32(lmv->lmv_hash_type) &
-                    LMV_HASH_FLAG_LAYOUT_CHANGE)) {
+               if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
                        /*
-                        * if parent is migrating, and lookup child failed on
-                        * source stripe, lookup again on target stripe, if it
-                        * exists, it means previous migration was interrupted,
-                        * and current file was migrated already.
+                        * if parent layout is changing, and lookup child
+                        * failed on source stripe, lookup again on target
+                        * stripe; if it exists, it means previous migration
+                        * was interrupted, and current file was migrated
+                        * already.
                         */
                        mdt_object_put(env, stripe);
 
index e1f81f3..2f0fe84 100644 (file)
@@ -45,6 +45,7 @@
 #include <obd_class.h>
 #include <lustre_nodemap.h>
 #include <lustre_acl.h>
+#include <lustre_lmv.h>
 #include "mdt_internal.h"
 
 
@@ -316,7 +317,7 @@ out:
        return rc;
 }
 
-/* update dir layout after migration */
+/* update dir layout after migration/restripe */
 static int mdt_dir_layout_update(struct mdt_thread_info *info)
 {
        const struct lu_env *env = info->mti_env;
@@ -333,6 +334,7 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
        struct mdt_object *obj;
        struct mdt_lock_handle *lhp = NULL;
        struct mdt_lock_handle *lhc;
+       bool shrink = false;
        int rc;
 
        ENTRY;
@@ -408,16 +410,18 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
                GOTO(unlock_obj, rc = -EALREADY);
 
        lmv = &ma->ma_lmv->lmv_md_v1;
+       if (!lmv_is_sane(lmv))
+               GOTO(unlock_obj, rc = -EBADF);
 
        /* ditto */
-       if (!(le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_LAYOUT_CHANGE))
+       if (!lmv_is_layout_changing(lmv))
                GOTO(unlock_obj, rc = -EALREADY);
 
        lum_stripe_count = lmu->lum_stripe_count;
        if (!lum_stripe_count)
                lum_stripe_count = cpu_to_le32(1);
 
-       if ((le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)) {
+       if (lmv_is_migrating(lmv)) {
                if (lmv->lmv_migrate_offset != lum_stripe_count) {
                        CERROR("%s: "DFID" migrate mdt count mismatch %u != %u\n",
                                mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
@@ -434,19 +438,82 @@ static int mdt_dir_layout_update(struct mdt_thread_info *info)
                }
 
                if (lum_stripe_count > 1 && lmu->lum_hash_type &&
-                   (lmv->lmv_hash_type & ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION))
-                   != lmu->lum_hash_type) {
+                   lmu->lum_hash_type !=
+                   (lmv->lmv_merge_hash & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
                        CERROR("%s: "DFID" migrate mdt hash mismatch %u != %u\n",
                                mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
                                lmv->lmv_hash_type, lmu->lum_hash_type);
                        GOTO(unlock_obj, rc = -EINVAL);
                }
+
+               shrink = true;
+       } else if (lmv_is_splitting(lmv)) {
+               if (lmv->lmv_stripe_count != lum_stripe_count) {
+                       CERROR("%s: "DFID" stripe count mismatch %u != %u\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmv->lmv_stripe_count, lmu->lum_stripe_count);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+
+               if (lmu->lum_stripe_offset != LMV_OFFSET_DEFAULT) {
+                       CERROR("%s: "DFID" dir split offset %u != -1\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmu->lum_stripe_offset);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+
+               if (lmu->lum_hash_type &&
+                   lmu->lum_hash_type !=
+                   (lmv->lmv_hash_type & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
+                       CERROR("%s: "DFID" split hash mismatch %u != %u\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmv->lmv_hash_type, lmu->lum_hash_type);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+       } else if (lmv_is_merging(lmv)) {
+               if (lmv->lmv_merge_offset != lum_stripe_count) {
+                       CERROR("%s: "DFID" stripe count mismatch %u != %u\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmv->lmv_merge_offset, lmu->lum_stripe_count);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+
+               if (lmu->lum_stripe_offset != LMV_OFFSET_DEFAULT) {
+                       CERROR("%s: "DFID" dir split offset %u != -1\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmu->lum_stripe_offset);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+
+               if (lmu->lum_hash_type &&
+                   lmu->lum_hash_type !=
+                   (lmv->lmv_merge_hash & cpu_to_le32(LMV_HASH_TYPE_MASK))) {
+                       CERROR("%s: "DFID" split hash mismatch %u != %u\n",
+                               mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                               lmv->lmv_merge_hash, lmu->lum_hash_type);
+                       GOTO(unlock_obj, rc = -EINVAL);
+               }
+
+               if (lum_stripe_count < lmv->lmv_stripe_count)
+                       shrink = true;
        }
 
-       mlc->mlc_opc = MD_LAYOUT_SHRINK;
-       mlc->mlc_buf.lb_buf = rr->rr_eadata;
-       mlc->mlc_buf.lb_len = rr->rr_eadatalen;
-       rc = mo_layout_change(env, mdt_object_child(obj), mlc);
+       if (shrink) {
+               mlc->mlc_opc = MD_LAYOUT_SHRINK;
+               mlc->mlc_buf.lb_buf = rr->rr_eadata;
+               mlc->mlc_buf.lb_len = rr->rr_eadatalen;
+               rc = mo_layout_change(env, mdt_object_child(obj), mlc);
+       } else {
+               struct lu_buf *buf = &info->mti_buf;
+               u32 version = le32_to_cpu(lmv->lmv_layout_version);
+
+               lmv->lmv_hash_type &= ~LMV_HASH_FLAG_LAYOUT_CHANGE;
+               lmv->lmv_layout_version = cpu_to_le32(++version);
+               buf->lb_buf = lmv;
+               buf->lb_len = sizeof(*lmv);
+               rc = mo_xattr_set(env, mdt_object_child(obj), buf,
+                                 XATTR_NAME_LMV, LU_XATTR_REPLACE);
+       }
        GOTO(unlock_obj, rc);
 
 unlock_obj:
index e069b60..019d45e 100644 (file)
@@ -1899,6 +1899,8 @@ void lustre_assert_wire_constants(void)
        BUILD_BUG_ON(LMV_MAGIC_V1 != 0x0CD20CD0);
        BUILD_BUG_ON(LMV_MAGIC_STRIPE != 0x0CD40CD0);
        BUILD_BUG_ON(LMV_HASH_TYPE_MASK != 0x0000ffff);
+       BUILD_BUG_ON(LMV_HASH_FLAG_MERGE != 0x04000000);
+       BUILD_BUG_ON(LMV_HASH_FLAG_SPLIT != 0x08000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_LOST_LMV != 0x10000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_BAD_TYPE != 0x20000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_MIGRATION != 0x80000000);
@@ -2351,6 +2353,8 @@ void lustre_assert_wire_constants(void)
                (unsigned)MDS_OWNEROVERRIDE);
        LASSERTF(MDS_HSM_RELEASE == 0x00001000UL, "found 0x%.8xUL\n",
                (unsigned)MDS_HSM_RELEASE);
+       LASSERTF(MDS_CLOSE_MIGRATE == 0x00002000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_MIGRATE);
        LASSERTF(MDS_CLOSE_LAYOUT_SWAP == 0x00004000UL, "found 0x%.8xUL\n",
                (unsigned)MDS_CLOSE_LAYOUT_SWAP);
        LASSERTF(MDS_CLOSE_LAYOUT_MERGE == 0x00008000UL, "found 0x%.8xUL\n",
index 16e140c..1401d92 100644 (file)
@@ -3101,8 +3101,8 @@ void lmv_dump_user_lmm(struct lmv_user_md *lum, char *pool_name,
                        llapi_printf(LLAPI_MSG_NORMAL, ",bad_type");
                if (flags & LMV_HASH_FLAG_LOST_LMV)
                        llapi_printf(LLAPI_MSG_NORMAL, ",lost_lmv");
-               separator = "\n";
 
+               separator = "\n";
        }
 
        if (verbose & VERBOSE_OBJID && lum->lum_magic != LMV_USER_MAGIC) {
index 54a1082..c4d5e33 100644 (file)
@@ -877,7 +877,8 @@ check_lmv_mds_md_v1(void)
        CHECK_CDEFINE(LMV_MAGIC_V1);
        CHECK_CDEFINE(LMV_MAGIC_STRIPE);
        CHECK_CDEFINE(LMV_HASH_TYPE_MASK);
-       CHECK_CDEFINE(LMV_HASH_FLAG_LOST_LMV);
+       CHECK_CDEFINE(LMV_HASH_FLAG_MERGE);
+       CHECK_CDEFINE(LMV_HASH_FLAG_SPLIT);
        CHECK_CDEFINE(LMV_HASH_FLAG_BAD_TYPE);
        CHECK_CDEFINE(LMV_HASH_FLAG_MIGRATION);
        CHECK_CDEFINE(LMV_CRUSH_PG_COUNT);
@@ -1090,6 +1091,7 @@ check_mds_op_bias(void)
        CHECK_VALUE_X(MDS_CREATE_VOLATILE);
        CHECK_VALUE_X(MDS_OWNEROVERRIDE);
        CHECK_VALUE_X(MDS_HSM_RELEASE);
+       CHECK_VALUE_X(MDS_CLOSE_MIGRATE);
        CHECK_VALUE_X(MDS_CLOSE_LAYOUT_SWAP);
        CHECK_VALUE_X(MDS_CLOSE_LAYOUT_MERGE);
        CHECK_VALUE_X(MDS_CLOSE_RESYNC_DONE);
index 3c9a431..3c9d0a4 100644 (file)
@@ -1925,6 +1925,8 @@ void lustre_assert_wire_constants(void)
        BUILD_BUG_ON(LMV_MAGIC_V1 != 0x0CD20CD0);
        BUILD_BUG_ON(LMV_MAGIC_STRIPE != 0x0CD40CD0);
        BUILD_BUG_ON(LMV_HASH_TYPE_MASK != 0x0000ffff);
+       BUILD_BUG_ON(LMV_HASH_FLAG_MERGE != 0x04000000);
+       BUILD_BUG_ON(LMV_HASH_FLAG_SPLIT != 0x08000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_LOST_LMV != 0x10000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_BAD_TYPE != 0x20000000);
        BUILD_BUG_ON(LMV_HASH_FLAG_MIGRATION != 0x80000000);
@@ -2377,6 +2379,8 @@ void lustre_assert_wire_constants(void)
                (unsigned)MDS_OWNEROVERRIDE);
        LASSERTF(MDS_HSM_RELEASE == 0x00001000UL, "found 0x%.8xUL\n",
                (unsigned)MDS_HSM_RELEASE);
+       LASSERTF(MDS_CLOSE_MIGRATE == 0x00002000UL, "found 0x%.8xUL\n",
+               (unsigned)MDS_CLOSE_MIGRATE);
        LASSERTF(MDS_CLOSE_LAYOUT_SWAP == 0x00004000UL, "found 0x%.8xUL\n",
                (unsigned)MDS_CLOSE_LAYOUT_SWAP);
        LASSERTF(MDS_CLOSE_LAYOUT_MERGE == 0x00008000UL, "found 0x%.8xUL\n",