Whamcloud - gitweb
LU-11025 dne: change dir layout via dt_layout_change 34/37634/5
authorLai Siyao <lai.siyao@whamcloud.com>
Fri, 14 Feb 2020 09:37:58 +0000 (17:37 +0800)
committerOleg Drokin <green@whamcloud.com>
Tue, 31 Mar 2020 06:59:09 +0000 (06:59 +0000)
This patch includes following changes:

* Use existing mo_layout_change()/dt_layout_change() interface to
  change directory layout, which is cleaner and easier to add new
  features. The supported layout change operations are:
  1. MD_LAYOUT_DETACH: detach stripes from directory master object.
  2. MD_LAYOUT_ATTACH: attach stripes after target's.
  3. MD_LAYOUT_SHRINK: destroy stripes from specific offset.

* Previously layout change is done in MDD layer if striped directory
  is remote, however LOD can handle remote striped directory the same
  as local one, NB, remember to initialize index operations before
  altering directory layout. In this way layer violation can be
  avoided.

* Use setxattr(XATTR_NAME_LMV, LU_XATTR_CREATE) to create stripes
  for directories, and '0' or 'LU_XATTR_REPLACE' to set or replace
  directory LMV instead of setxattr(XATTR_NAME_LMV".set").

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: Iac226f9e0e504f19837e14e93cee2542a2cb5e3d
Reviewed-on: https://review.whamcloud.com/37634
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/md_object.h
lustre/lfsck/lfsck_namespace.c
lustre/lfsck/lfsck_striped_dir.c
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_xattr.c

index cb0bf71..263c55d 100644 (file)
@@ -181,6 +181,10 @@ enum md_layout_opc {
        MD_LAYOUT_WRITE,        /* FLR: write the file */
        MD_LAYOUT_RESYNC,       /* FLR: resync starts */
        MD_LAYOUT_RESYNC_DONE,  /* FLR: resync done */
        MD_LAYOUT_WRITE,        /* FLR: write the file */
        MD_LAYOUT_RESYNC,       /* FLR: resync starts */
        MD_LAYOUT_RESYNC_DONE,  /* FLR: resync done */
+       MD_LAYOUT_ATTACH,       /* attach stripes to target dir */
+       MD_LAYOUT_DETACH,       /* detach stripes from dir */
+       MD_LAYOUT_SHRINK,       /* shrink stripes (check empty and destroy) */
+       MD_LAYOUT_MAX,
 };
 
 /**
 };
 
 /**
index 14ed2a5..51e45e5 100644 (file)
@@ -1129,7 +1129,7 @@ static int lfsck_lmv_set(const struct lu_env *env,
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       rc = dt_declare_xattr_set(env, obj, &buf, XATTR_NAME_LMV".set", 0, th);
+       rc = dt_declare_xattr_set(env, obj, &buf, XATTR_NAME_LMV, 0, th);
        if (rc)
                GOTO(stop, rc);
 
        if (rc)
                GOTO(stop, rc);
 
@@ -1137,7 +1137,7 @@ static int lfsck_lmv_set(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
        if (rc != 0)
                GOTO(stop, rc);
 
-       rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LMV".set", 0, th);
+       rc = dt_xattr_set(env, obj, &buf, XATTR_NAME_LMV, 0, th);
        if (rc)
                GOTO(stop, rc);
 
        if (rc)
                GOTO(stop, rc);
 
@@ -1341,8 +1341,8 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env,
        }
 
        if (parent_lmv_lost) {
        }
 
        if (parent_lmv_lost) {
-               rc = dt_declare_xattr_set(env, parent, &buf,
-                                         XATTR_NAME_LMV".set", 0, th);
+               rc = dt_declare_xattr_set(env, parent, &buf, XATTR_NAME_LMV,
+                                         0, th);
                if (rc)
                        GOTO(stop, rc);
        }
                if (rc)
                        GOTO(stop, rc);
        }
@@ -1375,8 +1375,7 @@ static int lfsck_namespace_insert_normal(const struct lu_env *env,
        }
 
        if (parent_lmv_lost) {
        }
 
        if (parent_lmv_lost) {
-               rc = dt_xattr_set(env, parent, &buf, XATTR_NAME_LMV".set", 0,
-                                 th);
+               rc = dt_xattr_set(env, parent, &buf, XATTR_NAME_LMV, 0, th);
                if (rc)
                        GOTO(stop, rc);
        }
                if (rc)
                        GOTO(stop, rc);
        }
@@ -1584,8 +1583,8 @@ again:
                lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck);
                lfsck_lmv_header_cpu_to_le(lmv2, lmv);
                lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
                lmv->lmv_master_mdt_index = lfsck_dev_idx(lfsck);
                lfsck_lmv_header_cpu_to_le(lmv2, lmv);
                lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
-               rc = dt_declare_xattr_set(env, orphan, &lmv_buf,
-                                         XATTR_NAME_LMV, 0, th);
+               rc = dt_declare_xattr_set(env, orphan, &lmv_buf, XATTR_NAME_LMV,
+                                         0, th);
                if (rc != 0)
                        GOTO(stop, rc);
        }
                if (rc != 0)
                        GOTO(stop, rc);
        }
@@ -5371,6 +5370,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
         */
        if (S_ISREG(type))
                child->do_ops->do_ah_init(env, hint,  parent, child, type);
         */
        if (S_ISREG(type))
                child->do_ops->do_ah_init(env, hint,  parent, child, type);
+       else if (S_ISDIR(type))
+               child->do_ops->do_ah_init(env, hint,  NULL, child, type);
 
        memset(dof, 0, sizeof(*dof));
        dof->dof_type = dt_mode_to_dft(type);
 
        memset(dof, 0, sizeof(*dof));
        dof->dof_type = dt_mode_to_dft(type);
@@ -5430,7 +5431,7 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
                        lfsck_lmv_header_cpu_to_le(lmv2, lmv2);
                        lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
                        rc = dt_declare_xattr_set(env, child, &lmv_buf,
                        lfsck_lmv_header_cpu_to_le(lmv2, lmv2);
                        lfsck_buf_init(&lmv_buf, lmv2, sizeof(*lmv2));
                        rc = dt_declare_xattr_set(env, child, &lmv_buf,
-                                                 XATTR_NAME_LMV".set", 0, th);
+                                                 XATTR_NAME_LMV, 0, th);
                        if (rc != 0)
                                GOTO(stop, rc);
                }
                        if (rc != 0)
                                GOTO(stop, rc);
                }
@@ -5492,8 +5493,8 @@ int lfsck_namespace_repair_dangling(const struct lu_env *env,
 
                /* 5b. generate slave LMV EA. */
                if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
 
                /* 5b. generate slave LMV EA. */
                if (lnr->lnr_lmv != NULL && lnr->lnr_lmv->ll_lmv_master) {
-                       rc = dt_xattr_set(env, child, &lmv_buf,
-                                         XATTR_NAME_LMV".set", 0, th);
+                       rc = dt_xattr_set(env, child, &lmv_buf, XATTR_NAME_LMV,
+                                         0, th);
                        if (rc != 0)
                                GOTO(unlock, rc);
                }
                        if (rc != 0)
                                GOTO(unlock, rc);
                }
index a222e6e..ed0d3d5 100644 (file)
@@ -2192,11 +2192,6 @@ repair:
                if (repair_linkea) {
                        struct lustre_handle lh = { 0 };
 
                if (repair_linkea) {
                        struct lustre_handle lh = { 0 };
 
-                       rc1 = linkea_links_new(&ldata, &info->lti_big_buf,
-                                              cname, lfsck_dto2fid(dir));
-                       if (rc1 != 0)
-                               goto next;
-
                        if (dir == NULL) {
                                dir = lfsck_assistant_object_load(env, lfsck,
                                                                  lso);
                        if (dir == NULL) {
                                dir = lfsck_assistant_object_load(env, lfsck,
                                                                  lso);
@@ -2211,6 +2206,11 @@ repair:
                                }
                        }
 
                                }
                        }
 
+                       rc1 = linkea_links_new(&ldata, &info->lti_big_buf,
+                                              cname, lfsck_dto2fid(dir));
+                       if (rc1 != 0)
+                               goto next;
+
                        rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
                                               MDS_INODELOCK_UPDATE |
                                               MDS_INODELOCK_XATTR, LCK_EX);
                        rc1 = lfsck_ibits_lock(env, lfsck, obj, &lh,
                                               MDS_INODELOCK_UPDATE |
                                               MDS_INODELOCK_XATTR, LCK_EX);
index f474541..0aedc2e 100644 (file)
@@ -216,6 +216,7 @@ struct lod_object {
                        __u32           ldo_dir_hash_type;
                        __u32           ldo_dir_migrate_offset;
                        __u32           ldo_dir_migrate_hash;
                        __u32           ldo_dir_hash_type;
                        __u32           ldo_dir_migrate_offset;
                        __u32           ldo_dir_migrate_hash;
+                       __u32           ldo_dir_layout_version;
                        /* Is a slave stripe of striped directory? */
                        __u32           ldo_dir_slave_stripe:1,
                                        ldo_dir_striped:1,
                        /* Is a slave stripe of striped directory? */
                        __u32           ldo_dir_slave_stripe:1,
                                        ldo_dir_striped:1,
@@ -359,6 +360,7 @@ struct lod_thread_info {
        struct lu_attr                  lti_layout_attr;
        /* object allocation avoid guide info */
        struct lod_avoid_guide          lti_avoid;
        struct lu_attr                  lti_layout_attr;
        /* object allocation avoid guide info */
        struct lod_avoid_guide          lti_avoid;
+       union lmv_mds_md                lti_lmv;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
 };
 
 extern const struct lu_device_operations lod_lu_ops;
index 8d3eb9a..27313ce 100644 (file)
@@ -1786,6 +1786,8 @@ out:
        lo->ldo_stripe = stripe;
        lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_stripe = stripe;
        lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
        lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count);
+       lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version);
+       lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type);
        if (rc != 0)
                lod_striping_free_nolock(env, lo);
 
        if (rc != 0)
                lod_striping_free_nolock(env, lo);
 
@@ -2283,278 +2285,61 @@ out:
 }
 
 /**
 }
 
 /**
- * Append source stripes after target stripes for migrating directory. NB, we
- * only need to declare this, the append is done inside lod_xattr_set_lmv().
+ * Set or replace striped directory layout, and LFSCK may set layout on a plain
+ * directory, so don't check stripe count.
  *
  * \param[in] env      execution environment
  * \param[in] dt       target object
  * \param[in] buf      LMV buf which contains source stripe fids
  *
  * \param[in] env      execution environment
  * \param[in] dt       target object
  * \param[in] buf      LMV buf which contains source stripe fids
+ * \param[in] fl       set or replace
  * \param[in] th       transaction handle
  *
  * \retval             0 on success
  * \retval             negative if failed
  */
  * \param[in] th       transaction handle
  *
  * \retval             0 on success
  * \retval             negative if failed
  */
-static int lod_dir_declare_layout_add(const struct lu_env *env,
-                                     struct dt_object *dt,
-                                     const struct lu_buf *buf,
-                                     struct thandle *th)
+static int lod_dir_layout_set(const struct lu_env *env,
+                             struct dt_object *dt,
+                             const struct lu_buf *buf,
+                             int fl,
+                             struct thandle *th)
 {
 {
-       struct lod_thread_info *info = lod_env_info(env);
-       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
-       struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
-       struct lod_object *lo = lod_dt_obj(dt);
        struct dt_object *next = dt_object_child(dt);
        struct dt_object *next = dt_object_child(dt);
-       struct dt_object_format *dof = &info->lti_format;
+       struct lod_object *lo = lod_dt_obj(dt);
        struct lmv_mds_md_v1 *lmv = buf->lb_buf;
        struct lmv_mds_md_v1 *lmv = buf->lb_buf;
-       struct dt_object **stripe;
-       __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
-       struct lu_fid *fid = &info->lti_fid;
-       struct lod_tgt_desc *tgt;
-       struct dt_object *dto;
-       struct dt_device *tgt_dt;
-       int type = LU_SEQ_RANGE_ANY;
-       struct dt_insert_rec *rec = &info->lti_dt_rec;
-       char *stripe_name = info->lti_key;
-       struct lu_name *sname;
-       struct linkea_data ldata = { NULL };
-       struct lu_buf linkea_buf;
-       __u32 idx;
+       struct lmv_mds_md_v1 *slave_lmv;
+       struct lu_buf slave_buf;
        int i;
        int rc;
 
        ENTRY;
 
        int i;
        int rc;
 
        ENTRY;
 
-       if (!lmv_is_sane(lmv))
-               RETURN(-EINVAL);
-
-       dof->dof_type = DFT_DIR;
+       rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th);
+       if (rc)
+               RETURN(rc);
 
 
-       OBD_ALLOC(stripe,
-                 sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count));
-       if (stripe == NULL)
+       OBD_ALLOC_PTR(slave_lmv);
+       if (!slave_lmv)
                RETURN(-ENOMEM);
 
                RETURN(-ENOMEM);
 
-       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
-               stripe[i] = lo->ldo_stripe[i];
-
-       for (i = 0; i < stripe_count; i++) {
-               fid_le_to_cpu(fid,
-                       &lmv->lmv_stripe_fids[i]);
-               if (!fid_is_sane(fid))
-                       continue;
-
-               rc = lod_fld_lookup(env, lod, fid, &idx, &type);
-               if (rc)
-                       GOTO(out, rc);
-
-               if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
-                       tgt_dt = lod->lod_child;
-               } else {
-                       tgt = LTD_TGT(ltd, idx);
-                       if (tgt == NULL)
-                               GOTO(out, rc = -ESTALE);
-                       tgt_dt = tgt->ltd_tgt;
-               }
-
-               dto = dt_locate_at(env, tgt_dt, fid,
-                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
-                                 NULL);
-               if (IS_ERR(dto))
-                       GOTO(out, rc = PTR_ERR(dto));
-
-               stripe[i + lo->ldo_dir_stripe_count] = dto;
-
-               if (!dt_try_as_dir(env, dto))
-                       GOTO(out, rc = -ENOTDIR);
-
-               rc = lod_sub_declare_ref_add(env, dto, th);
-               if (rc)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dot, th);
-               if (rc)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_insert(env, dto,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)dotdot, th);
-               if (rc)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_xattr_set(env, dto, buf,
-                                               XATTR_NAME_LMV, 0, th);
-               if (rc)
-                       GOTO(out, rc);
-
-               snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
-                        PFID(lu_object_fid(&dto->do_lu)),
-                        i + lo->ldo_dir_stripe_count);
-
-               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
-                                     sname, lu_object_fid(&dt->do_lu));
-               if (rc)
-                       GOTO(out, rc);
-
-               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
-               linkea_buf.lb_len = ldata.ld_leh->leh_len;
-               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
-                                              XATTR_NAME_LINK, 0, th);
-               if (rc)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_insert(env, next,
-                                           (const struct dt_rec *)rec,
-                                           (const struct dt_key *)stripe_name,
-                                           th);
-               if (rc)
-                       GOTO(out, rc);
-
-               rc = lod_sub_declare_ref_add(env, next, th);
-               if (rc)
-                       GOTO(out, rc);
-       }
-
-       if (lo->ldo_stripe)
-               OBD_FREE(lo->ldo_stripe,
-                        sizeof(*stripe) * lo->ldo_dir_stripes_allocated);
-       lo->ldo_stripe = stripe;
-       lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
-       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
-       lo->ldo_dir_stripe_count += stripe_count;
-       lo->ldo_dir_stripes_allocated += stripe_count;
-       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
-
-       RETURN(0);
-out:
-       i = lo->ldo_dir_stripe_count;
-       while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i])
-               dt_object_put(env, stripe[i++]);
-
-       OBD_FREE(stripe,
-                sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count));
-       RETURN(rc);
-}
-
-static int lod_dir_declare_layout_delete(const struct lu_env *env,
-                                        struct dt_object *dt,
-                                        const struct lu_buf *buf,
-                                        struct thandle *th)
-{
-       struct lod_thread_info *info = lod_env_info(env);
-       struct lod_object *lo = lod_dt_obj(dt);
-       struct dt_object *next = dt_object_child(dt);
-       struct lmv_user_md *lmu = buf->lb_buf;
-       __u32 final_stripe_count;
-       char *stripe_name = info->lti_key;
-       struct dt_object *dto;
-       int i;
-       int rc = 0;
-
-       if (!lmu)
-               return -EINVAL;
-
-       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
-       if (final_stripe_count >= lo->ldo_dir_stripe_count)
-               return -EINVAL;
+       lod_prep_slave_lmv_md(slave_lmv, lmv);
+       slave_buf.lb_buf = slave_lmv;
+       slave_buf.lb_len = sizeof(*slave_lmv);
 
 
-       for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
-               dto = lo->ldo_stripe[i];
-               if (!dto)
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               if (!lo->ldo_stripe[i])
                        continue;
 
                        continue;
 
-               if (!dt_try_as_dir(env, dto))
-                       return -ENOTDIR;
-
-               rc = lod_sub_declare_delete(env, dto,
-                                           (const struct dt_key *)dot, th);
-               if (rc)
-                       return rc;
-
-               rc = lod_sub_declare_ref_del(env, dto, th);
-               if (rc)
-                       return rc;
-
-               rc = lod_sub_declare_delete(env, dto,
-                                       (const struct dt_key *)dotdot, th);
-               if (rc)
-                       return rc;
-
-               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
-                        PFID(lu_object_fid(&dto->do_lu)), i);
-
-               rc = lod_sub_declare_delete(env, next,
-                                       (const struct dt_key *)stripe_name, th);
-               if (rc)
-                       return rc;
-
-               rc = lod_sub_declare_ref_del(env, next, th);
-               if (rc)
-                       return rc;
-       }
-
-       return 0;
-}
-
-/*
- * delete stripes from dir master object, the lum_stripe_count in argument is
- * the final stripe count, the stripes after that will be deleted, NB, they
- * are not destroyed, but deleted from it's parent namespace, this function
- * will be called in two places:
- * 1. mdd_migrate_create() delete stripes from source, and append them to
- *    target.
- * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them.
- */
-static int lod_dir_layout_delete(const struct lu_env *env,
-                                struct dt_object *dt,
-                                const struct lu_buf *buf,
-                                struct thandle *th)
-{
-       struct lod_thread_info *info = lod_env_info(env);
-       struct lod_object *lo = lod_dt_obj(dt);
-       struct dt_object *next = dt_object_child(dt);
-       struct lmv_user_md *lmu = buf->lb_buf;
-       __u32 final_stripe_count;
-       char *stripe_name = info->lti_key;
-       struct dt_object *dto;
-       int i;
-       int rc = 0;
-
-       ENTRY;
-
-       if (!lmu)
-               RETURN(-EINVAL);
-
-       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
-       if (final_stripe_count >= lo->ldo_dir_stripe_count)
-               RETURN(-EINVAL);
-
-       for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
-               dto = lo->ldo_stripe[i];
-               if (!dto)
+               if (!dt_object_exists(lo->ldo_stripe[i]))
                        continue;
 
                        continue;
 
-               rc = lod_sub_delete(env, dto,
-                                   (const struct dt_key *)dotdot, th);
-               if (rc)
-                       break;
-
-               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
-                        PFID(lu_object_fid(&dto->do_lu)), i);
-
-               rc = lod_sub_delete(env, next,
-                                   (const struct dt_key *)stripe_name, th);
-               if (rc)
-                       break;
-
-               rc = lod_sub_ref_del(env, next, th);
+               rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], &slave_buf,
+                                      XATTR_NAME_LMV, fl, th);
                if (rc)
                        break;
        }
 
        lod_striping_free(env, lod_dt_obj(dt));
                if (rc)
                        break;
        }
 
        lod_striping_free(env, lod_dt_obj(dt));
+       OBD_FREE_PTR(slave_lmv);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -3590,20 +3375,6 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                        RETURN(-ENOENT);
 
                rc = lod_declare_modify_layout(env, dt, name, buf, th);
                        RETURN(-ENOENT);
 
                rc = lod_declare_modify_layout(env, dt, name, buf, th);
-       } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
-                  strlen(name) > strlen(XATTR_NAME_LMV)) {
-               const char *op = name + strlen(XATTR_NAME_LMV);
-
-               rc = -ENOTSUPP;
-               if (strcmp(op, ".add") == 0)
-                       rc = lod_dir_declare_layout_add(env, dt, buf, th);
-               else if (strcmp(op, ".del") == 0)
-                       rc = lod_dir_declare_layout_delete(env, dt, buf, th);
-               else if (strcmp(op, ".set") == 0)
-                       rc = lod_sub_declare_xattr_set(env, next, buf,
-                                                      XATTR_NAME_LMV, fl, th);
-
-               RETURN(rc);
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else if (strcmp(name, XATTR_NAME_FID) == 0) {
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else if (strcmp(name, XATTR_NAME_FID) == 0) {
@@ -3982,7 +3753,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                                        cpu_to_le32(i);
 
                        rc = lod_sub_xattr_set(env, dto, &slave_lmv_buf,
                                                        cpu_to_le32(i);
 
                        rc = lod_sub_xattr_set(env, dto, &slave_lmv_buf,
-                                              XATTR_NAME_LMV, fl, th);
+                                              XATTR_NAME_LMV, 0, th);
                        if (rc != 0)
                                GOTO(out, rc);
                }
                        if (rc != 0)
                                GOTO(out, rc);
                }
@@ -4603,29 +4374,24 @@ static int lod_xattr_set(const struct lu_env *env,
                         struct dt_object *dt, const struct lu_buf *buf,
                         const char *name, int fl, struct thandle *th)
 {
                         struct dt_object *dt, const struct lu_buf *buf,
                         const char *name, int fl, struct thandle *th)
 {
-       struct dt_object        *next = dt_object_child(dt);
-       int                      rc;
+       struct dt_object *next = dt_object_child(dt);
+       int rc;
+
        ENTRY;
 
        if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
        ENTRY;
 
        if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
-           strcmp(name, XATTR_NAME_LMV) == 0) {
-               rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
-               RETURN(rc);
-       } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
-                  strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
-                  strlen(name) > strlen(XATTR_NAME_LMV)) {
-               const char *op = name + strlen(XATTR_NAME_LMV);
-
-               rc = -ENOTSUPP;
-               /*
-                * XATTR_NAME_LMV".add" is never called, but only declared,
-                * because lod_xattr_set_lmv() will do the addition.
-                */
-               if (strcmp(op, ".del") == 0)
-                       rc = lod_dir_layout_delete(env, dt, buf, th);
-               else if (strcmp(op, ".set") == 0)
-                       rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV,
-                                              fl, th);
+           !strcmp(name, XATTR_NAME_LMV)) {
+               switch (fl) {
+               case LU_XATTR_CREATE:
+                       rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+                       break;
+               case 0:
+               case LU_XATTR_REPLACE:
+                       rc = lod_dir_layout_set(env, dt, buf, fl, th);
+                       break;
+               default:
+                       LBUG();
+               }
 
                RETURN(rc);
        } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
 
                RETURN(rc);
        } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
@@ -7779,15 +7545,549 @@ out:
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int lod_declare_layout_change(const struct lu_env *env,
-               struct dt_object *dt, struct md_layout_change *mlc,
-               struct thandle *th)
+typedef int (*mlc_handler)(const struct lu_env *env, struct dt_object *dt,
+                          const struct md_layout_change *mlc,
+                          struct thandle *th);
+
+/**
+ * Attach stripes after target's for migrating directory. NB, we
+ * only need to declare this, the actual work is done inside
+ * lod_xattr_set_lmv().
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] mlc      layout change data
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             negative if failed
+ */
+static int lod_dir_declare_layout_attach(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct md_layout_change *mlc,
+                                        struct thandle *th)
 {
 {
-       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
        struct lod_object *lo = lod_dt_obj(dt);
        struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct dt_object_format *dof = &info->lti_format;
+       struct lmv_mds_md_v1 *lmv = mlc->mlc_buf.lb_buf;
+       struct dt_object **stripes;
+       __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+       struct lu_fid *fid = &info->lti_fid;
+       struct lod_tgt_desc *tgt;
+       struct dt_object *dto;
+       struct dt_device *tgt_dt;
+       int type = LU_SEQ_RANGE_ANY;
+       struct dt_insert_rec *rec = &info->lti_dt_rec;
+       char *stripe_name = info->lti_key;
+       struct lu_name *sname;
+       struct linkea_data ldata = { NULL };
+       struct lu_buf linkea_buf;
+       __u32 idx;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       if (!lmv_is_sane(lmv))
+               RETURN(-EINVAL);
+
+       if (!dt_try_as_dir(env, dt))
+               return -ENOTDIR;
+
+       dof->dof_type = DFT_DIR;
+
+       OBD_ALLOC(stripes,
+                 sizeof(*stripes) * (lo->ldo_dir_stripe_count + stripe_count));
+       if (!stripes)
+               RETURN(-ENOMEM);
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+               stripes[i] = lo->ldo_stripe[i];
+
+       rec->rec_type = S_IFDIR;
+
+       for (i = 0; i < stripe_count; i++) {
+               fid_le_to_cpu(fid,
+                       &lmv->lmv_stripe_fids[i]);
+               if (!fid_is_sane(fid))
+                       continue;
+
+               rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+               if (rc)
+                       GOTO(out, rc);
+
+               if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+                       tgt_dt = lod->lod_child;
+               } else {
+                       tgt = LTD_TGT(ltd, idx);
+                       if (tgt == NULL)
+                               GOTO(out, rc = -ESTALE);
+                       tgt_dt = tgt->ltd_tgt;
+               }
+
+               dto = dt_locate_at(env, tgt_dt, fid,
+                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+                                 NULL);
+               if (IS_ERR(dto))
+                       GOTO(out, rc = PTR_ERR(dto));
+
+               stripes[i + lo->ldo_dir_stripe_count] = dto;
+
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(out, rc = -ENOTDIR);
+
+               rc = lod_sub_declare_ref_add(env, dto, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rec->rec_fid = lu_object_fid(&dto->do_lu);
+               rc = lod_sub_declare_insert(env, dto,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)dot, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_insert(env, dto,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)dotdot, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_xattr_set(env, dto, &mlc->mlc_buf,
+                                               XATTR_NAME_LMV, 0, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                        PFID(lu_object_fid(&dto->do_lu)),
+                        i + lo->ldo_dir_stripe_count);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                     sname, lu_object_fid(&dt->do_lu));
+               if (rc)
+                       GOTO(out, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+                                              XATTR_NAME_LINK, 0, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_insert(env, next,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)stripe_name,
+                                           th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_ref_add(env, next, th);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       if (lo->ldo_stripe)
+               OBD_FREE(lo->ldo_stripe,
+                        sizeof(*stripes) * lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = stripes;
+       lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count;
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
+       lo->ldo_dir_stripe_count += stripe_count;
+       lo->ldo_dir_stripes_allocated += stripe_count;
+       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+       RETURN(0);
+out:
+       i = lo->ldo_dir_stripe_count;
+       while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i])
+               dt_object_put(env, stripes[i++]);
+
+       OBD_FREE(stripes,
+                sizeof(*stripes) * (stripe_count + lo->ldo_dir_stripe_count));
+       return rc;
+}
+
+static int lod_dir_declare_layout_detach(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct md_layout_change *unused,
+                                        struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       char *stripe_name = info->lti_key;
+       struct dt_object *dto;
+       int i;
+       int rc = 0;
+
+       if (!dt_try_as_dir(env, dt))
+               return -ENOTDIR;
+
+       if (!lo->ldo_dir_stripe_count)
+               return lod_sub_declare_delete(env, next,
+                                       (const struct dt_key *)dotdot, th);
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (!dto)
+                       continue;
+
+               if (!dt_try_as_dir(env, dto))
+                       return -ENOTDIR;
+
+               rc = lod_sub_declare_delete(env, dto,
+                                       (const struct dt_key *)dotdot, th);
+               if (rc)
+                       return rc;
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_declare_delete(env, next,
+                                       (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_ref_del(env, next, th);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+static int dt_dir_is_empty(const struct lu_env *env,
+                          struct dt_object *obj)
+{
+       struct dt_it *it;
+       const struct dt_it_ops *iops;
+       int rc;
+
+       ENTRY;
+
+       if (!dt_try_as_dir(env, obj))
+               RETURN(-ENOTDIR);
+
+       iops = &obj->do_index_ops->dio_it;
+       it = iops->init(env, obj, LUDA_64BITHASH);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->get(env, it, (const struct dt_key *)"");
+       if (rc > 0) {
+               int i;
+
+               for (rc = 0, i = 0; rc == 0 && i < 3; ++i)
+                       rc = iops->next(env, it);
+               if (!rc)
+                       rc = -ENOTEMPTY;
+               else if (rc == 1)
+                       rc = 0;
+       } else if (!rc) {
+               /* Huh? Index contains no zero key? */
+               rc = -EIO;
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       RETURN(rc);
+}
+
+static int lod_dir_declare_layout_shrink(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct md_layout_change *mlc,
+                                        struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf;
+       __u32 final_stripe_count;
+       char *stripe_name = info->lti_key;
+       struct lu_buf *lmv_buf = &info->lti_buf;
+       struct dt_object *dto;
+       int i;
+       int rc;
+
+       LASSERT(lmu);
+
+       if (!dt_try_as_dir(env, dt))
+               return -ENOTDIR;
+
+       /* shouldn't be called on plain directory */
+       LASSERT(lo->ldo_dir_stripe_count);
+
+       lmv_buf->lb_buf = &info->lti_lmv.lmv_md_v1;
+       lmv_buf->lb_len = sizeof(info->lti_lmv.lmv_md_v1);
+
+       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+       LASSERT(final_stripe_count &&
+               final_stripe_count < lo->ldo_dir_stripe_count);
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (!dto)
+                       continue;
+
+               if (i < final_stripe_count) {
+                       if (final_stripe_count == 1)
+                               continue;
+
+                       rc = lod_sub_declare_xattr_set(env, dto, lmv_buf,
+                                                      XATTR_NAME_LMV,
+                                                      LU_XATTR_REPLACE, th);
+                       if (rc)
+                               return rc;
+
+                       continue;
+               }
+
+               rc = dt_dir_is_empty(env, dto);
+               if (rc < 0)
+                       return rc;
+
+               rc = lod_sub_declare_ref_del(env, dto, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_destroy(env, dto, th);
+               if (rc)
+                       return rc;
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_declare_delete(env, next,
+                                       (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_ref_del(env, next, th);
+               if (rc)
+                       return rc;
+       }
+
+       rc = lod_sub_declare_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV,
+                                      LU_XATTR_REPLACE, th);
+       return rc;
+}
+
+/*
+ * detach all stripes from dir master object, NB, stripes are not destroyed, but
+ * deleted from it's parent namespace, this function is called in two places:
+ * 1. mdd_migrate_mdt() detach stripes from source, and attach them to
+ *    target.
+ * 2. mdd_dir_layout_update() detach stripe before turning 1-stripe directory to
+ *    a plain directory.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] mlc      layout change data
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             negative if failed
+ */
+static int lod_dir_layout_detach(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct md_layout_change *mlc,
+                                struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       char *stripe_name = info->lti_key;
+       struct dt_object *dto;
+       int i;
+       int rc = 0;
+
+       ENTRY;
+
+       if (!lo->ldo_dir_stripe_count) {
+               /* plain directory delete .. */
+               rc = lod_sub_delete(env, next,
+                                   (const struct dt_key *)dotdot, th);
+               RETURN(rc);
+       }
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (!dto)
+                       continue;
+
+               rc = lod_sub_delete(env, dto,
+                                   (const struct dt_key *)dotdot, th);
+               if (rc)
+                       break;
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_delete(env, next,
+                                   (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       break;
+
+               rc = lod_sub_ref_del(env, next, th);
+               if (rc)
+                       break;
+       }
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (dto)
+                       dt_object_put(env, dto);
+       }
+       OBD_FREE(lo->ldo_stripe,
+                sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = NULL;
+       lo->ldo_dir_stripes_allocated = 0;
+       lo->ldo_dir_stripe_count = 0;
+
+       RETURN(rc);
+}
+
+static int lod_dir_layout_shrink(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct md_layout_change *mlc,
+                                struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+       struct dt_object *next = dt_object_child(dt);
+       struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf;
+       __u32 final_stripe_count;
+       char *stripe_name = info->lti_key;
+       struct dt_object *dto;
+       struct lu_buf *lmv_buf = &info->lti_buf;
+       struct lmv_mds_md_v1 *lmv = &info->lti_lmv.lmv_md_v1;
+       u32 mdtidx;
+       int type = LU_SEQ_RANGE_ANY;
+       int i;
        int rc;
        int rc;
+
        ENTRY;
 
        ENTRY;
 
+       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+
+       lmv_buf->lb_buf = lmv;
+       lmv_buf->lb_len = sizeof(*lmv);
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
+       lmv->lmv_stripe_count = cpu_to_le32(final_stripe_count);
+       lmv->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type) &
+                            cpu_to_le32(LMV_HASH_TYPE_MASK);
+       lmv->lmv_layout_version =
+                       cpu_to_le32(lo->ldo_dir_layout_version + 1);
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (!dto)
+                       continue;
+
+               if (i < final_stripe_count) {
+                       /* if only one stripe left, no need to update
+                        * LMV because this stripe will replace master
+                        * object and act as a plain directory.
+                        */
+                       if (final_stripe_count == 1)
+                               continue;
+
+
+                       rc = lod_fld_lookup(env, lod,
+                                           lu_object_fid(&dto->do_lu),
+                                           &mdtidx, &type);
+                       if (rc)
+                               RETURN(rc);
+
+                       lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx);
+                       rc = lod_sub_xattr_set(env, dto, lmv_buf,
+                                              XATTR_NAME_LMV,
+                                              LU_XATTR_REPLACE, th);
+                       if (rc)
+                               RETURN(rc);
+
+                       continue;
+               }
+
+               dt_write_lock(env, dto, DT_TGT_CHILD);
+               rc = lod_sub_ref_del(env, dto, th);
+               dt_write_unlock(env, dto);
+               if (rc)
+                       RETURN(rc);
+
+               rc = lod_sub_destroy(env, dto, th);
+               if (rc)
+                       RETURN(rc);
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_delete(env, next,
+                                   (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       RETURN(rc);
+
+               rc = lod_sub_ref_del(env, next, th);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx,
+                           &type);
+       if (rc)
+               RETURN(rc);
+
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+       lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx);
+       rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV,
+                              LU_XATTR_REPLACE, th);
+       if (rc)
+               RETURN(rc);
+
+       for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               if (dto)
+                       dt_object_put(env, dto);
+       }
+       lo->ldo_dir_stripe_count = final_stripe_count;
+
+       RETURN(rc);
+}
+
+static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = {
+       [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach,
+       [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach,
+       [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink,
+};
+
+static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = {
+       [MD_LAYOUT_DETACH] = lod_dir_layout_detach,
+       [MD_LAYOUT_SHRINK] = lod_dir_layout_shrink,
+};
+
+static int lod_declare_layout_change(const struct lu_env *env,
+               struct dt_object *dt, struct md_layout_change *mlc,
+               struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       int rc;
+
+       ENTRY;
+
+       if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+               LASSERT(dir_mlc_declare_ops[mlc->mlc_opc]);
+               rc = dir_mlc_declare_ops[mlc->mlc_opc](env, dt, mlc, th);
+               RETURN(rc);
+       }
+
        if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
            dt_object_remote(dt_object_child(dt)))
                RETURN(-EINVAL);
        if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
            dt_object_remote(dt_object_child(dt)))
                RETURN(-EINVAL);
@@ -7835,13 +8135,21 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
        struct lod_object *lo = lod_dt_obj(dt);
        int rc;
 
        struct lod_object *lo = lod_dt_obj(dt);
        int rc;
 
+       ENTRY;
+
+       if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+               LASSERT(dir_mlc_ops[mlc->mlc_opc]);
+               rc = dir_mlc_ops[mlc->mlc_opc](env, dt, mlc, th);
+               RETURN(rc);
+       }
+
        rc = lod_striped_create(env, dt, attr, NULL, th);
        if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) {
                layout_attr->la_layout_version |= lo->ldo_layout_gen;
                rc = lod_attr_set(env, dt, layout_attr, th);
        }
 
        rc = lod_striped_create(env, dt, attr, NULL, th);
        if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) {
                layout_attr->la_layout_version |= lo->ldo_layout_gen;
                rc = lod_attr_set(env, dt, layout_attr, th);
        }
 
-       return rc;
+       RETURN(rc);
 }
 
 struct dt_object_operations lod_obj_ops = {
 }
 
 struct dt_object_operations lod_obj_ops = {
index b3e4397..606e29c 100644 (file)
@@ -2188,7 +2188,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                rc = mdo_declare_xattr_set(env, c, buf,
                                           S_ISDIR(attr->la_mode) ?
                                                XATTR_NAME_LMV : XATTR_NAME_LOV,
                rc = mdo_declare_xattr_set(env, c, buf,
                                           S_ISDIR(attr->la_mode) ?
                                                XATTR_NAME_LMV : XATTR_NAME_LOV,
-                                          0, handle);
+                                          LU_XATTR_CREATE, handle);
                if (rc)
                        GOTO(out, rc);
 
                if (rc)
                        GOTO(out, rc);
 
@@ -2382,7 +2382,7 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                rc = mdo_xattr_set(env, son, buf,
                                   S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
                                                            XATTR_NAME_LOV,
                rc = mdo_xattr_set(env, son, buf,
                                   S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
                                                            XATTR_NAME_LOV,
-                                  0, handle);
+                                  LU_XATTR_CREATE, handle);
                if (rc != 0)
                        GOTO(err_destroy, rc);
        }
                if (rc != 0)
                        GOTO(err_destroy, rc);
        }
@@ -3310,232 +3310,6 @@ static int mdd_migrate_sanity_check(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
-                                struct mdd_object *obj,
-                                struct mdd_object *stripe,
-                                const struct lu_buf *lmv_buf,
-                                const struct lu_buf *lmu_buf,
-                                int index,
-                                struct thandle *handle);
-
-static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
-                                        struct mdd_object *obj,
-                                        struct mdd_object *stripe,
-                                        const struct lu_buf *lmv_buf,
-                                        const struct lu_buf *lmu_buf,
-                                        int index,
-                                        struct thandle *handle)
-{
-       struct mdd_thread_info *info = mdd_env_info(env);
-       char *stripe_name = info->mti_name;
-       struct lmv_user_md *lmu = lmu_buf->lb_buf;
-       int rc;
-
-       if (index < le32_to_cpu(lmu->lum_stripe_count))
-               return 0;
-
-       rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
-       if (rc)
-               return rc;
-
-       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
-                PFID(mdd_object_fid(stripe)), index);
-
-       rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
-       if (rc)
-               return rc;
-
-       rc = mdo_declare_ref_del(env, obj, handle);
-
-       return rc;
-}
-
-/* delete stripe from its master object namespace */
-static int mdd_dir_delete_stripe(const struct lu_env *env,
-                                struct mdd_object *obj,
-                                struct mdd_object *stripe,
-                                const struct lu_buf *lmv_buf,
-                                const struct lu_buf *lmu_buf,
-                                int index,
-                                struct thandle *handle)
-{
-       struct mdd_thread_info *info = mdd_env_info(env);
-       char *stripe_name = info->mti_name;
-       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
-       struct lmv_user_md *lmu = lmu_buf->lb_buf;
-       __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
-       int rc;
-
-       ENTRY;
-
-       /* local dir will delete via LOD */
-       LASSERT(mdd_object_remote(obj));
-       LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
-
-       if (index < del_offset)
-               RETURN(0);
-
-       mdd_write_lock(env, stripe, DT_SRC_CHILD);
-       rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
-       if (rc)
-               GOTO(out, rc);
-
-       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
-                PFID(mdd_object_fid(stripe)), index);
-
-       rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
-       if (rc)
-               GOTO(out, rc);
-
-       rc = mdo_ref_del(env, obj, handle);
-       GOTO(out, rc);
-out:
-       mdd_write_unlock(env, stripe);
-
-       return rc;
-}
-
-static int mdd_dir_declare_destroy_stripe(const struct lu_env *env,
-                                         struct mdd_object *obj,
-                                         struct mdd_object *stripe,
-                                         const struct lu_buf *lmv_buf,
-                                         const struct lu_buf *lmu_buf,
-                                         int index,
-                                         struct thandle *handle)
-{
-       struct lmv_user_md *lmu = lmu_buf->lb_buf;
-       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
-       int rc;
-
-       if (index < shrink_offset) {
-               if (shrink_offset < 2)
-                       return 0;
-               return mdo_declare_xattr_set(env, stripe, lmv_buf,
-                                            XATTR_NAME_LMV".set", 0, handle);
-       }
-
-       rc = mdo_declare_ref_del(env, stripe, handle);
-       if (rc)
-               return rc;
-
-       rc = mdo_declare_destroy(env, stripe, handle);
-
-       return rc;
-}
-
-static int mdd_dir_destroy_stripe(const struct lu_env *env,
-                                 struct mdd_object *obj,
-                                 struct mdd_object *stripe,
-                                 const struct lu_buf *lmv_buf,
-                                 const struct lu_buf *lmu_buf,
-                                 int index,
-                                 struct thandle *handle)
-{
-       struct mdd_thread_info *info = mdd_env_info(env);
-       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
-       struct lmv_user_md *lmu = lmu_buf->lb_buf;
-       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
-       int rc;
-
-       ENTRY;
-
-       /* update remaining stripes' LMV */
-       if (index < shrink_offset) {
-               struct lmv_mds_md_v1 *slave_lmv;
-               struct lu_buf slave_buf = {
-                               .lb_buf = &info->mti_lmv.lmv_md_v1,
-                               .lb_len = sizeof(*slave_lmv)
-               };
-               __u32 version = le32_to_cpu(lmv->lmv_layout_version);
-
-               /* if dir will be shrunk to 1-stripe, don't update */
-               if (shrink_offset < 2)
-                       RETURN(0);
-
-               slave_lmv = slave_buf.lb_buf;
-               memset(slave_lmv, 0, sizeof(*slave_lmv));
-               slave_lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE);
-               slave_lmv->lmv_stripe_count = lmu->lum_stripe_count;
-               slave_lmv->lmv_master_mdt_index = cpu_to_le32(index);
-               slave_lmv->lmv_hash_type = lmv->lmv_hash_type &
-                                          cpu_to_le32(LMV_HASH_TYPE_MASK);
-               slave_lmv->lmv_layout_version = cpu_to_le32(++version);
-
-               rc = mdo_xattr_set(env, stripe, &slave_buf,
-                                  XATTR_NAME_LMV".set", 0, handle);
-               RETURN(rc);
-       }
-
-       mdd_write_lock(env, stripe, DT_SRC_CHILD);
-       rc = mdo_ref_del(env, stripe, handle);
-       if (!rc)
-               rc = mdo_destroy(env, stripe, handle);
-       mdd_write_unlock(env, stripe);
-
-       RETURN(rc);
-}
-
-static int mdd_shrink_stripe_is_empty(const struct lu_env *env,
-                                      struct mdd_object *obj,
-                                      struct mdd_object *stripe,
-                                      const struct lu_buf *lmv_buf,
-                                      const struct lu_buf *lmu_buf,
-                                      int index,
-                                      struct thandle *handle)
-{
-       struct lmv_user_md *lmu = lmu_buf->lb_buf;
-       __u32 shrink_offset = le32_to_cpu(lmu->lum_stripe_count);
-
-       /* the default value is 0, but it means 1 */
-       if (!shrink_offset)
-               shrink_offset = 1;
-
-       if (index < shrink_offset)
-               return 0;
-
-       return mdd_dir_is_empty(env, stripe);
-}
-
-/*
- * iterate stripes of striped directory on remote MDT, local striped directory
- * is accessed via LOD.
- */
-static int mdd_dir_iterate_stripes(const struct lu_env *env,
-                                  struct mdd_object *obj,
-                                  const struct lu_buf *lmv_buf,
-                                  const struct lu_buf *lmu_buf,
-                                  struct thandle *handle,
-                                  mdd_dir_stripe_cb cb)
-{
-       struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
-       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
-       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
-       struct mdd_object *stripe;
-       int i;
-       int rc;
-
-       ENTRY;
-
-       LASSERT(lmv);
-
-       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
-               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
-               if (!fid_is_sane(fid))
-                       continue;
-
-               stripe = mdd_object_find(env, mdd, fid);
-               if (IS_ERR(stripe))
-                       RETURN(PTR_ERR(stripe));
-
-               rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
-               mdd_object_put(env, stripe);
-               if (rc)
-                       RETURN(rc);
-       }
-
-       RETURN(0);
-}
-
 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
                            struct mdd_object *obj,
                            const struct lu_buf *buf,
 typedef int (*mdd_xattr_cb)(const struct lu_env *env,
                            struct mdd_object *obj,
                            const struct lu_buf *buf,
@@ -3895,52 +3669,6 @@ static int migrate_linkea_prepare(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int mdd_dir_declare_layout_delete(const struct lu_env *env,
-                                        struct mdd_object *obj,
-                                        const struct lu_buf *lmv_buf,
-                                        const struct lu_buf *lmu_buf,
-                                        struct thandle *handle)
-{
-       int rc;
-
-       if (!lmv_buf->lb_buf)
-               rc = mdo_declare_index_delete(env, obj, dotdot, handle);
-       else if (mdd_object_remote(obj))
-               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
-                                            mdd_dir_declare_delete_stripe);
-       else
-               rc = mdo_declare_xattr_set(env, obj, lmu_buf,
-                                          XATTR_NAME_LMV".del", 0, handle);
-
-       return rc;
-}
-
-static int mdd_dir_layout_delete(const struct lu_env *env,
-                                struct mdd_object *obj,
-                                const struct lu_buf *lmv_buf,
-                                const struct lu_buf *lmu_buf,
-                                struct thandle *handle)
-{
-       int rc;
-
-       ENTRY;
-
-       mdd_write_lock(env, obj, DT_SRC_PARENT);
-       if (!lmv_buf->lb_buf)
-               /* normal dir */
-               rc = __mdd_index_delete_only(env, obj, dotdot, handle);
-       else if (mdd_object_remote(obj))
-               /* striped, but remote */
-               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
-                                            mdd_dir_delete_stripe);
-       else
-               rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
-                                  handle);
-       mdd_write_unlock(env, obj);
-
-       RETURN(rc);
-}
-
 static int mdd_declare_migrate_create(const struct lu_env *env,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
 static int mdd_declare_migrate_create(const struct lu_env *env,
                                      struct mdd_object *tpobj,
                                      struct mdd_object *sobj,
@@ -3954,31 +3682,15 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                                      struct thandle *handle)
 {
        struct mdd_thread_info *info = mdd_env_info(env);
                                      struct thandle *handle)
 {
        struct mdd_thread_info *info = mdd_env_info(env);
+       struct md_layout_change *mlc = &info->mti_mlc;
        struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
        int rc;
 
        if (S_ISDIR(attr->la_mode)) {
        struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
        int rc;
 
        if (S_ISDIR(attr->la_mode)) {
-               struct lu_buf lmu_buf = { NULL };
-
-               if (lmv) {
-                       struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
-
-                       lmu->lum_stripe_count = 0;
-                       lmu_buf.lb_buf = lmu;
-                       lmu_buf.lb_len = sizeof(*lmu);
-               }
-
-               rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
-                                                  handle);
+               mlc->mlc_opc = MD_LAYOUT_DETACH;
+               rc = mdo_declare_layout_change(env, sobj, mlc, handle);
                if (rc)
                        return rc;
                if (rc)
                        return rc;
-
-               if (lmv) {
-                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
-                                                  handle);
-                       if (rc)
-                               return rc;
-               }
        }
 
        rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
        }
 
        rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
@@ -3987,12 +3699,17 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
+       /*
+        * tobj mode will be used in mdo_declare_layout_change(), but it's not
+        * createb yet, copy from sobj.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+               sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
        if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
                if (!lmv) {
        if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
                if (!lmv) {
-                       /*
-                        * if sobj is not striped, fake a 1-stripe LMV, which
-                        * will be used to generate a compound LMV for tobj.
-                        */
+                       /* if sobj is not striped, fake a 1-stripe LMV */
                        LASSERT(sizeof(info->mti_key) >
                                lmv_mds_md_size(1, LMV_MAGIC_V1));
                        lmv = (typeof(lmv))info->mti_key;
                        LASSERT(sizeof(info->mti_key) >
                                lmv_mds_md_size(1, LMV_MAGIC_V1));
                        lmv = (typeof(lmv))info->mti_key;
@@ -4002,31 +3719,17 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                        lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
                        fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
                                      mdd_object_fid(sobj));
                        lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_DEFAULT);
                        fid_le_to_cpu(&lmv->lmv_stripe_fids[0],
                                      mdd_object_fid(sobj));
-                       sbuf->lb_buf = lmv;
-                       sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
-
-                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
-                                                  XATTR_NAME_LMV".add", 0,
-                                                  handle);
-                       sbuf->lb_buf = NULL;
-                       sbuf->lb_len = 0;
+                       mlc->mlc_buf.lb_buf = lmv;
+                       mlc->mlc_buf.lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
                } else {
                } else {
-                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
-                                                  XATTR_NAME_LMV".add", 0,
-                                                  handle);
+                       mlc->mlc_buf = *sbuf;
                }
                }
+               mlc->mlc_opc = MD_LAYOUT_ATTACH;
+               rc = mdo_declare_layout_change(env, tobj, mlc, handle);
                if (rc)
                        return rc;
        }
 
                if (rc)
                        return rc;
        }
 
-       /*
-        * tobj mode will be used in lod_declare_xattr_set(), but it's not
-        * createb yet, copy from sobj.
-        */
-       tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
-       tobj->mod_obj.mo_lu.lo_header->loh_attr |=
-               sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
-
        rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
                                mdo_declare_xattr_set);
        if (rc)
        rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
                                mdo_declare_xattr_set);
        if (rc)
@@ -4056,13 +3759,6 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                                        handle, mdd_declare_update_link);
                if (rc)
                        return rc;
                                        handle, mdd_declare_update_link);
                if (rc)
                        return rc;
-
-               if (lmv) {
-                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
-                                                  handle);
-                       if (rc)
-                               return rc;
-               }
        }
 
        return rc;
        }
 
        return rc;
@@ -4113,32 +3809,16 @@ static int mdd_migrate_create(const struct lu_env *env,
         * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
         */
        if (S_ISDIR(attr->la_mode)) {
         * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
         */
        if (S_ISDIR(attr->la_mode)) {
-               struct lu_buf lmu_buf = { NULL };
-
-               if (sbuf->lb_buf) {
-                       struct mdd_thread_info *info = mdd_env_info(env);
-                       struct lmv_user_md *lmu = &info->mti_lmv.lmv_user_md;
+               struct mdd_thread_info *info = mdd_env_info(env);
+               struct md_layout_change *mlc = &info->mti_mlc;
 
 
-                       lmu->lum_stripe_count = 0;
-                       lmu_buf.lb_buf = lmu;
-                       lmu_buf.lb_len = sizeof(*lmu);
-               }
+               mlc->mlc_opc = MD_LAYOUT_DETACH;
 
 
-               rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
+               mdd_write_lock(env, sobj, DT_SRC_PARENT);
+               rc = mdo_layout_change(env, sobj, mlc, handle);
+               mdd_write_unlock(env, sobj);
                if (rc)
                        RETURN(rc);
                if (rc)
                        RETURN(rc);
-
-               /*
-                * delete LMV so that later when destroying sobj it won't delete
-                * stripes again.
-                */
-               if (sbuf->lb_buf) {
-                       mdd_write_lock(env, sobj, DT_SRC_CHILD);
-                       rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
-                       mdd_write_unlock(env, sobj);
-                       if (rc)
-                               RETURN(rc);
-               }
        }
 
        /* don't set nlink from sobj */
        }
 
        /* don't set nlink from sobj */
@@ -4244,6 +3924,12 @@ static int mdd_declare_migrate_update(const struct lu_env *env,
                if (rc)
                        return rc;
 
                if (rc)
                        return rc;
 
+               if (S_ISDIR(attr->la_mode)) {
+                       rc = mdo_declare_ref_del(env, sobj, handle);
+                       if (rc)
+                               return rc;
+               }
+
                rc = mdo_declare_destroy(env, sobj, handle);
                if (rc)
                        return rc;
                rc = mdo_declare_destroy(env, sobj, handle);
                if (rc)
                        return rc;
@@ -4333,6 +4019,8 @@ static int mdd_migrate_update(const struct lu_env *env,
        if (do_create && do_destroy) {
                mdd_write_lock(env, sobj, DT_SRC_CHILD);
                mdo_ref_del(env, sobj, handle);
        if (do_create && do_destroy) {
                mdd_write_lock(env, sobj, DT_SRC_CHILD);
                mdo_ref_del(env, sobj, handle);
+               if (S_ISDIR(attr->la_mode))
+                       mdo_ref_del(env, sobj, handle);
                rc = mdo_destroy(env, sobj, handle);
                mdd_write_unlock(env, sobj);
        }
                rc = mdo_destroy(env, sobj, handle);
                mdd_write_unlock(env, sobj);
        }
@@ -4644,52 +4332,22 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
-                                          struct mdd_object *pobj,
-                                          struct mdd_object *obj,
-                                          struct mdd_object *stripe,
-                                          struct lu_attr *attr,
-                                          struct lu_buf *lmv_buf,
-                                          const struct lu_buf *lmu_buf,
-                                          struct lu_name *lname,
-                                          struct thandle *handle)
+static int mdd_declare_1sd_collapse(const struct lu_env *env,
+                                   struct mdd_object *pobj,
+                                   struct mdd_object *obj,
+                                   struct mdd_object *stripe,
+                                   struct lu_attr *attr,
+                                   struct md_layout_change *mlc,
+                                   struct lu_name *lname,
+                                   struct thandle *handle)
 {
 {
-       struct mdd_thread_info *info = mdd_env_info(env);
-       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
-       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
-       struct lu_buf shrink_buf = { .lb_buf = lmu,
-                                    .lb_len = sizeof(*lmu) };
        int rc;
 
        int rc;
 
-       LASSERT(lmv);
-
-       memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
-
-       if (le32_to_cpu(lmu->lum_stripe_count) < 2)
-               lmu->lum_stripe_count = 0;
-
-       rc = mdd_dir_declare_layout_delete(env, obj, lmv_buf, &shrink_buf,
-                                          handle);
-       if (rc)
-               return rc;
-
-       if (lmu->lum_stripe_count == 0) {
-               lmu->lum_stripe_count = cpu_to_le32(1);
-
-               rc = mdo_declare_xattr_del(env, obj, XATTR_NAME_LMV, handle);
-               if (rc)
-                       return rc;
-       }
-
-       rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
-                                    mdd_dir_declare_destroy_stripe);
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-       if (le32_to_cpu(lmu->lum_stripe_count) > 1)
-               return mdo_declare_xattr_set(env, obj, lmv_buf,
-                                            XATTR_NAME_LMV".set", 0, handle);
-
        rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
                                      S_IFDIR, dotdot, handle);
        if (rc)
        rc = mdo_declare_index_insert(env, stripe, mdd_object_fid(pobj),
                                      S_IFDIR, dotdot, handle);
        if (rc)
@@ -4730,84 +4388,30 @@ static int __mdd_dir_declare_layout_shrink(const struct lu_env *env,
                return rc;
 
        return rc;
                return rc;
 
        return rc;
-
 }
 
 }
 
-/*
- * after files under \a obj were migrated, shrink old stripes from \a obj,
- * furthermore, if it becomes a 1-stripe directory, convert it to a normal one.
- */
-static int __mdd_dir_layout_shrink(const struct lu_env *env,
-                                  struct mdd_object *pobj,
-                                  struct mdd_object *obj,
-                                  struct mdd_object *stripe,
-                                  struct lu_attr *attr,
-                                  struct lu_buf *lmv_buf,
-                                  const struct lu_buf *lmu_buf,
-                                  struct lu_name *lname,
-                                  struct thandle *handle)
+/* transform one-stripe directory to a plain directory */
+static int mdd_1sd_collapse(const struct lu_env *env,
+                           struct mdd_object *pobj,
+                           struct mdd_object *obj,
+                           struct mdd_object *stripe,
+                           struct lu_attr *attr,
+                           struct md_layout_change *mlc,
+                           struct lu_name *lname,
+                           struct thandle *handle)
 {
 {
-       struct mdd_thread_info *info = mdd_env_info(env);
-       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
-       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
-       struct lu_buf shrink_buf = { .lb_buf = lmu,
-                                    .lb_len = sizeof(*lmu) };
-       int len = lmv_buf->lb_len;
-       __u32 version = le32_to_cpu(lmv->lmv_layout_version);
        int rc;
 
        ENTRY;
 
        int rc;
 
        ENTRY;
 
-       /* lmu needs to be altered, but lmu_buf is const */
-       memcpy(lmu, lmu_buf->lb_buf, sizeof(*lmu));
-
-       /*
-        * if dir will be shrunk to 1-stripe, delete all stripes, because it
-        * will be converted to normal dir.
-        */
-       if (le32_to_cpu(lmu->lum_stripe_count) == 1)
-               lmu->lum_stripe_count = 0;
-
-       /* delete stripes after lmu_stripe_count */
-       rc = mdd_dir_layout_delete(env, obj, lmv_buf, &shrink_buf, handle);
-       if (rc)
-               RETURN(rc);
-
-       if (lmu->lum_stripe_count == 0) {
-               lmu->lum_stripe_count = cpu_to_le32(1);
+       /* replace 1-stripe directory with its stripe */
+       mlc->mlc_opc = MD_LAYOUT_DETACH;
 
 
-               /* delete LMV to avoid deleting stripes again upon destroy */
-               mdd_write_lock(env, obj, DT_SRC_CHILD);
-               rc = mdo_xattr_del(env, obj, XATTR_NAME_LMV, handle);
-               mdd_write_unlock(env, obj);
-               if (rc)
-                       RETURN(rc);
-       }
-
-       /* destroy stripes after lmu_stripe_count */
        mdd_write_lock(env, obj, DT_SRC_PARENT);
        mdd_write_lock(env, obj, DT_SRC_PARENT);
-       rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, &shrink_buf, handle,
-                                    mdd_dir_destroy_stripe);
+       rc = mdo_layout_change(env, obj, mlc, handle);
        mdd_write_unlock(env, obj);
        mdd_write_unlock(env, obj);
-
-       if (le32_to_cpu(lmu->lum_stripe_count) > 1) {
-               /* update dir LMV, that's all if it's still striped. */
-               lmv->lmv_stripe_count = lmu->lum_stripe_count;
-               lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
-               lmv->lmv_migrate_offset = 0;
-               lmv->lmv_migrate_hash = 0;
-               lmv->lmv_layout_version = cpu_to_le32(++version);
-
-               lmv_buf->lb_len = sizeof(*lmv);
-               rc = mdo_xattr_set(env, obj, lmv_buf, XATTR_NAME_LMV".set", 0,
-                                  handle);
-               lmv_buf->lb_len = len;
+       if (rc)
                RETURN(rc);
                RETURN(rc);
-       }
-
-       /* replace directory with its remaining stripe */
-       LASSERT(pobj);
-       LASSERT(stripe);
 
        mdd_write_lock(env, pobj, DT_SRC_PARENT);
        mdd_write_lock(env, obj, DT_SRC_CHILD);
 
        mdd_write_lock(env, pobj, DT_SRC_PARENT);
        mdd_write_lock(env, obj, DT_SRC_CHILD);
@@ -4868,11 +4472,11 @@ out:
 }
 
 /*
 }
 
 /*
- * shrink directory stripes to lum_stripe_count specified by lum_mds_md.
+ * shrink directory stripes after migration/merge
  */
 int mdd_dir_layout_shrink(const struct lu_env *env,
                          struct md_object *md_obj,
  */
 int mdd_dir_layout_shrink(const struct lu_env *env,
                          struct md_object *md_obj,
-                         const struct lu_buf *lmu_buf)
+                         struct md_layout_change *mlc)
 {
        struct mdd_device *mdd = mdo2mdd(md_obj);
        struct mdd_thread_info *info = mdd_env_info(env);
 {
        struct mdd_device *mdd = mdo2mdd(md_obj);
        struct mdd_thread_info *info = mdd_env_info(env);
@@ -4902,25 +4506,22 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
                RETURN(rc);
 
        lmv = lmv_buf.lb_buf;
                RETURN(rc);
 
        lmv = lmv_buf.lb_buf;
-       lmu = lmu_buf->lb_buf;
+       lmu = mlc->mlc_buf.lb_buf;
+
+       /* adjust the default value '0' to '1' */
+       if (lmu->lum_stripe_count == 0)
+               lmu->lum_stripe_count = cpu_to_le32(1);
 
        /* this was checked in MDT */
        LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
                le32_to_cpu(lmv->lmv_stripe_count));
 
 
        /* this was checked in MDT */
        LASSERT(le32_to_cpu(lmu->lum_stripe_count) <
                le32_to_cpu(lmv->lmv_stripe_count));
 
-       rc = mdd_dir_iterate_stripes(env, obj, &lmv_buf, lmu_buf, NULL,
-                                    mdd_shrink_stripe_is_empty);
-       if (rc < 0)
-               GOTO(out, rc);
-       else if (rc != 0)
-               GOTO(out, rc = -ENOTEMPTY);
-
        /*
         * if obj stripe count will be shrunk to 1, we need to convert it to a
         * normal dir, which will change its fid and update parent namespace,
         * get obj name and parent fid from linkea.
         */
        /*
         * if obj stripe count will be shrunk to 1, we need to convert it to a
         * normal dir, which will change its fid and update parent namespace,
         * get obj name and parent fid from linkea.
         */
-       if (le32_to_cpu(lmu->lum_stripe_count) < 2) {
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
                struct linkea_data *ldata = &info->mti_link_data;
                char *filename = info->mti_name;
 
                struct linkea_data *ldata = &info->mti_link_data;
                char *filename = info->mti_name;
 
@@ -4964,11 +4565,18 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
        if (IS_ERR(handle))
                GOTO(out, rc = PTR_ERR(handle));
 
        if (IS_ERR(handle))
                GOTO(out, rc = PTR_ERR(handle));
 
-       rc = __mdd_dir_declare_layout_shrink(env, pobj, obj, stripe, attr,
-                                            &lmv_buf, lmu_buf, &lname, handle);
+       mlc->mlc_opc = MD_LAYOUT_SHRINK;
+       rc = mdo_declare_layout_change(env, obj, mlc, handle);
        if (rc)
                GOTO(stop_trans, rc);
 
        if (rc)
                GOTO(stop_trans, rc);
 
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+               rc = mdd_declare_1sd_collapse(env, pobj, obj, stripe, attr, mlc,
+                                             &lname, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+       }
+
        rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
                                         handle);
        if (rc)
        rc = mdd_declare_changelog_store(env, mdd, CL_LAYOUT, NULL, NULL,
                                         handle);
        if (rc)
@@ -4978,11 +4586,20 @@ int mdd_dir_layout_shrink(const struct lu_env *env,
        if (rc)
                GOTO(stop_trans, rc);
 
        if (rc)
                GOTO(stop_trans, rc);
 
-       rc = __mdd_dir_layout_shrink(env, pobj, obj, stripe, attr, &lmv_buf,
-                                    lmu_buf, &lname, handle);
+       mdd_write_lock(env, obj, DT_SRC_PARENT);
+       mlc->mlc_opc = MD_LAYOUT_SHRINK;
+       rc = mdo_layout_change(env, obj, mlc, handle);
+       mdd_write_unlock(env, obj);
        if (rc)
                GOTO(stop_trans, rc);
 
        if (rc)
                GOTO(stop_trans, rc);
 
+       if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+               rc = mdd_1sd_collapse(env, pobj, obj, stripe, attr, mlc, &lname,
+                                     handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
+       }
+
        rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
                                            XATTR_NAME_LMV, handle);
        GOTO(stop_trans, rc);
        rc = mdd_changelog_data_store_xattr(env, mdd, CL_LAYOUT, 0, obj,
                                            XATTR_NAME_LMV, handle);
        GOTO(stop_trans, rc);
index cbdb370..27c2247 100644 (file)
@@ -209,6 +209,7 @@ struct mdd_thread_info {
        struct lfsck_req_local    mti_lrl;
        struct lu_seq_range       mti_range;
        union lmv_mds_md          mti_lmv;
        struct lfsck_req_local    mti_lrl;
        struct lu_seq_range       mti_range;
        union lmv_mds_md          mti_lmv;
+       struct md_layout_change   mti_mlc;
 };
 
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
 };
 
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
@@ -277,7 +278,7 @@ int mdd_links_rename(const struct lu_env *env,
                     int first, int check);
 int mdd_dir_layout_shrink(const struct lu_env *env,
                          struct md_object *md_obj,
                     int first, int check);
 int mdd_dir_layout_shrink(const struct lu_env *env,
                          struct md_object *md_obj,
-                         const struct lu_buf *lmu_buf);
+                         struct md_layout_change *mlc);
 
 int mdd_changelog_write_rec(const struct lu_env *env,
                            struct llog_handle *loghandle,
 
 int mdd_changelog_write_rec(const struct lu_env *env,
                            struct llog_handle *loghandle,
index f01550a..2f6b5ec 100644 (file)
@@ -1923,11 +1923,6 @@ static int mdd_xattr_set(const struct lu_env *env, struct md_object *obj,
                RETURN(rc);
        }
 
                RETURN(rc);
        }
 
-       if (strcmp(name, XATTR_NAME_LMV) == 0) {
-               rc = mdd_dir_layout_shrink(env, obj, buf);
-               RETURN(rc);
-       }
-
        if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
            strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
                struct posix_acl *acl;
        if (strcmp(name, XATTR_NAME_ACL_ACCESS) == 0 ||
            strcmp(name, XATTR_NAME_ACL_DEFAULT) == 0) {
                struct posix_acl *acl;
@@ -2942,8 +2937,21 @@ mdd_layout_change(const struct lu_env *env, struct md_object *o,
        struct thandle          *handle;
        int flr_state;
        int rc;
        struct thandle          *handle;
        int flr_state;
        int rc;
+
        ENTRY;
 
        ENTRY;
 
+       if (S_ISDIR(mdd_object_type(obj))) {
+               switch (mlc->mlc_opc) {
+               case MD_LAYOUT_SHRINK:
+                       rc = mdd_dir_layout_shrink(env, o, mlc);
+                       break;
+               default:
+                       LBUG();
+               }
+
+               RETURN(rc);
+       }
+
        /* Verify acceptable operations */
        switch (mlc->mlc_opc) {
        case MD_LAYOUT_WRITE:
        /* Verify acceptable operations */
        switch (mlc->mlc_opc) {
        case MD_LAYOUT_WRITE:
index 723cd55..47f89bc 100644 (file)
@@ -514,7 +514,7 @@ struct mdt_thread_info {
        struct tg_reply_data      *mti_reply_data;
 
        /* FLR: layout change API */
        struct tg_reply_data      *mti_reply_data;
 
        /* FLR: layout change API */
-       struct md_layout_change    mti_layout;
+       struct md_layout_change    mti_mlc;
 };
 
 extern struct lu_context_key mdt_thread_key;
 };
 
 extern struct lu_context_key mdt_thread_key;
index 33f3cdf..d73f812 100644 (file)
@@ -316,8 +316,8 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-/* shrink dir layout after migration */
-static int mdt_dir_layout_shrink(struct mdt_thread_info *info)
+/* update dir layout after migration */
+static int mdt_dir_layout_update(struct mdt_thread_info *info)
 {
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
 {
        const struct lu_env *env = info->mti_env;
        struct mdt_device *mdt = info->mti_mdt;
@@ -325,7 +325,7 @@ static int mdt_dir_layout_shrink(struct mdt_thread_info *info)
        struct mdt_reint_record *rr = &info->mti_rr;
        struct lmv_user_md *lmu = rr->rr_eadata;
        __u32 lum_stripe_count = lmu->lum_stripe_count;
        struct mdt_reint_record *rr = &info->mti_rr;
        struct lmv_user_md *lmu = rr->rr_eadata;
        __u32 lum_stripe_count = lmu->lum_stripe_count;
-       struct lu_buf *buf = &info->mti_buf;
+       struct md_layout_change *mlc = &info->mti_mlc;
        struct lmv_mds_md_v1 *lmv;
        struct md_attr *ma = &info->mti_attr;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        struct lmv_mds_md_v1 *lmv;
        struct md_attr *ma = &info->mti_attr;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
@@ -375,7 +375,7 @@ static int mdt_dir_layout_shrink(struct mdt_thread_info *info)
 
        /*
         * lock parent if dir will be shrunk to 1 stripe, because dir will be
 
        /*
         * lock parent if dir will be shrunk to 1 stripe, because dir will be
-        * converted to normal directory, as will change dir fid and update
+        * converted to normal directory, as will change dir FID and update
         * namespace of parent.
         */
        lhp = &info->mti_lh[MDT_LH_PARENT];
         * namespace of parent.
         */
        lhp = &info->mti_lh[MDT_LH_PARENT];
@@ -440,9 +440,10 @@ static int mdt_dir_layout_shrink(struct mdt_thread_info *info)
                GOTO(unlock_obj, rc = -EINVAL);
        }
 
                GOTO(unlock_obj, rc = -EINVAL);
        }
 
-       buf->lb_buf = rr->rr_eadata;
-       buf->lb_len = rr->rr_eadatalen;
-       rc = mo_xattr_set(env, mdt_object_child(obj), buf, XATTR_NAME_LMV, 0);
+       mlc->mlc_opc = MD_LAYOUT_SHRINK;
+       mlc->mlc_buf.lb_buf = rr->rr_eadata;
+       mlc->mlc_buf.lb_len = rr->rr_eadatalen;
+       rc = mo_layout_change(env, mdt_object_child(obj), mlc);
        GOTO(unlock_obj, rc);
 
 unlock_obj:
        GOTO(unlock_obj, rc);
 
 unlock_obj:
@@ -506,7 +507,7 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
 
                        if (le32_to_cpu(*magic) == LMV_USER_MAGIC ||
                            le32_to_cpu(*magic) == LMV_USER_MAGIC_SPECIFIC) {
 
                        if (le32_to_cpu(*magic) == LMV_USER_MAGIC ||
                            le32_to_cpu(*magic) == LMV_USER_MAGIC_SPECIFIC) {
-                               rc = mdt_dir_layout_shrink(info);
+                               rc = mdt_dir_layout_update(info);
                                GOTO(out, rc);
                        }
                }
                                GOTO(out, rc);
                        }
                }