Whamcloud - gitweb
LU-4684 migrate: migrate striped directory 27/31427/26
author Lai Siyao <lai.siyao@intel.com>
Mon, 22 Jan 2018 18:34:54 +0000 (02:34 +0800)
committer Oleg Drokin <green@whamcloud.com>
Mon, 17 Sep 2018 04:05:23 +0000 (04:05 +0000)
Migrate a striped directory in the following steps:
1. create the target object if needed: if the source is a
   directory, a target object is always created; otherwise, if
   the source is already located on the target MDT, or the source
   still has a link on the source MDT, creation is skipped.
a) if the source is a directory, detach the source stripes and
   attach them to the target.
b) migrate the source xattrs to the target.
c) if the source is a regular file, update its PFID to the
   target fid.
d) update the fid to the target fid for all links of the source.
2. update namespace
a) migrate dirent from source parent to target parent.
b) update linkea parent fid to target parent.
c) destroy source object.

This implementation improves on the following points:
1. all involved objects are locked to avoid races.
2. directory migration doesn't migrate its dir entries; instead
   each entry is migrated as part of its own sub file migration.
   This avoids timeouts when migrating the dir entries of a large
   directory, and also avoids touching dir entries without a lock.
3. each file/dir is migrated in one transaction, so migration
   recovery works the same way as for other operations.
4. a directory under migration can still be accessed (and
   modified) like a normal directory.
5. if migration of sub files under a directory fails, the user
   can rerun the migration to finish migrating this directory.

Disable migrate in racer.sh.

Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I7906e50a0bf55375eafdf2cf5500979dd2d94d48
Reviewed-on: https://review.whamcloud.com/31427
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
35 files changed:
lustre/include/lu_object.h
lustre/include/lustre_lmv.h
lustre/include/md_object.h
lustre/include/obd_support.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/lfsck/lfsck_internal.h
lustre/lfsck/lfsck_striped_dir.c
lustre/llite/file.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_reint.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_reint.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_xattr.c
lustre/ptlrpc/wiretest.c
lustre/tests/racer.sh
lustre/tests/recovery-small.sh
lustre/tests/sanity-lfsck.sh
lustre/tests/sanity.sh
lustre/utils/liblustreapi.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 5b6a8bd..54e0410 100644 (file)
@@ -1265,6 +1265,26 @@ struct lu_name {
         int            ln_namelen;
 };
 
+static inline bool name_is_dot_or_dotdot(const char *name, int namelen)
+{
+       return name[0] == '.' &&
+              (namelen == 1 || (namelen == 2 && name[1] == '.'));
+}
+
+static inline bool lu_name_is_dot_or_dotdot(const struct lu_name *lname)
+{
+       return name_is_dot_or_dotdot(lname->ln_name, lname->ln_namelen);
+}
+
+static inline bool lu_name_is_valid_len(const char *name, size_t name_len)
+{
+       return name != NULL &&
+              name_len > 0 &&
+              name_len < INT_MAX &&
+              strlen(name) == name_len &&
+              memchr(name, '/', name_len) == NULL;
+}
+
 /**
  * Validate names (path components)
  *
@@ -1276,12 +1296,7 @@ struct lu_name {
  */
 static inline bool lu_name_is_valid_2(const char *name, size_t name_len)
 {
-       return name != NULL &&
-              name_len > 0 &&
-              name_len < INT_MAX &&
-              name[name_len] == '\0' &&
-              strlen(name) == name_len &&
-              memchr(name, '/', name_len) == NULL;
+       return lu_name_is_valid_len(name, name_len) && name[name_len] == '\0';
 }
 
 static inline bool lu_name_is_valid(const struct lu_name *ln)
index aaf3a26..e797e77 100644 (file)
@@ -46,6 +46,8 @@ struct lmv_stripe_md {
        __u32   lsm_md_master_mdt_index;
        __u32   lsm_md_hash_type;
        __u32   lsm_md_layout_version;
+       __u32   lsm_md_migrate_offset;
+       __u32   lsm_md_migrate_hash;
        __u32   lsm_md_default_count;
        __u32   lsm_md_default_index;
        char    lsm_md_pool_name[LOV_MAXPOOLNAME + 1];
@@ -64,6 +66,10 @@ lsm_md_eq(const struct lmv_stripe_md *lsm1, const struct lmv_stripe_md *lsm2)
            lsm1->lsm_md_hash_type != lsm2->lsm_md_hash_type ||
            lsm1->lsm_md_layout_version !=
                                lsm2->lsm_md_layout_version ||
+           lsm1->lsm_md_migrate_offset !=
+                               lsm2->lsm_md_migrate_offset ||
+           lsm1->lsm_md_migrate_hash !=
+                               lsm2->lsm_md_migrate_hash ||
            strcmp(lsm1->lsm_md_pool_name,
                      lsm2->lsm_md_pool_name) != 0)
                return false;
@@ -141,18 +147,14 @@ static inline int lmv_name_to_stripe_index(__u32 lmv_hash_type,
                                           unsigned int stripe_count,
                                           const char *name, int namelen)
 {
-       int     idx;
-       __u32   hash_type = lmv_hash_type & LMV_HASH_TYPE_MASK;
+       int idx;
 
        LASSERT(namelen > 0);
-       if (stripe_count <= 1)
-               return 0;
 
-       /* for migrating object, always start from 0 stripe */
-       if (lmv_hash_type & LMV_HASH_FLAG_MIGRATION)
+       if (stripe_count <= 1)
                return 0;
 
-       switch (hash_type) {
+       switch (lmv_hash_type & LMV_HASH_TYPE_MASK) {
        case LMV_HASH_TYPE_ALL_CHARS:
                idx = lmv_hash_all_chars(stripe_count, name, namelen);
                break;
@@ -164,8 +166,8 @@ static inline int lmv_name_to_stripe_index(__u32 lmv_hash_type,
                break;
        }
 
-       CDEBUG(D_INFO, "name %.*s hash_type %d idx %d\n", namelen, name,
-              hash_type, idx);
+       CDEBUG(D_INFO, "name %.*s hash_type %#x idx %d/%u\n", namelen, name,
+              lmv_hash_type, idx, stripe_count);
 
        return idx;
 }
index cdfb03b..9e54f79 100644 (file)
@@ -138,15 +138,15 @@ struct md_attr {
 
 /** Additional parameters for create */
 struct md_op_spec {
-        union {
-                /** symlink target */
-                const char               *sp_symname;
-                /** eadata for regular files */
-                struct md_spec_reg {
-                        const void *eadata;
-                        int  eadatalen;
-                } sp_ea;
-        } u;
+       union {
+               /** symlink target */
+               const char *sp_symname;
+               /** eadata for regular files */
+               struct md_spec_reg {
+                       void *eadata;
+                       int  eadatalen;
+               } sp_ea;
+       } u;
 
        /** Create flag from client: such as MDS_OPEN_CREAT, and others. */
        __u64      sp_cr_flags;
@@ -163,10 +163,10 @@ struct md_op_spec {
                     sp_permitted:1, /* do not check permission */
                     sp_migrate_close:1; /* close the file during migrate */
        /** Current lock mode for parent dir where create is performing. */
-        mdl_mode_t sp_cr_mode;
+       mdl_mode_t sp_cr_mode;
 
-        /** to create directory */
-        const struct dt_index_features *sp_feat;
+       /** to create directory */
+       const struct dt_index_features *sp_feat;
 };
 
 enum md_layout_opc {
@@ -317,7 +317,8 @@ struct md_dir_operations {
 
        int (*mdo_migrate)(const struct lu_env *env, struct md_object *pobj,
                           struct md_object *sobj, const struct lu_name *lname,
-                          struct md_object *tobj, struct md_attr *ma);
+                          struct md_object *tobj, struct md_op_spec *spec,
+                          struct md_attr *ma);
 };
 
 struct md_device_operations {
@@ -614,10 +615,12 @@ static inline int mdo_migrate(const struct lu_env *env,
                             struct md_object *sobj,
                             const struct lu_name *lname,
                             struct md_object *tobj,
+                            struct md_op_spec *spec,
                             struct md_attr *ma)
 {
        LASSERT(pobj->mo_dir_ops->mdo_migrate);
-       return pobj->mo_dir_ops->mdo_migrate(env, pobj, sobj, lname, tobj, ma);
+       return pobj->mo_dir_ops->mdo_migrate(env, pobj, sobj, lname, tobj, spec,
+                                            ma);
 }
 
 static inline int mdo_is_subdir(const struct lu_env *env,
index d5f4223..265a3b8 100644 (file)
@@ -617,10 +617,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_INVALIDATE_UPDATE     0x1705
 
 /* MIGRATE */
-#define OBD_FAIL_MIGRATE_NET_REP               0x1800
 #define OBD_FAIL_MIGRATE_ENTRIES               0x1801
-#define OBD_FAIL_MIGRATE_LINKEA                        0x1802
-#define OBD_FAIL_MIGRATE_DELAY                 0x1803
 
 /* LMV */
 #define OBD_FAIL_UNKNOWN_LMV_STRIPE            0x1901
index 8d7b58c..a600a39 100644 (file)
@@ -880,7 +880,8 @@ struct ptlrpc_body_v2 {
 
 #define MDT_CONNECT_SUPPORTED2 (OBD_CONNECT2_FILE_SECCTX | OBD_CONNECT2_FLR | \
                                 OBD_CONNECT2_SUM_STATFS | \
-                               OBD_CONNECT2_LOCK_CONVERT)
+                               OBD_CONNECT2_LOCK_CONVERT | \
+                               OBD_CONNECT2_DIR_MIGRATE)
 
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
@@ -2161,9 +2162,16 @@ struct lmv_mds_md_v1 {
                                         * used for now. Higher 16 bits will
                                         * be used to mark the object status,
                                         * for example migrating or dead. */
-       __u32 lmv_layout_version;       /* Used for directory restriping */
-       __u32 lmv_padding1;
-       __u64 lmv_padding2;
+       __u32 lmv_layout_version;       /* increased each time layout changed,
+                                        * by directory migration, restripe
+                                        * and LFSCK. */
+       __u32 lmv_migrate_offset;       /* once this is set, it means this
+                                        * directory is being migrated, stripes
+                                        * before this offset belong to target,
+                                        * from this to source. */
+       __u32 lmv_migrate_hash;         /* hash type of source stripes of
+                                        * migrating directory */
+       __u32 lmv_padding2;
        __u64 lmv_padding3;
        char lmv_pool_name[LOV_MAXPOOLNAME + 1];        /* pool name */
        struct lu_fid lmv_stripe_fids[0];       /* FIDs for each stripe */
index a92181f..c302b43 100644 (file)
@@ -1095,12 +1095,6 @@ extern const char *lfsck_flags_names[];
 extern const char *lfsck_param_names[];
 extern struct lu_context_key lfsck_thread_key;
 
-static inline bool name_is_dot_or_dotdot(const char *name, int namelen)
-{
-       return name[0] == '.' &&
-              (namelen == 1 || (namelen == 2 && name[1] == '.'));
-}
-
 static inline struct dt_device *lfsck_obj2dev(struct dt_object *obj)
 {
        return container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
@@ -1504,6 +1498,8 @@ static inline void lfsck_lmv_header_le_to_cpu(struct lmv_mds_md_v1 *dst,
        dst->lmv_master_mdt_index = le32_to_cpu(src->lmv_master_mdt_index);
        dst->lmv_hash_type = le32_to_cpu(src->lmv_hash_type);
        dst->lmv_layout_version = le32_to_cpu(src->lmv_layout_version);
+       dst->lmv_migrate_offset = le32_to_cpu(src->lmv_migrate_offset);
+       dst->lmv_migrate_hash = le32_to_cpu(src->lmv_migrate_hash);
 }
 
 static inline void lfsck_lmv_header_cpu_to_le(struct lmv_mds_md_v1 *dst,
@@ -1514,6 +1510,8 @@ static inline void lfsck_lmv_header_cpu_to_le(struct lmv_mds_md_v1 *dst,
        dst->lmv_master_mdt_index = cpu_to_le32(src->lmv_master_mdt_index);
        dst->lmv_hash_type = cpu_to_le32(src->lmv_hash_type);
        dst->lmv_layout_version = cpu_to_le32(src->lmv_layout_version);
+       dst->lmv_migrate_offset = cpu_to_le32(src->lmv_migrate_offset);
+       dst->lmv_migrate_hash = cpu_to_le32(src->lmv_migrate_hash);
 }
 
 static inline struct lfsck_assistant_object *
index ba47994..90ec731 100644 (file)
@@ -1419,6 +1419,7 @@ static int lfsck_namespace_set_lmv_master(const struct lu_env *env,
 
        lmv3->lmv_magic = LMV_MAGIC;
        lmv3->lmv_master_mdt_index = pidx;
+       lmv3->lmv_layout_version++;
 
        if (flags & LEF_SET_LMV_ALL) {
                rc = lfsck_allow_regenerate_master_lmv(env, com, obj,
index 46bc2ac..16fb348 100644 (file)
@@ -4173,6 +4173,17 @@ int ll_migrate(struct inode *parent, struct file *file, struct lmv_user_md *lum,
        if (!child_inode)
                RETURN(-ENOENT);
 
+       if (!(exp_connect_flags2(ll_i2sbi(parent)->ll_md_exp) &
+             OBD_CONNECT2_DIR_MIGRATE)) {
+               if (le32_to_cpu(lum->lum_stripe_count) > 1 ||
+                   ll_i2info(child_inode)->lli_lsm_md) {
+                       CERROR("%s: MDT doesn't support stripe directory "
+                              "migration!\n",
+                              ll_get_fsname(parent->i_sb, NULL, 0));
+                       GOTO(out_iput, rc = -EOPNOTSUPP);
+               }
+       }
+
        /*
         * lfs migrate command needs to be blocked on the client
         * by checking the migrate FID against the FID of the
index df8eca9..2988cc2 100644 (file)
@@ -1320,14 +1320,8 @@ static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
                 * where the initialization of slave inode is slightly
                 * different, so it reset lsm_md to NULL to avoid
                 * initializing lsm for slave inode. */
-               /* For migrating inode, master stripe and master object will
-                * be same, so we only need assign this inode */
-               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION && i == 0)
-                       lsm->lsm_md_oinfo[i].lmo_root = inode;
-               else
-                       lsm->lsm_md_oinfo[i].lmo_root =
+               lsm->lsm_md_oinfo[i].lmo_root =
                                ll_iget_anon_dir(inode->i_sb, fid, md);
-
                if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) {
                        int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root);
 
@@ -1339,20 +1333,6 @@ static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
        return 0;
 }
 
-static inline int lli_lsm_md_eq(const struct lmv_stripe_md *lsm_md1,
-                               const struct lmv_stripe_md *lsm_md2)
-{
-       return lsm_md1->lsm_md_magic == lsm_md2->lsm_md_magic &&
-              lsm_md1->lsm_md_stripe_count == lsm_md2->lsm_md_stripe_count &&
-              lsm_md1->lsm_md_master_mdt_index ==
-                                       lsm_md2->lsm_md_master_mdt_index &&
-              lsm_md1->lsm_md_hash_type == lsm_md2->lsm_md_hash_type &&
-              lsm_md1->lsm_md_layout_version ==
-                                       lsm_md2->lsm_md_layout_version &&
-              strcmp(lsm_md1->lsm_md_pool_name,
-                     lsm_md2->lsm_md_pool_name) == 0;
-}
-
 static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
 {
        struct ll_inode_info *lli = ll_i2info(inode);
@@ -1364,28 +1344,61 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
        CDEBUG(D_INODE, "update lsm %p of "DFID"\n", lli->lli_lsm_md,
               PFID(ll_inode2fid(inode)));
 
-       /* no striped information from request. */
-       if (lsm == NULL) {
-               if (lli->lli_lsm_md == NULL) {
-                       RETURN(0);
-               } else if (lli->lli_lsm_md->lsm_md_hash_type &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                       /* migration is done, the temporay MIGRATE layout has
-                        * been removed */
-                       CDEBUG(D_INODE, DFID" finish migration.\n",
-                              PFID(ll_inode2fid(inode)));
-                       lmv_free_memmd(lli->lli_lsm_md);
-                       lli->lli_lsm_md = NULL;
-                       RETURN(0);
-               } else {
-                       /* The lustre_md from req does not include stripeEA,
-                        * see ll_md_setattr */
-                       RETURN(0);
-               }
+       /*
+        * no striped information from request, lustre_md from req does not
+        * include stripeEA, see ll_md_setattr()
+        */
+       if (!lsm)
+               RETURN(0);
+
+       /* Compare the old and new stripe information */
+       if (lli->lli_lsm_md && !lsm_md_eq(lli->lli_lsm_md, lsm)) {
+               struct lmv_stripe_md *old_lsm = lli->lli_lsm_md;
+               int idx;
+               bool layout_changed = lsm->lsm_md_layout_version >
+                                     old_lsm->lsm_md_layout_version;
+
+               int mask = layout_changed ? D_INODE : D_ERROR;
+
+               CDEBUG(mask,
+                       "%s: inode@%p "DFID" lmv layout %s magic %#x/%#x "
+                       "stripe count %d/%d master_mdt %d/%d "
+                       "hash_type %#x/%#x version %d/%d migrate offset %d/%d "
+                       "migrate hash %#x/%#x pool %s/%s\n",
+                      ll_get_fsname(inode->i_sb, NULL, 0), inode,
+                      PFID(&lli->lli_fid),
+                      layout_changed ? "changed" : "mismatch",
+                      lsm->lsm_md_magic, old_lsm->lsm_md_magic,
+                      lsm->lsm_md_stripe_count,
+                      old_lsm->lsm_md_stripe_count,
+                      lsm->lsm_md_master_mdt_index,
+                      old_lsm->lsm_md_master_mdt_index,
+                      lsm->lsm_md_hash_type, old_lsm->lsm_md_hash_type,
+                      lsm->lsm_md_layout_version,
+                      old_lsm->lsm_md_layout_version,
+                      lsm->lsm_md_migrate_offset,
+                      old_lsm->lsm_md_migrate_offset,
+                      lsm->lsm_md_migrate_hash,
+                      old_lsm->lsm_md_migrate_hash,
+                      lsm->lsm_md_pool_name,
+                      old_lsm->lsm_md_pool_name);
+
+               for (idx = 0; idx < old_lsm->lsm_md_stripe_count; idx++)
+                       CDEBUG(mask, "old stripe[%d] "DFID"\n",
+                              idx, PFID(&old_lsm->lsm_md_oinfo[idx].lmo_fid));
+
+               for (idx = 0; idx < lsm->lsm_md_stripe_count; idx++)
+                       CDEBUG(mask, "new stripe[%d] "DFID"\n",
+                              idx, PFID(&lsm->lsm_md_oinfo[idx].lmo_fid));
+
+               if (!layout_changed)
+                       RETURN(-EINVAL);
+
+               ll_dir_clear_lsm_md(inode);
        }
 
        /* set the directory layout */
-       if (lli->lli_lsm_md == NULL) {
+       if (!lli->lli_lsm_md) {
                struct cl_attr  *attr;
 
                rc = ll_init_lsm_md(inode, md);
index c2dcfed..899007a 100644 (file)
@@ -122,14 +122,21 @@ static inline int lmv_stripe_md_size(int stripe_count)
        return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
 }
 
+/* for file under migrating directory, return the target stripe info */
 static inline const struct lmv_oinfo *
 lsm_name_to_stripe_info(const struct lmv_stripe_md *lsm, const char *name,
                        int namelen)
 {
+       __u32 hash_type = lsm->lsm_md_hash_type;
+       __u32 stripe_count = lsm->lsm_md_stripe_count;
        int stripe_index;
 
-       stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
-                                               lsm->lsm_md_stripe_count,
+       if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+               hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+               stripe_count = lsm->lsm_md_migrate_offset;
+       }
+
+       stripe_index = lmv_name_to_stripe_index(hash_type, stripe_count,
                                                name, namelen);
        if (stripe_index < 0)
                return ERR_PTR(stripe_index);
index 5ab1cb4..29e1e76 100644 (file)
@@ -1864,156 +1864,286 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
        RETURN(rc);
 }
 
-static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
-                     const char *old, size_t oldlen,
-                     const char *new, size_t newlen,
-                     struct ptlrpc_request **request)
+static int lmv_migrate(struct obd_export *exp, struct md_op_data *op_data,
+                       const char *name, size_t namelen,
+                       struct ptlrpc_request **request)
 {
-       struct obd_device       *obd = exp->exp_obd;
-       struct lmv_obd          *lmv = &obd->u.lmv;
-       struct lmv_tgt_desc     *src_tgt;
-       struct lmv_tgt_desc     *tgt_tgt;
-       struct obd_export       *target_exp;
-       struct mdt_body         *body;
-       int                     rc;
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct lmv_stripe_md *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc *parent_tgt;
+       struct lmv_tgt_desc *sp_tgt;
+       struct lmv_tgt_desc *tp_tgt = NULL;
+       struct lmv_tgt_desc *child_tgt;
+       struct lmv_tgt_desc *tgt;
+       struct lu_fid target_fid;
+       int rc;
+
        ENTRY;
 
-       LASSERT(oldlen != 0);
+       LASSERT(op_data->op_cli_flags & CLI_MIGRATE);
+       LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
+                PFID(&op_data->op_fid3));
 
-       CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
-              (int)oldlen, old, PFID(&op_data->op_fid1),
-              op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
-              (int)newlen, new, PFID(&op_data->op_fid2),
-              op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
+       CDEBUG(D_INODE, "MIGRATE "DFID"/%.*s\n",
+              PFID(&op_data->op_fid1), (int)namelen, name);
 
        op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
        op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
        op_data->op_cap = cfs_curproc_cap_pack();
-       if (op_data->op_cli_flags & CLI_MIGRATE) {
-               LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
-                        PFID(&op_data->op_fid3));
-
-               if (op_data->op_mea1 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
-                       struct lmv_tgt_desc     *tmp;
-
-                       /* Fix the parent fid for striped dir */
-                       tmp = lmv_locate_target_for_name(lmv, lsm, old,
-                                                        oldlen,
-                                                        &op_data->op_fid1,
-                                                        NULL);
-                       if (IS_ERR(tmp))
-                               RETURN(PTR_ERR(tmp));
-               }
-
-               rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
-               if (rc != 0)
-                       RETURN(rc);
 
-               src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
+       parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(parent_tgt))
+               RETURN(PTR_ERR(parent_tgt));
 
-               target_exp = src_tgt->ltd_exp;
-       } else {
-               if (op_data->op_mea1 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       if (lsm) {
+               __u32 hash_type = lsm->lsm_md_hash_type;
+               __u32 stripe_count = lsm->lsm_md_stripe_count;
 
-                       src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
-                                                            oldlen,
-                                                            &op_data->op_fid1,
-                                                            &op_data->op_mds);
-               } else {
-                       src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+               /*
+                * old stripes are appended after new stripes for migrating
+                * directory.
+                */
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = lsm->lsm_md_migrate_hash;
+                       stripe_count -= lsm->lsm_md_migrate_offset;
                }
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
 
+               rc = lmv_name_to_stripe_index(hash_type, stripe_count, name,
+                                             namelen);
+               if (rc < 0)
+                       RETURN(rc);
 
-               if (op_data->op_mea2 != NULL) {
-                       struct lmv_stripe_md    *lsm = op_data->op_mea2;
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION)
+                       rc += lsm->lsm_md_migrate_offset;
 
-                       tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
-                                                            newlen,
-                                                            &op_data->op_fid2,
-                                                            &op_data->op_mds);
-               } else {
-                       tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+               /* save it in fid4 temporarily for early cancel */
+               op_data->op_fid4 = lsm->lsm_md_oinfo[rc].lmo_fid;
+               sp_tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[rc].lmo_mds,
+                                       NULL);
+               if (IS_ERR(sp_tgt))
+                       RETURN(PTR_ERR(sp_tgt));
 
+               /*
+                * if parent is being migrated too, fill op_fid2 with target
+                * stripe fid, otherwise the target stripe is not created yet.
+                */
+               if (lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = lsm->lsm_md_hash_type &
+                                   ~LMV_HASH_FLAG_MIGRATION;
+                       stripe_count = lsm->lsm_md_migrate_offset;
+
+                       rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+                                                     name, namelen);
+                       if (rc < 0)
+                               RETURN(rc);
+
+                       op_data->op_fid2 = lsm->lsm_md_oinfo[rc].lmo_fid;
+                       tp_tgt = lmv_get_target(lmv,
+                                               lsm->lsm_md_oinfo[rc].lmo_mds,
+                                               NULL);
+                       if (IS_ERR(tp_tgt))
+                               RETURN(PTR_ERR(tp_tgt));
                }
-               if (IS_ERR(tgt_tgt))
-                       RETURN(PTR_ERR(tgt_tgt));
-
-               target_exp = tgt_tgt->ltd_exp;
+       } else {
+               sp_tgt = parent_tgt;
        }
 
-       /*
-        * LOOKUP lock on src child (fid3) should also be cancelled for
-        * src_tgt in mdc_rename.
-        */
-       op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+       child_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+       if (IS_ERR(child_tgt))
+               RETURN(PTR_ERR(child_tgt));
 
-       /*
-        * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
-        * own target.
-        */
-       rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                             LCK_EX, MDS_INODELOCK_UPDATE,
-                             MF_MDC_CANCEL_FID2);
-
-       if (rc != 0)
+       rc = lmv_fid_alloc(NULL, exp, &target_fid, op_data);
+       if (rc)
                RETURN(rc);
+
        /*
-        * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
+        * for directory, send migrate request to the MDT where the object will
+        * be migrated to, because we can't create a striped directory remotely.
+        *
+        * otherwise, send to the MDT where source is located because regular
+        * file may open lease.
+        *
+        * NB. if MDT doesn't support DIR_MIGRATE, send to source MDT too for
+        * backward compatibility.
         */
-       if (fid_is_sane(&op_data->op_fid3)) {
-               struct lmv_tgt_desc *tgt;
-
-               tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (S_ISDIR(op_data->op_mode) &&
+           (exp_connect_flags2(exp) & OBD_CONNECT2_DIR_MIGRATE)) {
+               tgt = lmv_find_target(lmv, &target_fid);
                if (IS_ERR(tgt))
                        RETURN(PTR_ERR(tgt));
+       } else {
+               tgt = child_tgt;
+       }
 
-               /* Cancel LOOKUP lock on its parent */
-               rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_LOOKUP,
-                                     MF_MDC_CANCEL_FID3);
-               if (rc != 0)
+       /* cancel UPDATE lock of parent master object */
+       rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+       if (rc)
+               RETURN(rc);
+
+       /* cancel UPDATE lock of source parent */
+       if (sp_tgt != parent_tgt) {
+               /*
+                * migrate RPC packs master object FID, because we can only pack
+                * two FIDs in reint RPC, but MDS needs to know both source
+                * parent and target parent, and it will obtain them from master
+                * FID and LMV, the other FID in RPC is kept for target.
+                *
+                * since this FID is not passed to MDC, cancel it anyway.
+                */
+               rc = lmv_early_cancel(exp, sp_tgt, op_data, -1, LCK_EX,
+                                     MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID4);
+               if (rc)
                        RETURN(rc);
 
-               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_ELC,
+               op_data->op_flags &= ~MF_MDC_CANCEL_FID4;
+       }
+       op_data->op_fid4 = target_fid;
+
+       /* cancel UPDATE locks of target parent */
+       rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+       if (rc)
+               RETURN(rc);
+
+       /* cancel LOOKUP lock of source if source is remote object */
+       if (child_tgt != sp_tgt) {
+               rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_LOOKUP,
                                      MF_MDC_CANCEL_FID3);
-               if (rc != 0)
+               if (rc)
                        RETURN(rc);
        }
 
-retry_rename:
-       /*
-        * Cancel all the locks on tgt child (fid4).
-        */
-       if (fid_is_sane(&op_data->op_fid4)) {
-               struct lmv_tgt_desc *tgt;
+       /* cancel ELC locks of source */
+       rc = lmv_early_cancel(exp, child_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_ELC, MF_MDC_CANCEL_FID3);
+       if (rc)
+               RETURN(rc);
 
-               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
-                                     LCK_EX, MDS_INODELOCK_ELC,
-                                     MF_MDC_CANCEL_FID4);
-               if (rc != 0)
-                       RETURN(rc);
+       rc = md_rename(tgt->ltd_exp, op_data, name, namelen, NULL, 0, request);
+
+       RETURN(rc);
+}
+
+static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
+                     const char *old, size_t oldlen,
+                     const char *new, size_t newlen,
+                     struct ptlrpc_request **request)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct lmv_obd *lmv = &obd->u.lmv;
+       struct lmv_stripe_md *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc *sp_tgt;
+       struct lmv_tgt_desc *tp_tgt = NULL;
+       struct lmv_tgt_desc *tgt;
+       struct mdt_body *body;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(oldlen != 0);
+
+       if (op_data->op_cli_flags & CLI_MIGRATE) {
+               rc = lmv_migrate(exp, op_data, old, oldlen, request);
+               RETURN(rc);
+       }
+
+       op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
+       op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
+       op_data->op_cap = cfs_curproc_cap_pack();
+
+       CDEBUG(D_INODE, "RENAME "DFID"/%.*s to "DFID"/%.*s\n",
+               PFID(&op_data->op_fid1), (int)oldlen, old,
+               PFID(&op_data->op_fid2), (int)newlen, new);
 
+       if (lsm)
+               sp_tgt = lmv_locate_target_for_name(lmv, lsm, old, oldlen,
+                                                   &op_data->op_fid1,
+                                                   &op_data->op_mds);
+       else
+               sp_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(sp_tgt))
+               RETURN(PTR_ERR(sp_tgt));
+
+       lsm = op_data->op_mea2;
+       if (lsm)
+               tp_tgt = lmv_locate_target_for_name(lmv, lsm, new, newlen,
+                                                   &op_data->op_fid2,
+                                                   &op_data->op_mds);
+       else
+               tp_tgt = lmv_find_target(lmv, &op_data->op_fid2);
+       if (IS_ERR(tp_tgt))
+               RETURN(PTR_ERR(tp_tgt));
+
+       /* The target child might be destroyed and become an orphan, and
+        * orphans can currently only be checked on the local MDT, so send
+        * the rename request to the MDT where the target child is located.
+        * If the target child does not exist, send the request to the MDT
+        * of the target parent instead. */
+       if (fid_is_sane(&op_data->op_fid4)) {
                tgt = lmv_find_target(lmv, &op_data->op_fid4);
                if (IS_ERR(tgt))
                        RETURN(PTR_ERR(tgt));
+       } else {
+               tgt = tp_tgt;
+       }
+
+       op_data->op_flags |= MF_MDC_CANCEL_FID4;
+
+       /* cancel UPDATE locks of source parent */
+       rc = lmv_early_cancel(exp, sp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* cancel UPDATE locks of target parent */
+       rc = lmv_early_cancel(exp, tp_tgt, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (fid_is_sane(&op_data->op_fid3)) {
+               struct lmv_tgt_desc *src_tgt;
+
+               src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
+               if (IS_ERR(src_tgt))
+                       RETURN(PTR_ERR(src_tgt));
+
+               /* cancel LOOKUP lock of source on source parent */
+               if (src_tgt != sp_tgt) {
+                       rc = lmv_early_cancel(exp, sp_tgt, op_data,
+                                             tgt->ltd_idx, LCK_EX,
+                                             MDS_INODELOCK_LOOKUP,
+                                             MF_MDC_CANCEL_FID3);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+
+               /* cancel ELC locks of source */
+               rc = lmv_early_cancel(exp, src_tgt, op_data, tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_ELC,
+                                     MF_MDC_CANCEL_FID3);
+               if (rc != 0)
+                       RETURN(rc);
+       }
 
-               /* Since the target child might be destroyed, and it might
-                * become orphan, and we can only check orphan on the local
-                * MDT right now, so we send rename request to the MDT where
-                * target child is located. If target child does not exist,
-                * then it will send the request to the target parent */
-               target_exp = tgt->ltd_exp;
+retry_rename:
+       if (fid_is_sane(&op_data->op_fid4)) {
+               /* cancel LOOKUP lock of target on target parent */
+               if (tgt != tp_tgt) {
+                       rc = lmv_early_cancel(exp, tp_tgt, op_data,
+                                             tgt->ltd_idx, LCK_EX,
+                                             MDS_INODELOCK_LOOKUP,
+                                             MF_MDC_CANCEL_FID4);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
        }
 
-       rc = md_rename(target_exp, op_data, old, oldlen, new, newlen,
-                      request);
+       rc = md_rename(tgt->ltd_exp, op_data, old, oldlen, new, newlen,
+                       request);
 
        if (rc != 0 && rc != -EXDEV)
                RETURN(rc);
@@ -2032,6 +2162,11 @@ retry_rename:
        op_data->op_fid4 = body->mbo_fid1;
        ptlrpc_req_finished(*request);
        *request = NULL;
+
+       tgt = lmv_find_target(lmv, &op_data->op_fid4);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
        goto retry_rename;
 }
 
@@ -2781,13 +2916,15 @@ static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
        else
                lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
        lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
+       lsm->lsm_md_migrate_offset = le32_to_cpu(lmm1->lmv_migrate_offset);
+       lsm->lsm_md_migrate_hash = le32_to_cpu(lmm1->lmv_migrate_hash);
        cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
                        sizeof(lsm->lsm_md_pool_name));
 
        if (cplen >= sizeof(lsm->lsm_md_pool_name))
                RETURN(-E2BIG);
 
-       CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d"
+       CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %#x "
               "layout_version %d\n", lsm->lsm_md_stripe_count,
               lsm->lsm_md_master_mdt_index, lsm->lsm_md_hash_type,
               lsm->lsm_md_layout_version);
@@ -2822,14 +2959,9 @@ static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
        /* Free memmd */
        if (lsm != NULL && lmm == NULL) {
                int i;
-               for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
-                       /* For migrating inode, the master stripe and master
-                        * object will be the same, so do not need iput, see
-                        * ll_update_lsm_md */
-                       if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
-                             i == 0) && lsm->lsm_md_oinfo[i].lmo_root != NULL)
-                               iput(lsm->lsm_md_oinfo[i].lmo_root);
-               }
+
+               for (i = 0; i < lsm->lsm_md_stripe_count; i++)
+                       iput(lsm->lsm_md_oinfo[i].lmo_root);
                lsm_size = lmv_stripe_md_size(lsm->lsm_md_stripe_count);
                OBD_FREE(lsm, lsm_size);
                *lsmp = NULL;
index b478332..ed7dcf1 100644 (file)
@@ -317,6 +317,8 @@ struct lod_object {
                        __u16           ldo_dir_stripes_allocated;
                        __u32           ldo_dir_stripe_offset;
                        __u32           ldo_dir_hash_type;
+                       __u32           ldo_dir_migrate_offset;
+                       __u32           ldo_dir_migrate_hash;
                        /* Is a slave stripe of striped directory? */
                        __u32           ldo_dir_slave_stripe:1,
                                        ldo_dir_striped:1,
index 908f675..cd38047 100644 (file)
@@ -776,14 +776,9 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo,
        int                      rc;
        ENTRY;
 
-       /* If it is not a striped directory, then load nothing. */
        if (magic != LMV_MAGIC_V1)
                RETURN(0);
 
-       /* If it is in migration (or failure), then load nothing. */
-       if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
-               RETURN(0);
-
        stripes = le32_to_cpu(lmv1->lmv_stripe_count);
        if (stripes < 1)
                RETURN(0);
@@ -1615,9 +1610,15 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
        }
 
        lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+       memset(lmm1, 0, sizeof(*lmm1));
        lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC);
        lmm1->lmv_stripe_count = cpu_to_le32(stripe_count);
        lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+       if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) {
+               lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash);
+               lmm1->lmv_migrate_offset =
+                       cpu_to_le32(lo->ldo_dir_migrate_offset);
+       }
        rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
                            &mdtidx, &type);
        if (rc != 0)
@@ -2105,6 +2106,284 @@ out:
 }
 
 /**
+ * Append source stripes after target stripes for migrating directory. NB, we
+ * only need to declare this, the append is done inside lod_xattr_set_lmv().
+ *
+ * The LMV in \a buf must carry magic LMV_MAGIC_V1 and a non-zero stripe
+ * count. A new stripe array sized for the existing stripes plus the source
+ * stripes is allocated and the existing stripe pointers are copied to its
+ * head. Each source stripe FID is converted from little-endian, checked for
+ * sanity, looked up with lod_fld_lookup(), located with dt_locate_at() on
+ * the device selected by the resulting index (lod_child when the index is
+ * this node's id, otherwise the matching MDT target), and stored behind the
+ * existing stripes. For every source stripe the following is declared: a
+ * reference add and inserts of the `dot' and `dotdot' keys on the stripe,
+ * setting its XATTR_NAME_LMV and XATTR_NAME_LINK xattrs, and, on the child
+ * object of \a dt, an insert of the stripe name "<FID>:<index>" plus a
+ * reference add.
+ *
+ * On success the old stripe array is freed and replaced by the new one, the
+ * migrate offset is set to the previous stripe count, the migrate hash is
+ * taken from the hash type in \a buf, the stripe counters are advanced and
+ * LMV_HASH_FLAG_MIGRATION is set in the hash type. On failure the source
+ * stripe objects located so far are released, the new array is freed and
+ * the fields of the lod_object are left as they were.
+ *
+ * NOTE(review): the stripe count is read from \a buf and used to index
+ * lmv_stripe_fids[] without a check against buf->lb_len in this function;
+ * confirm the caller guarantees the buffer is large enough.
+ * NOTE(review): the new array holds ldo_dir_stripe_count + stripe_count
+ * entries, while ldo_dir_stripes_allocated is advanced by stripe_count from
+ * its own previous value; the two agree only if both fields were equal
+ * beforehand -- confirm.
+ * NOTE(review): the error path stops at the first NULL slot, so it
+ * presumably relies on OBD_ALLOC() returning zeroed memory -- confirm.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       target object
+ * \param[in] buf      LMV buf which contains source stripe fids
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             -EINVAL if the LMV magic is wrong or stripe count is 0
+ * \retval             -ENOMEM if the new stripe array cannot be allocated
+ * \retval             -ESTALE if a stripe FID is not sane or no target
+ *                     exists for its index
+ * \retval             -ENOTDIR if dt_try_as_dir() fails on a source stripe
+ * \retval             negative if failed
+ */
+static int lod_dir_declare_layout_add(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     const struct lu_buf *buf,
+                                     struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct dt_object_format *dof = &info->lti_format;
+       struct lmv_mds_md_v1 *lmv = buf->lb_buf;
+       struct dt_object **stripe;
+       __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+       struct lu_fid *fid = &info->lti_fid;
+       struct lod_tgt_desc *tgt;
+       struct dt_object *dto;
+       struct dt_device *tgt_dt;
+       int type = LU_SEQ_RANGE_ANY;
+       struct dt_insert_rec *rec = &info->lti_dt_rec;
+       char *stripe_name = info->lti_key;
+       struct lu_name *sname;
+       struct linkea_data ldata = { NULL };
+       struct lu_buf linkea_buf;
+       __u32 idx;
+       int i;
+       int rc;
+
+       ENTRY;
+
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+               RETURN(-EINVAL);
+
+       if (stripe_count == 0)
+               RETURN(-EINVAL);
+
+       dof->dof_type = DFT_DIR;
+
+       OBD_ALLOC(stripe,
+                 sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count));
+       if (stripe == NULL)
+               RETURN(-ENOMEM);
+
+       for (i = 0; i < lo->ldo_dir_stripe_count; i++)
+               stripe[i] = lo->ldo_stripe[i]; /* target stripes first */
+
+       for (i = 0; i < stripe_count; i++) {
+               fid_le_to_cpu(fid,
+                       &lmv->lmv_stripe_fids[i]);
+               if (!fid_is_sane(fid))
+                       GOTO(out, rc = -ESTALE);
+
+               rc = lod_fld_lookup(env, lod, fid, &idx, &type);
+               if (rc)
+                       GOTO(out, rc);
+
+               if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) {
+                       tgt_dt = lod->lod_child;
+               } else {
+                       tgt = LTD_TGT(ltd, idx);
+                       if (tgt == NULL)
+                               GOTO(out, rc = -ESTALE);
+                       tgt_dt = tgt->ltd_tgt;
+               }
+
+               dto = dt_locate_at(env, tgt_dt, fid,
+                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+                                 NULL);
+               if (IS_ERR(dto))
+                       GOTO(out, rc = PTR_ERR(dto));
+
+               stripe[i + lo->ldo_dir_stripe_count] = dto; /* append source */
+
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(out, rc = -ENOTDIR);
+
+               rc = lod_sub_declare_ref_add(env, dto, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_insert(env, dto,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)dot, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_insert(env, dto,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)dotdot, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_xattr_set(env, dto, buf,
+                                               XATTR_NAME_LMV, 0, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%u",
+                        PFID(lu_object_fid(&dto->do_lu)),
+                        i + lo->ldo_dir_stripe_count);
+
+               sname = lod_name_get(env, stripe_name, strlen(stripe_name));
+               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                     sname, lu_object_fid(&dt->do_lu));
+               if (rc)
+                       GOTO(out, rc);
+
+               linkea_buf.lb_buf = ldata.ld_buf->lb_buf;
+               linkea_buf.lb_len = ldata.ld_leh->leh_len;
+               rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf,
+                                              XATTR_NAME_LINK, 0, th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_insert(env, next,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)stripe_name,
+                                           th);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_sub_declare_ref_add(env, next, th);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       if (lo->ldo_stripe)
+               OBD_FREE(lo->ldo_stripe,
+                        sizeof(*stripe) * lo->ldo_dir_stripes_allocated);
+       lo->ldo_stripe = stripe;
+       lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; /* old count */
+       lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type);
+       lo->ldo_dir_stripe_count += stripe_count;
+       lo->ldo_dir_stripes_allocated += stripe_count;
+       lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION;
+
+       RETURN(0);
+out:
+       i = lo->ldo_dir_stripe_count; /* first appended slot */
+       while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i])
+               dt_object_put(env, stripe[i++]);
+
+       OBD_FREE(stripe,
+                sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count));
+       RETURN(rc);
+}
+/**
+ * Declare the deletion of trailing stripes from a striped directory.
+ *
+ * The lum_stripe_count in \a buf is the final stripe count. For every stripe
+ * at or after that index this declares deleting the `dot' and `dotdot' keys
+ * and dropping a reference on the stripe, and deleting the stripe name
+ * "<FID>:<index>" and dropping a reference on the child object of \a dt.
+ * Declaration stops at the first failure.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object whose trailing stripes are to be deleted
+ * \param[in] buf      buffer holding a struct lmv_user_md
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 on success
+ * \retval             -EINVAL if buf->lb_buf is NULL or the final stripe
+ *                     count is not smaller than the current stripe count
+ * \retval             -ENOTDIR if dt_try_as_dir() fails on a stripe
+ * \retval             negative if failed
+ */
+static int lod_dir_declare_layout_delete(const struct lu_env *env,
+                                        struct dt_object *dt,
+                                        const struct lu_buf *buf,
+                                        struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct lmv_user_md *lmu = buf->lb_buf;
+       __u32 final_stripe_count;
+       char *stripe_name = info->lti_key;
+       struct dt_object *dto;
+       int i;
+       int rc = 0;
+
+       if (!lmu)
+               return -EINVAL;
+
+       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+       if (final_stripe_count >= lo->ldo_dir_stripe_count)
+               return -EINVAL;
+
+       for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               LASSERT(dto);
+
+               if (!dt_try_as_dir(env, dto))
+                       return -ENOTDIR;
+
+               rc = lod_sub_declare_delete(env, dto,
+                                           (const struct dt_key *)dot, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_ref_del(env, dto, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_delete(env, dto,
+                                       (const struct dt_key *)dotdot, th);
+               if (rc)
+                       return rc;
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_declare_delete(env, next,
+                                       (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       return rc;
+
+               rc = lod_sub_declare_ref_del(env, next, th);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+/*
+ * delete stripes from dir master object, the lum_stripe_count in argument is
+ * the final stripe count, the stripes after that will be deleted, NB, they
+ * are not destroyed, but deleted from its parent namespace, this function
+ * will be called in two places:
+ * 1. mdd_migrate_create() delete stripes from source, and append them to
+ *    target.
+ * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them.
+ *
+ * For each stripe at or after the final stripe count, the `dotdot' key is
+ * deleted from the stripe, the stripe name "<FID>:<index>" is deleted from
+ * the child object of dt, and one reference on that child object is
+ * dropped. The loop stops at the first error, and lod_striping_free() is
+ * called on the object whether or not an error occurred.
+ *
+ * Returns 0 on success, -EINVAL if buf->lb_buf is NULL or the final stripe
+ * count is not smaller than the current stripe count, otherwise the first
+ * error returned by a sub-operation.
+ *
+ * NOTE(review): lod_dir_declare_layout_delete() additionally declares a
+ * delete of the `dot' key and a reference drop on each stripe; neither is
+ * executed here -- confirm this is intended.
+ */
+static int lod_dir_layout_delete(const struct lu_env *env,
+                                struct dt_object *dt,
+                                const struct lu_buf *buf,
+                                struct thandle *th)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       struct lmv_user_md *lmu = buf->lb_buf;
+       __u32 final_stripe_count;
+       char *stripe_name = info->lti_key;
+       struct dt_object *dto;
+       int i;
+       int rc = 0;
+
+       ENTRY;
+
+       if (!lmu)
+               RETURN(-EINVAL);
+
+       final_stripe_count = le32_to_cpu(lmu->lum_stripe_count);
+       if (final_stripe_count >= lo->ldo_dir_stripe_count)
+               RETURN(-EINVAL);
+
+       for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) {
+               dto = lo->ldo_stripe[i];
+               LASSERT(dto);
+
+               rc = lod_sub_delete(env, dto,
+                                   (const struct dt_key *)dotdot, th);
+               if (rc)
+                       break;
+
+               snprintf(stripe_name, sizeof(info->lti_key), DFID":%d",
+                        PFID(lu_object_fid(&dto->do_lu)), i);
+
+               rc = lod_sub_delete(env, next,
+                                   (const struct dt_key *)stripe_name, th);
+               if (rc)
+                       break;
+
+               rc = lod_sub_ref_del(env, next, th);
+               if (rc)
+                       break;
+       }
+
+       lod_striping_free(env, lod_dt_obj(dt)); /* even when rc != 0 */
+
+       RETURN(rc);
+}
+
+/**
  * Implementation of dt_object_operations::do_declare_xattr_set.
  *
  * Used with regular (non-striped) objects. Basically it
@@ -2195,7 +2474,13 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env,
        }
 
        filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
-       if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
+
+       /*
+        * mdd_declare_migrate_create() declares this via source object because
+        * target is not ready yet, so declare anyway.
+        */
+       if (!data->locd_declare &&
+           lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) &&
            ff->ff_layout.ol_comp_id == comp->llc_id)
                return 0;
 
@@ -3043,6 +3328,17 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                        RETURN(-ENOENT);
 
                rc = lod_declare_modify_layout(env, dt, name, buf, th);
+       } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+                  strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+               const char *op = name + strlen(XATTR_NAME_LMV) + 1;
+
+               rc = -ENOTSUPP;
+               if (strcmp(op, "add") == 0)
+                       rc = lod_dir_declare_layout_add(env, dt, buf, th);
+               else if (strcmp(op, "del") == 0)
+                       rc = lod_dir_declare_layout_delete(env, dt, buf, th);
+
+               RETURN(rc);
        } else if (S_ISDIR(mode)) {
                rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th);
        } else if (strcmp(name, XATTR_NAME_FID) == 0) {
@@ -3338,31 +3634,34 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
 
        rec->rec_type = S_IFDIR;
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-               struct dt_object *dto;
-               char             *stripe_name = info->lti_key;
-               struct lu_name          *sname;
-               struct linkea_data       ldata          = { NULL };
-               struct lu_buf            linkea_buf;
-
-               dto = lo->ldo_stripe[i];
+               struct dt_object *dto = lo->ldo_stripe[i];
+               char *stripe_name = info->lti_key;
+               struct lu_name *sname;
+               struct linkea_data ldata = { NULL };
+               struct lu_buf linkea_buf;
+
+               /* if it's source stripe of migrating directory, don't create */
+               if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) &&
+                     i >= lo->ldo_dir_migrate_offset)) {
+                       dt_write_lock(env, dto, MOR_TGT_CHILD);
+                       rc = lod_sub_create(env, dto, attr, NULL, dof, th);
+                       if (rc != 0) {
+                               dt_write_unlock(env, dto);
+                               GOTO(out, rc);
+                       }
 
-               dt_write_lock(env, dto, MOR_TGT_CHILD);
-               rc = lod_sub_create(env, dto, attr, NULL, dof, th);
-               if (rc != 0) {
+                       rc = lod_sub_ref_add(env, dto, th);
                        dt_write_unlock(env, dto);
-                       GOTO(out, rc);
-               }
-
-               rc = lod_sub_ref_add(env, dto, th);
-               dt_write_unlock(env, dto);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       if (rc != 0)
+                               GOTO(out, rc);
 
-               rec->rec_fid = lu_object_fid(&dto->do_lu);
-               rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec,
-                                   (const struct dt_key *)dot, th, 0);
-               if (rc != 0)
-                       GOTO(out, rc);
+                       rec->rec_fid = lu_object_fid(&dto->do_lu);
+                       rc = lod_sub_insert(env, dto,
+                                           (const struct dt_rec *)rec,
+                                           (const struct dt_key *)dot, th, 0);
+                       if (rc != 0)
+                               GOTO(out, rc);
+               }
 
                rec->rec_fid = lu_object_fid(&dt->do_lu);
                rc = lod_sub_insert(env, dto, (struct dt_rec *)rec,
@@ -3787,18 +4086,23 @@ static int lod_xattr_set(const struct lu_env *env,
 
        if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
            strcmp(name, XATTR_NAME_LMV) == 0) {
-               struct lmv_mds_md_v1 *lmm = buf->lb_buf;
+               rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+               RETURN(rc);
+       } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+                  strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 &&
+                  strlen(name) > strlen(XATTR_NAME_LMV) + 1) {
+               const char *op = name + strlen(XATTR_NAME_LMV) + 1;
 
-               if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION)
-                       rc = lod_sub_xattr_set(env, next, buf, name, fl, th);
-               else
-                       rc = lod_dir_striping_create(env, dt, NULL, NULL, th);
+               rc = -ENOTSUPP;
+               if (strcmp(op, "del") == 0)
+                       rc = lod_dir_layout_delete(env, dt, buf, th);
+               /*
+                * XATTR_NAME_LMV".add" is never called, but only declared,
+                * because lod_xattr_set_lmv() will do the addition.
+                */
 
                RETURN(rc);
-       }
-
-       if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+       } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
            strcmp(name, XATTR_NAME_LOV) == 0) {
                struct lod_thread_info *info = lod_env_info(env);
                struct lod_default_striping *lds = &info->lti_def_striping;
@@ -3925,12 +4229,13 @@ static int lod_declare_xattr_del(const struct lu_env *env,
                                 struct dt_object *dt, const char *name,
                                 struct thandle *th)
 {
-       struct lod_object       *lo = lod_dt_obj(dt);
-       int                     rc;
-       int                     i;
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct dt_object *next = dt_object_child(dt);
+       int i;
+       int rc;
        ENTRY;
 
-       rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th);
+       rc = lod_sub_declare_xattr_del(env, next, name, th);
        if (rc != 0)
                RETURN(rc);
 
@@ -3946,9 +4251,10 @@ static int lod_declare_xattr_del(const struct lu_env *env,
                RETURN(0);
 
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-               LASSERT(lo->ldo_stripe[i]);
-               rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i],
-                                              name, th);
+               struct dt_object *dto = lo->ldo_stripe[i];
+
+               LASSERT(dto);
+               rc = lod_sub_declare_xattr_del(env, dto, name, th);
                if (rc != 0)
                        break;
        }
@@ -3973,7 +4279,7 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
        int                     i;
        ENTRY;
 
-       if (!strcmp(name, XATTR_NAME_LOV))
+       if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV))
                lod_striping_free(env, lod_dt_obj(dt));
 
        rc = lod_sub_xattr_del(env, next, name, th);
@@ -3984,9 +4290,11 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt,
                RETURN(0);
 
        for (i = 0; i < lo->ldo_dir_stripe_count; i++) {
-               LASSERT(lo->ldo_stripe[i]);
+               struct dt_object *dto = lo->ldo_stripe[i];
 
-               rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th);
+               LASSERT(dto);
+
+               rc = lod_sub_xattr_del(env, dto, name, th);
                if (rc != 0)
                        break;
        }
@@ -4460,19 +4768,19 @@ static void lod_ah_init(const struct lu_env *env,
                } else {
                        /* transfer defaults LMV to new directory */
                        lod_striping_from_default(lc, lds, child_mode);
+
+                       /* set count 0 to create normal directory */
+                       if (lc->ldo_dir_stripe_count == 1)
+                               lc->ldo_dir_stripe_count = 0;
                }
 
                /* shrink the stripe_count to the available MDT count */
                if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 &&
-                   !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
+                   !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) {
                        lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1;
-
-               /* Directory will be striped only if stripe_count > 1, if
-                * stripe_count == 1, let's reset stripe_count = 0 to avoid
-                * create single master stripe and also help to unify the
-                * stripe handling of directories and files */
-               if (lc->ldo_dir_stripe_count == 1)
-                       lc->ldo_dir_stripe_count = 0;
+                       if (lc->ldo_dir_stripe_count == 1)
+                               lc->ldo_dir_stripe_count = 0;
+               }
 
                CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n",
                       lc->ldo_dir_stripe_count,
@@ -5295,7 +5603,6 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt,
                RETURN(0);
 
        LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
-       LASSERT(lo->ldo_dir_stripe_count > 1);
        /* Note: for remote lock for single stripe dir, MDT will cancel
         * the lock by lockh directly */
        LASSERT(!dt_object_remote(dt_object_child(dt)));
index a3ed60a..18d3001 100644 (file)
@@ -63,6 +63,8 @@ void mdc_link_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
 void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
                     const char *old, size_t oldlen,
                     const char *new, size_t newlen);
+void mdc_migrate_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                       const char *name, size_t namelen);
 void mdc_close_pack(struct ptlrpc_request *req, struct md_op_data *op_data);
 
 /* mdc/mdc_locks.c */
index 0faf5bf..2481e63 100644 (file)
@@ -496,8 +496,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
        rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
        /* XXX do something about time, uid, gid */
-       rec->rn_opcode  = op_data->op_cli_flags & CLI_MIGRATE ?
-                                       REINT_MIGRATE : REINT_RENAME;
+       rec->rn_opcode   = REINT_RENAME;
        rec->rn_fsuid    = op_data->op_fsuid;
        rec->rn_fsgid    = op_data->op_fsgid;
        rec->rn_cap      = op_data->op_cap;
@@ -513,22 +512,41 @@ void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
 
        if (new != NULL)
                mdc_pack_name(req, &RMF_SYMTGT, new, newlen);
+}
 
-       if (op_data->op_cli_flags & CLI_MIGRATE) {
-               char *tmp;
+void mdc_migrate_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
+                     const char *name, size_t namelen)
+{
+       struct mdt_rec_rename *rec;
+       char *ea;
 
-               if (op_data->op_bias & MDS_CLOSE_MIGRATE) {
-                       struct mdt_ioepoch *epoch;
+       CLASSERT(sizeof(struct mdt_rec_reint) == sizeof(struct mdt_rec_rename));
+       rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
-                       mdc_close_intent_pack(req, op_data);
-                       epoch = req_capsule_client_get(&req->rq_pill,
-                                                       &RMF_MDT_EPOCH);
-                       mdc_ioepoch_pack(epoch, op_data);
-               }
+       rec->rn_opcode   = REINT_MIGRATE;
+       rec->rn_fsuid    = op_data->op_fsuid;
+       rec->rn_fsgid    = op_data->op_fsgid;
+       rec->rn_cap      = op_data->op_cap;
+       rec->rn_suppgid1 = op_data->op_suppgids[0];
+       rec->rn_suppgid2 = op_data->op_suppgids[1];
+       rec->rn_fid1     = op_data->op_fid1;
+       rec->rn_fid2     = op_data->op_fid4;
+       rec->rn_time     = op_data->op_mod_time;
+       rec->rn_mode     = op_data->op_mode;
+       rec->rn_bias     = op_data->op_bias;
 
-               tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
-               memcpy(tmp, op_data->op_data, op_data->op_data_size);
+       mdc_pack_name(req, &RMF_NAME, name, namelen);
+
+       if (op_data->op_bias & MDS_CLOSE_MIGRATE) {
+               struct mdt_ioepoch *epoch;
+
+               mdc_close_intent_pack(req, op_data);
+               epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
+               mdc_ioepoch_pack(epoch, op_data);
        }
+
+       ea = req_capsule_client_get(&req->rq_pill, &RMF_EADATA);
+       memcpy(ea, op_data->op_data, op_data->op_data_size);
 }
 
 void mdc_getattr_pack(struct ptlrpc_request *req, __u64 valid, __u32 flags,
index bd45df9..ae15cda 100644 (file)
@@ -409,7 +409,10 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
        if (exp_connect_cancelset(exp) && req)
                ldlm_cli_cancel_list(&cancels, count, req, 0);
 
-       mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
+       if (op_data->op_cli_flags & CLI_MIGRATE)
+               mdc_migrate_pack(req, op_data, old, oldlen);
+       else
+               mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
 
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obd->u.cli.cl_default_mds_easize);
index ddc62fa..aa87adc 100644 (file)
@@ -42,6 +42,7 @@
 #include <obd_support.h>
 #include <lustre_mds.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 
 #include "mdd_internal.h"
 
@@ -1938,8 +1939,8 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
 static int mdd_object_initialize(const struct lu_env *env,
                                 const struct lu_fid *pfid,
                                 struct mdd_object *child,
-                                struct lu_attr *attr, struct thandle *handle,
-                                const struct md_op_spec *spec)
+                                struct lu_attr *attr,
+                                struct thandle *handle)
 {
        int rc = 0;
        ENTRY;
@@ -2104,7 +2105,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                GOTO(out, rc);
 
 #ifdef CONFIG_FS_POSIX_ACL
-       if (def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
+       if (def_acl_buf && def_acl_buf->lb_len > 0 && S_ISDIR(attr->la_mode)) {
                /* if dir, then can inherit default ACl */
                rc = mdo_declare_xattr_set(env, c, def_acl_buf,
                                           XATTR_NAME_ACL_DEFAULT,
@@ -2113,7 +2114,7 @@ static int mdd_declare_create_object(const struct lu_env *env,
                        GOTO(out, rc);
        }
 
-       if (acl_buf->lb_len > 0) {
+       if (acl_buf && acl_buf->lb_len > 0) {
                rc = mdo_declare_attr_set(env, c, attr, handle);
                if (rc)
                        GOTO(out, rc);
@@ -2133,8 +2134,10 @@ static int mdd_declare_create_object(const struct lu_env *env,
            (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
-                                          handle);
+               rc = mdo_declare_xattr_set(env, c, buf,
+                                          S_ISDIR(attr->la_mode) ?
+                                               XATTR_NAME_LMV : XATTR_NAME_LOV,
+                                          0, handle);
                if (rc)
                        GOTO(out, rc);
        }
@@ -2289,8 +2292,7 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
         * created in declare phase, they also needs to be added to master
         * object as sub-directory entry. So it has to initialize the master
         * object, then set dir striped EA.(in mdo_xattr_set) */
-       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
-                                  spec);
+       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle);
        if (rc != 0)
                GOTO(err_destroy, rc);
 
@@ -2316,8 +2318,8 @@ static int mdd_create_object(const struct lu_env *env, struct mdd_object *pobj,
                                        spec->u.sp_ea.eadatalen);
                rc = mdo_xattr_set(env, son, buf,
                                   S_ISDIR(attr->la_mode) ? XATTR_NAME_LMV :
-                                                           XATTR_NAME_LOV, 0,
-                                  handle);
+                                                           XATTR_NAME_LOV,
+                                  0, handle);
                if (rc != 0)
                        GOTO(err_destroy, rc);
        }
@@ -2520,6 +2522,20 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (rc < 0)
                GOTO(out_stop, rc);
 
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md *lmu = spec->u.sp_ea.eadata;
+
+               /*
+                * migrate may create 1-stripe directory, so lod_ah_init()
+                * doesn't adjust stripe count from lmu.
+                */
+               if (lmu && lmu->lum_stripe_count == cpu_to_le32(1)) {
+                       info->mti_lmu = *lmu;
+                       info->mti_lmu.lum_stripe_count = 0;
+                       spec->u.sp_ea.eadata = &info->mti_lmu;
+               }
+       }
+
        mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
 
        memset(ldata, 0, sizeof(*ldata));
@@ -3169,270 +3185,213 @@ out_pending:
 }
 
 /**
- * During migration once the parent FID has been changed,
- * we need update the parent FID in linkea.
+ * Check whether we should migrate the file/dir
+ * return val
+ *     < 0  permission check failed or other error.
+ *     = 0  the file can be migrated.
  **/
-static int mdd_linkea_update_child_internal(const struct lu_env *env,
-                                           struct mdd_object *parent,
-                                           struct mdd_object *newparent,
-                                           struct mdd_object *child,
-                                           const char *name, int namelen,
-                                           struct thandle *handle,
-                                           bool declare)
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+                                   struct mdd_device *mdd,
+                                   struct mdd_object *spobj,
+                                   struct mdd_object *tpobj,
+                                   struct mdd_object *sobj,
+                                   struct mdd_object *tobj,
+                                   const struct lu_attr *spattr,
+                                   const struct lu_attr *tpattr,
+                                   const struct lu_attr *attr)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      ldata = { NULL };
-       struct lu_buf           *buf = &info->mti_link_buf;
-       int                     count;
-       int                     rc = 0;
+       int rc;
 
        ENTRY;
 
-       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
-       if (buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
-
-       ldata.ld_buf = buf;
-       rc = mdd_links_read(env, child, &ldata);
-       if (rc != 0) {
-               if (rc == -ENOENT || rc == -ENODATA)
-                       rc = 0;
-               RETURN(rc);
+       if (!mdd_object_remote(sobj)) {
+               mdd_read_lock(env, sobj, MOR_SRC_CHILD);
+               if (sobj->mod_count > 0) {
+                       CDEBUG(D_INFO, "%s: "DFID" is opened, count %d\n",
+                              mdd2obd_dev(mdd)->obd_name, PFID(mdo2fid(sobj)),
+                              sobj->mod_count);
+                       mdd_read_unlock(env, sobj);
+                       RETURN(-EBUSY);
+               }
+               mdd_read_unlock(env, sobj);
        }
 
-       LASSERT(ldata.ld_leh != NULL);
-       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
-       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
-               struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
-               struct lu_name lname;
-               struct lu_fid  fid;
-
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
-                                   &lname, &fid);
-
-               if (strncmp(lname.ln_name, name, namelen) != 0 ||
-                   !lu_fid_eq(&fid, mdd_object_fid(parent))) {
-                       ldata.ld_lee = (struct link_ea_entry *)
-                                      ((char *)ldata.ld_lee +
-                                       ldata.ld_reclen);
-                       continue;
-               }
+       if (mdd_object_exists(tobj))
+               RETURN(-EEXIST);
 
-               CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
-                      lname.ln_namelen, lname.ln_name,
-                      PFID(mdd_object_fid(newparent)));
-               /* update to the new parent fid */
-               linkea_entry_pack(ldata.ld_lee, &lname,
-                                 mdd_object_fid(newparent));
-               if (declare)
-                       rc = mdd_declare_links_add(env, child, handle, &ldata);
-               else
-                       rc = mdd_links_write(env, child, &ldata, handle);
-               break;
-       }
+       rc = mdd_rename_sanity_check(env, spobj, spattr, tpobj, tpattr, sobj,
+                                    attr, NULL, NULL);
        RETURN(rc);
 }
 
-static int mdd_linkea_declare_update_child(const struct lu_env *env,
-                                          struct mdd_object *parent,
-                                          struct mdd_object *newparent,
-                                          struct mdd_object *child,
-                                          const char *name, int namelen,
-                                          struct thandle *handle)
+typedef int (*mdd_dir_stripe_cb)(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                struct mdd_object *stripe,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                int index,
+                                struct thandle *handle);
+
+static int mdd_dir_declare_delete_stripe(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        struct mdd_object *stripe,
+                                        const struct lu_buf *lmv_buf,
+                                        const struct lu_buf *lmu_buf,
+                                        int index,
+                                        struct thandle *handle)
 {
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, true);
-}
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *stripe_name = info->mti_name;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       int rc;
 
-static int mdd_linkea_update_child(const struct lu_env *env,
-                                  struct mdd_object *parent,
-                                  struct mdd_object *newparent,
-                                  struct mdd_object *child,
-                                  const char *name, int namelen,
-                                  struct thandle *handle)
-{
-       return mdd_linkea_update_child_internal(env, parent, newparent,
-                                               child, name,
-                                               namelen, handle, false);
+       if (index < le32_to_cpu(lmu->lum_stripe_count))
+               return 0;
+
+       rc = mdo_declare_index_delete(env, stripe, dotdot, handle);
+       if (rc)
+               return rc;
+
+       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+                PFID(mdd_object_fid(stripe)), index);
+
+       rc = mdo_declare_index_delete(env, obj, stripe_name, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_ref_del(env, obj, handle);
+
+       return rc;
 }
 
-static int mdd_update_linkea_internal(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     const struct lu_name *child_name,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle,
-                                     int declare)
+/* delete stripe from its master object namespace */
+static int mdd_dir_delete_stripe(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                struct mdd_object *stripe,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                int index,
+                                struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       int                     count;
-       int                     rc = 0;
-       ENTRY;
-
-       LASSERT(ldata->ld_buf != NULL);
-       LASSERT(ldata->ld_leh != NULL);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *stripe_name = info->mti_name;
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct lmv_user_md *lmu = lmu_buf->lb_buf;
+       __u32 del_offset = le32_to_cpu(lmu->lum_stripe_count);
+       int rc;
 
-       /* If it is mulitple links file, we need update the name entry for
-        * all parent */
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-               struct mdd_object       *pobj;
-               struct lu_name          lname;
-               struct lu_fid           fid;
-
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               pobj = mdd_object_find(env, mdd, &fid);
-               if (IS_ERR(pobj)) {
-                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid),
-                             PTR_ERR(pobj));
-                       continue;
-               }
+       ENTRY;
 
-               if (!mdd_object_exists(pobj)) {
-                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
-                             mdd2obd_dev(mdd)->obd_name, PFID(&fid));
-                       goto next_put;
-               }
+       /* a local dir is deleted via LOD */
+       LASSERT(mdd_object_remote(obj));
+       LASSERT(del_offset < le32_to_cpu(lmv->lmv_stripe_count));
 
-               if (pobj == mdd_pobj &&
-                   lname.ln_namelen == child_name->ln_namelen &&
-                   strncmp(lname.ln_name, child_name->ln_name,
-                           lname.ln_namelen) == 0) {
-                       CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
-                             mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
-                             PFID(&fid));
-                       goto next_put;
-               }
+       if (index < del_offset)
+               RETURN(0);
 
-               CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
-                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
-                      PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+       mdd_write_lock(env, stripe, MOR_SRC_CHILD);
+       rc = __mdd_index_delete_only(env, stripe, dotdot, handle);
+       if (rc)
+               GOTO(out, rc);
 
-               if (declare) {
-                       /* Remove source name from source directory */
-                       /* Insert new fid with target name into target dir */
-                       rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       snprintf(stripe_name, sizeof(info->mti_name), DFID":%d",
+                PFID(mdd_object_fid(stripe)), index);
 
-                       rc = mdo_declare_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       lname.ln_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       rc = __mdd_index_delete_only(env, obj, stripe_name, handle);
+       if (rc)
+               GOTO(out, rc);
 
-                       rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
+       rc = mdo_ref_del(env, obj, handle);
+       GOTO(out, rc);
+out:
+       mdd_write_unlock(env, stripe);
 
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc)
-                               GOTO(next_put, rc);
-               } else {
-                       char *tmp_name = info->mti_key;
-
-                       if (lname.ln_namelen >= sizeof(info->mti_key)) {
-                               /* lnamelen is too big(> NAME_MAX + 16),
-                                * something wrong about this linkea, let's
-                                * skip it */
-                               CWARN("%s: the name %.*s is too long under "
-                                     DFID"\n", mdd2obd_dev(mdd)->obd_name,
-                                     lname.ln_namelen, lname.ln_name,
-                                     PFID(&fid));
-                               goto next_put;
-                       }
+       return rc;
+}
 
-                       /* Note: lname might be without \0 at the end, see
-                        * linkea_entry_unpack(), let's add extra \0 by
-                        * snprintf */
-                       snprintf(tmp_name, sizeof(info->mti_key), "%.*s",
-                                lname.ln_namelen, lname.ln_name);
-                       lname.ln_name = tmp_name;
-
-                       /* Let's check if this linkEA still valid, before
-                        * it might be packed into the RPC buffer. */
-                       rc = mdd_lookup(env, &pobj->mod_obj, &lname,
-                                       &info->mti_fid, NULL);
-                       if (rc < 0 || !lu_fid_eq(&info->mti_fid,
-                                                mdd_object_fid(mdd_sobj)))
-                               GOTO(next_put, rc == -ENOENT ? 0 : rc);
-
-                       rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+/*
+ * iterate stripes of striped directory on remote MDT, local striped directory
+ * is accessed via LOD.
+ */
+static int mdd_dir_iterate_stripes(const struct lu_env *env,
+                                  struct mdd_object *obj,
+                                  const struct lu_buf *lmv_buf,
+                                  const struct lu_buf *lmu_buf,
+                                  struct thandle *handle,
+                                  mdd_dir_stripe_cb cb)
+{
+       struct mdd_device *mdd = mdo2mdd(&obj->mod_obj);
+       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+       struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf;
+       struct mdd_object *stripe;
+       int i;
+       int rc;
 
-                       rc = __mdd_index_insert(env, pobj,
-                                       mdd_object_fid(mdd_tobj),
-                                       mdd_object_type(mdd_tobj),
-                                       tmp_name, handle);
-                       if (rc != 0)
-                               GOTO(next_put, rc);
+       ENTRY;
 
-                       mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
-                       rc = mdo_ref_add(env, mdd_tobj, handle);
-                       mdd_write_unlock(env, mdd_tobj);
-                       if (rc)
-                               GOTO(next_put, rc);
+       LASSERT(lmv);
 
-                       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
-                       mdo_ref_del(env, mdd_sobj, handle);
-                       mdd_write_unlock(env, mdd_sobj);
-               }
-next_put:
-               mdd_object_put(env, pobj);
-               if (rc != 0)
-                       break;
+       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
+               stripe = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(stripe))
+                       RETURN(PTR_ERR(stripe));
 
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
+               rc = cb(env, obj, stripe, lmv_buf, lmu_buf, i, handle);
+               mdd_object_put(env, stripe);
+               if (rc)
+                       RETURN(rc);
        }
 
-       RETURN(rc);
+       RETURN(0);
 }
 
-static int mdd_migrate_xattrs(const struct lu_env *env,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj)
+typedef int (*mdd_xattr_cb)(const struct lu_env *env,
+                           struct mdd_object *obj,
+                           const struct lu_buf *buf,
+                           const char *name,
+                           int fl, struct thandle *handle);
+
+/* iterate xattrs, always skip LMA and LMV, skip LINKEA if 'skip_linkea' set */
+static int mdd_iterate_xattrs(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             bool skip_linkea,
+                             struct thandle *handle,
+                             mdd_xattr_cb cb)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       char                    *xname;
-       struct thandle          *handle;
-       struct lu_buf           xbuf;
-       int                     xlen;
-       int                     rem;
-       int                     xsize;
-       int                     list_xsize;
-       struct lu_buf           list_xbuf;
-       int                     rc;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *xname;
+       struct lu_buf list_xbuf;
+       struct lu_buf xbuf = { NULL };
+       int list_xsize;
+       int xlen;
+       int rem;
+       int xsize;
+       int rc;
+
+       ENTRY;
 
        /* retrieve xattr list from the old object */
-       list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL);
+       list_xsize = mdo_xattr_list(env, sobj, &LU_BUF_NULL);
        if (list_xsize == -ENODATA)
-               return 0;
+               RETURN(0);
 
        if (list_xsize < 0)
-               return list_xsize;
+               RETURN(list_xsize);
 
        lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
        if (info->mti_big_buf.lb_buf == NULL)
-               return -ENOMEM;
+               RETURN(-ENOMEM);
 
        list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
        list_xbuf.lb_len = list_xsize;
-       rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf);
+       rc = mdo_xattr_list(env, sobj, &list_xbuf);
        if (rc < 0)
-               return rc;
+               RETURN(rc);
+
+       rem = rc;
        rc = 0;
-       rem = list_xsize;
        xname = list_xbuf.lb_buf;
        while (rem > 0) {
                xlen = strnlen(xname, rem - 1) + 1;
@@ -3440,982 +3399,1045 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
                    strcmp(XATTR_NAME_LMV, xname) == 0)
                        goto next;
 
-               /* For directory, if there are default layout, migrate here */
-               if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
-                   !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+               if (skip_linkea &&
+                   strcmp(XATTR_NAME_LINK, xname) == 0)
                        goto next;
 
-               xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL, xname);
+               xsize = mdo_xattr_get(env, sobj, &LU_BUF_NULL, xname);
                if (xsize == -ENODATA)
                        goto next;
                if (xsize < 0)
-                       GOTO(out, rc);
+                       GOTO(out, rc = xsize);
 
-               lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
-               if (info->mti_link_buf.lb_buf == NULL)
+               lu_buf_check_and_alloc(&xbuf, xsize);
+               if (xbuf.lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
 
-               xbuf.lb_len = xsize;
-               xbuf.lb_buf = info->mti_link_buf.lb_buf;
-               rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname);
+               rc = mdo_xattr_get(env, sobj, &xbuf, xname);
                if (rc == -ENODATA)
                        goto next;
                if (rc < 0)
                        GOTO(out, rc);
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out, rc = PTR_ERR(handle));
-
-               rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
-                                          handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * update last rcvd for this transaction */
-               handle->th_local = 1;
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-
-again:
-               rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
-               if (rc == -EEXIST)
-                       GOTO(stop_trans, rc = 0);
-
+repeat:
+               rc = cb(env, tobj, &xbuf, xname, 0, handle);
                if (unlikely(rc == -ENOSPC &&
                             strcmp(xname, XATTR_NAME_LINK) == 0)) {
                        rc = linkea_overflow_shrink(
                                        (struct linkea_data *)(xbuf.lb_buf));
                        if (likely(rc > 0)) {
                                xbuf.lb_len = rc;
-                               goto again;
+                               goto repeat;
                        }
                }
 
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-stop_trans:
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
+               if (rc)
                        GOTO(out, rc);
 next:
+               xname += xlen;
                rem -= xlen;
-               memmove(xname, xname + xlen, rem);
        }
+
 out:
-       return rc;
+       lu_buf_free(&xbuf);
+       RETURN(rc);
 }
 
-static int mdd_declare_migrate_create(const struct lu_env *env,
-                                     struct mdd_object *mdd_pobj,
-                                     struct mdd_object *mdd_sobj,
-                                     struct mdd_object *mdd_tobj,
-                                     struct md_op_spec *spec,
-                                     struct lu_attr *la,
-                                     union lmv_mds_md *mgr_ea,
-                                     struct linkea_data *ldata,
-                                     struct thandle *handle)
+/**
+ * Callback invoked by mdd_iterate_linkea() for each entry unpacked from a
+ * linkEA buffer.
+ *
+ * \a sname and \a sfid are passed through unchanged from the iterator's
+ * caller, while \a lname and \a fid are the name and FID decoded from the
+ * current linkEA entry. A non-zero return value stops the iteration and is
+ * handed back to the iterator's caller.
+ */
+typedef int (*mdd_linkea_cb)(const struct lu_env *env,
+                            struct mdd_object *sobj,
+                            struct mdd_object *tobj,
+                            const struct lu_name *sname,
+                            const struct lu_fid *sfid,
+                            const struct lu_name *lname,
+                            const struct lu_fid *fid,
+                            void *opaque,
+                            struct thandle *handle);
+
+/**
+ * Declare the updates performed by mdd_update_link() for one linkEA entry.
+ *
+ * The entry matching \a tname under \a tpfid is skipped. For any other entry,
+ * declare deleting and re-inserting \a lname in the object identified by
+ * \a fid (the new dirent carries the FID of \a tobj), plus a reference add on
+ * \a tobj and a reference drop on \a sobj.
+ *
+ * \retval     0 on success or when the entry is skipped
+ * \retval     -errno on failure
+ */
+static int mdd_declare_update_link(const struct lu_env *env,
+                                  struct mdd_object *sobj,
+                                  struct mdd_object *tobj,
+                                  const struct lu_name *tname,
+                                  const struct lu_fid *tpfid,
+                                  const struct lu_name *lname,
+                                  const struct lu_fid *fid,
+                                  void *unused,
+                                  struct thandle *handle)
 {
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       const struct lu_buf     *buf;
-       int                     rc;
-       int                     mgr_easize;
-
-       rc = mdd_declare_create_object_internal(env, mdd_pobj, mdd_tobj, la,
-                                               handle, spec, NULL);
-       if (rc != 0)
-               return rc;
-
-       rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
-                                          handle);
-       if (rc != 0)
-               return rc;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
 
-       if (S_ISLNK(la->la_mode)) {
-               const char *target_name = spec->u.sp_symname;
-               int sym_len = strlen(target_name);
-               const struct lu_buf *buf;
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strcmp(tname->ln_name, lname->ln_name))
+               return 0;
 
-               buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
-                                            buf, 0, handle);
-               if (rc != 0)
-                       return rc;
-       } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
-               if (rc != 0)
-                       return rc;
-       }
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj))
+               return PTR_ERR(pobj);
 
-       if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
-               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
-                                       spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
-                                          0, handle);
-               if (rc)
-                       return rc;
-       }
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
-       rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
-                                  0, handle);
+       rc = mdo_declare_index_delete(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = mdo_declare_index_insert(env, pobj, mdo2fid(tobj),
+                                             mdd_object_type(sobj),
+                                             lname->ln_name, handle);
+       mdd_object_put(env, pobj);
        if (rc)
                return rc;
 
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+       rc = mdo_declare_ref_add(env, tobj, handle);
+       if (rc)
+               return rc;
 
+       rc = mdo_declare_ref_del(env, sobj, handle);
        return rc;
 }
 
-static int mdd_migrate_create(const struct lu_env *env,
-                             struct mdd_object *mdd_pobj,
-                             struct mdd_object *mdd_sobj,
-                             struct mdd_object *mdd_tobj,
-                             const struct lu_name *lname,
-                             struct lu_attr *la)
+/**
+ * Repoint one link of \a sobj at \a tobj.
+ *
+ * The entry matching \a tname under \a tpfid is skipped. Otherwise the
+ * object identified by \a fid is looked up and, under its write lock, the
+ * dirent \a lname is deleted and re-inserted with the FID of \a tobj. After
+ * that a reference is added to \a tobj and one is dropped from \a sobj, each
+ * under the respective object's write lock.
+ *
+ * \retval     0 on success or when the entry is skipped
+ * \retval     -ENOENT if the object identified by \a fid doesn't exist
+ * \retval     -errno on other failure
+ */
+static int mdd_update_link(const struct lu_env *env,
+                          struct mdd_object *sobj,
+                          struct mdd_object *tobj,
+                          const struct lu_name *tname,
+                          const struct lu_fid *tpfid,
+                          const struct lu_name *lname,
+                          const struct lu_fid *fid,
+                          void *unused,
+                          struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct md_op_spec       *spec = &info->mti_spec;
-       struct lu_buf           lmm_buf = { NULL };
-       struct lu_buf           link_buf = { NULL };
-       struct lu_buf            mgr_buf;
-       struct thandle          *handle;
-       struct lmv_mds_md_v1    *mgr_ea;
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       int                     mgr_easize;
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       int                     rc;
-       ENTRY;
-
-       /* prepare spec for create */
-       memset(spec, 0, sizeof(*spec));
-       spec->sp_cr_lookup = 0;
-       spec->sp_feat = &dt_directory_features;
-       if (S_ISLNK(la->la_mode)) {
-               const struct lu_buf *buf;
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       struct mdd_object *pobj;
+       int rc;
 
-               buf = lu_buf_check_and_alloc(
-                               &mdd_env_info(env)->mti_big_buf,
-                               la->la_size + 1);
-               link_buf = *buf;
-               link_buf.lb_len = la->la_size + 1;
-               memset(link_buf.lb_buf, 0, link_buf.lb_len);
-               rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
-               if (rc <= 0) {
-                       rc = rc != 0 ? rc : -EFAULT;
-                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(mdd_sobj)), rc);
-                       RETURN(rc);
-               }
-               spec->u.sp_symname = link_buf.lb_buf;
-       } else if (S_ISREG(la->la_mode)) {
-               /* retrieve lov of the old object */
-               rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
-               if (rc != 0 && rc != -ENODATA)
-                       RETURN(rc);
-               if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
-                       spec->u.sp_ea.eadata = lmm_buf.lb_buf;
-                       spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
-                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
-               }
-       } else if (S_ISDIR(la->la_mode)) {
-               rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
-               if (rc == -ENODATA) {
-                       /* ignore the non-linkEA error */
-                       ldata = NULL;
-                       rc = 0;
-               }
-               if (rc < 0)
-                       RETURN(rc);
-       }
+       ENTRY;
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       lu_buf_check_and_alloc(&info->mti_xattr_buf, mgr_easize);
-       mgr_buf.lb_buf = info->mti_xattr_buf.lb_buf;
-       mgr_buf.lb_len = mgr_easize;
-       mgr_ea = mgr_buf.lb_buf;
-       memset(mgr_ea, 0, sizeof(*mgr_ea));
-       mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
-       mgr_ea->lmv_stripe_count = cpu_to_le32(2);
-       mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
-       mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
-       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
-
-       mdd_object_make_hint(env, mdd_pobj, mdd_tobj, la, spec, hint);
+       LASSERT(lu_name_is_valid(lname));
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               GOTO(out_free, rc = PTR_ERR(handle));
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strncmp(tname->ln_name, lname->ln_name, lname->ln_namelen))
+               RETURN(0);
 
-       /* Note: this transaction is part of migration, and it is not
-        * the last step of migration, so we set th_local = 1 to avoid
-        * update last rcvd for this transaction */
-       handle->th_local = 1;
-       rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj, spec,
-                                       la, mgr_buf.lb_buf, ldata, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       CDEBUG(D_INFO, "update "DFID"/"DNAME":"DFID"\n",
+              PFID(fid), PNAME(lname), PFID(mdo2fid(tobj)));
 
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       pobj = mdd_object_find(env, mdd, fid);
+       if (IS_ERR(pobj)) {
+               CWARN("%s: cannot find obj "DFID": %ld\n",
+                     mdd2obd_dev(mdd)->obd_name, PFID(fid), PTR_ERR(pobj));
+               RETURN(PTR_ERR(pobj));
+       }
 
-       /* don't set nlink from the original object */
-       la->la_valid &= ~LA_NLINK;
+       if (!mdd_object_exists(pobj)) {
+               CDEBUG(D_INFO, DFID" doesn't exist\n", PFID(fid));
+               mdd_object_put(env, pobj);
+               RETURN(-ENOENT);
+       }
 
-       /* create the target object */
-       rc = mdd_create_object(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
-                              hint, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, pobj, MOR_TGT_PARENT);
+       rc = __mdd_index_delete_only(env, pobj, lname->ln_name, handle);
+       if (!rc)
+               rc = __mdd_index_insert_only(env, pobj, mdo2fid(tobj),
+                                            mdd_object_type(sobj),
+                                            lname->ln_name, handle);
+       mdd_write_unlock(env, pobj);
+       mdd_object_put(env, pobj);
+       if (rc)
+               RETURN(rc);
 
-       if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_links_write(env, mdd_tobj, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-       }
+       mdd_write_lock(env, tobj, MOR_TGT_CHILD);
+       rc = mdo_ref_add(env, tobj, handle);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
 
-       /* Set MIGRATE EA on the source inode, so once the migration needs
-        * to be re-done during failover, the re-do process can locate the
-        * target object which is already being created. */
-       rc = mdo_xattr_set(env, mdd_sobj, &mgr_buf, XATTR_NAME_LMV, 0, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+       rc = mdo_ref_del(env, sobj, handle);
+       mdd_write_unlock(env, sobj);
 
-       /* Set immutable flag, so any modification is disabled until
-        * the migration is done. Once the migration is interrupted,
-        * if the resume process find the migrating object has both
-        * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
-        * flag and approve the migration */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags | LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-stop_trans:
-       if (handle != NULL)
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-out_free:
-       if (lmm_buf.lb_buf != NULL)
-               OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
        RETURN(rc);
 }
 
-static int mdd_migrate_entries(const struct lu_env *env,
-                              struct mdd_object *mdd_sobj,
-                              struct mdd_object *mdd_tobj)
+/**
+ * Look up the MDT index that the sequence of \a fid maps to in the server
+ * FLD of \a mdd's site.
+ *
+ * \param[out] mdt_index       index taken from the looked-up range; written
+ *                             only on success
+ *
+ * \retval     0 on success
+ * \retval     non-zero value returned by fld_server_lookup() on failure
+ */
+static inline int mdd_fld_lookup(const struct lu_env *env,
+                                struct mdd_device *mdd,
+                                const struct lu_fid *fid,
+                                __u32 *mdt_index)
 {
-       struct dt_object        *next = mdd_object_child(mdd_sobj);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
-       struct thandle          *handle;
-       struct dt_it            *it;
-       const struct dt_it_ops  *iops;
-       int                      result;
-       struct lu_dirent        *ent;
-       int                      rc;
-       ENTRY;
+       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
+       struct seq_server_site *ss;
+       int rc;
 
-       OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
-       if (ent == NULL)
-               RETURN(-ENOMEM);
+       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
 
-       if (!dt_try_as_dir(env, next))
-               GOTO(out_ent, rc = -ENOTDIR);
-       /*
-        * iterate directories
-        */
-       iops = &next->do_index_ops->dio_it;
-       it = iops->init(env, next, LUDA_FID | LUDA_TYPE);
-       if (IS_ERR(it))
-               GOTO(out_ent, rc = PTR_ERR(it));
+       range->lsr_flags = LU_SEQ_RANGE_MDT;
+       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
+       if (rc)
+               return rc;
 
-       rc = iops->load(env, it, 0);
-       if (rc == 0)
-               rc = iops->next(env, it);
-       else if (rc > 0)
-               rc = 0;
-       /*
-        * At this point and across for-loop:
-        *
-        *  rc == 0 -> ok, proceed.
-        *  rc >  0 -> end of directory.
-        *  rc <  0 -> error.
-        */
-       do {
-               struct mdd_object       *child;
-               char                    *name = mdd_env_info(env)->mti_key;
-               int                     len;
-               int                     is_dir;
-               bool                    target_exist = false;
-
-               len = iops->key_size(env, it);
-               if (len == 0)
-                       goto next;
+       *mdt_index = range->lsr_index;
 
-               result = iops->rec(env, it, (struct dt_rec *)ent,
-                                  LUDA_FID | LUDA_TYPE);
-               if (result == -ESTALE)
-                       goto next;
-               if (result != 0) {
-                       rc = result;
-                       goto out;
-               }
+       return 0;
+}
 
-               fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+/**
+ * linkEA iteration callback: check whether a link lives on the source MDT.
+ *
+ * The entry matching \a tname under \a tpfid is ignored. For any other
+ * entry, the MDT index of the parent \a fid is looked up and compared with
+ * the index that \a opaque points to.
+ *
+ * \param[in] opaque   pointer to the __u32 source MDT index
+ *
+ * \retval     1 if the parent is on the source MDT
+ * \retval     0 if it is not, or the entry is ignored
+ * \retval     non-zero value returned by mdd_fld_lookup() on failure
+ */
+static int mdd_is_link_on_source_mdt(const struct lu_env *env,
+                                    struct mdd_object *sobj,
+                                    struct mdd_object *tobj,
+                                    const struct lu_name *tname,
+                                    const struct lu_fid *tpfid,
+                                    const struct lu_name *lname,
+                                    const struct lu_fid *fid,
+                                    void *opaque,
+                                    struct thandle *handle)
+{
+       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+       __u32 source_mdt_index = *(__u32 *)opaque;
+       __u32 link_mdt_index;
+       int rc;
 
-               fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+       ENTRY;
 
-               /* Insert new fid with target name into target dir */
-               if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
-                   (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
-                    ent->lde_name[1] == '.'))
-                       goto next;
+       /* ignore tobj */
+       if (lu_fid_eq(tpfid, fid) && tname->ln_namelen == lname->ln_namelen &&
+           !strcmp(tname->ln_name, lname->ln_name))
+               RETURN(0);
 
-               child = mdd_object_find(env, mdd, &ent->lde_fid);
-               if (IS_ERR(child))
-                       GOTO(out, rc = PTR_ERR(child));
+       rc = mdd_fld_lookup(env, mdd, fid, &link_mdt_index);
+       if (rc)
+               RETURN(rc);
 
-               /* child may not exist, but lu_object_attr will assert this,
-                * get type from loh_attr directly */
-               is_dir = S_ISDIR(child->mod_obj.mo_lu.lo_header->loh_attr);
-
-               mdd_write_lock(env, child, MOR_SRC_CHILD);
-
-               snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
-
-               /* Check whether the name has been inserted to the target */
-               if (dt_try_as_dir(env, dt_tobj)) {
-                       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+       RETURN(link_mdt_index == source_mdt_index);
+}
 
-               handle = mdd_trans_create(env, mdd);
-               if (IS_ERR(handle))
-                       GOTO(out_put, rc = PTR_ERR(handle));
-
-               /* Note: this transaction is part of migration, and it is not
-                * the last step of migration, so we set th_local = 1 to avoid
-                * updating last rcvd for this transaction */
-               handle->th_local = 1;
-               if (likely(!target_exist)) {
-                       rc = mdo_declare_index_insert(env, mdd_tobj,
-                               &ent->lde_fid,
-                               child->mod_obj.mo_lu.lo_header->loh_attr,
-                               name, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+/**
+ * Walk the entries of \a ldata and call \a cb on each of them.
+ *
+ * Each entry name is copied into the mti_name buffer of the env's
+ * mdd_thread_info so that the callback receives a NUL-terminated name.
+ * \a sobj, \a tobj, \a tname, \a tpfid, \a opaque and \a handle are handed
+ * to \a cb unchanged. Iteration stops at the first non-zero value returned
+ * by \a cb.
+ *
+ * \retval     0 if \a ldata has no buffer or every callback returned 0
+ * \retval     otherwise the first non-zero value returned by \a cb
+ */
+static int mdd_iterate_linkea(const struct lu_env *env,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *tname,
+                             const struct lu_fid *tpfid,
+                             struct linkea_data *ldata,
+                             void *opaque,
+                             struct thandle *handle,
+                             mdd_linkea_cb cb)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       char *filename = info->mti_name;
+       struct lu_name lname;
+       struct lu_fid fid;
+       int rc = 0;
 
-                       if (is_dir) {
-                               rc = mdo_declare_ref_add(env, mdd_tobj, handle);
-                               if (rc != 0)
-                                       GOTO(out_put, rc);
-                       }
-               }
+       if (!ldata->ld_buf)
+               return 0;
 
-               rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       for (linkea_first_entry(ldata); ldata->ld_lee && !rc;
+            linkea_next_entry(ldata)) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &lname,
+                                   &fid);
 
-               if (is_dir) {
-                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               /* Note: lname might miss \0 at the end */
+               snprintf(filename, sizeof(info->mti_name), "%.*s",
+                        lname.ln_namelen, lname.ln_name);
+               lname.ln_name = filename;
 
-                       /* Update .. for child */
-                       rc = mdo_declare_index_delete(env, child, dotdot,
-                                                     handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+               CDEBUG(D_INFO, DFID"/"DNAME"\n", PFID(&fid), PNAME(&lname));
 
-                       rc = mdo_declare_index_insert(env, child,
-                                                     mdd_object_fid(mdd_tobj),
-                                                     S_IFDIR, dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+               rc = cb(env, sobj, tobj, tname, tpfid, &lname, &fid, opaque,
+                       handle);
+       }
 
-               rc = mdd_linkea_declare_update_child(env, mdd_sobj,mdd_tobj,
-                                                    child, name,
-                                                    strlen(name),
-                                                    handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       return rc;
+}
 
-               rc = mdd_trans_start(env, mdd, handle);
-               if (rc != 0) {
-                       CERROR("%s: transaction start failed: rc = %d\n",
-                              mdd2obd_dev(mdd)->obd_name, rc);
-                       GOTO(out_put, rc);
-               }
+/**
+ * Prepare linkea, and check whether file needs migrate: if source still has
+ * link on source MDT, no need to migrate, just update namespace on source and
+ * target parents.
+ *
+ * \retval     0 do migrate
+ * \retval     1 don't migrate
+ * \retval     -EOVERFLOW if the linkEA has overflowed
+ * \retval     -errno on failure
+ */
+static int migrate_linkea_prepare(const struct lu_env *env,
+                                 struct mdd_device *mdd,
+                                 struct mdd_object *spobj,
+                                 struct mdd_object *tpobj,
+                                 struct mdd_object *sobj,
+                                 const struct lu_name *lname,
+                                 const struct lu_attr *attr,
+                                 struct linkea_data *ldata)
+{
+       __u32 source_mdt_index;
+       int rc;
 
-               if (likely(!target_exist)) {
-                       rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
-                               child->mod_obj.mo_lu.lo_header->loh_attr, name,
-                               handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+       ENTRY;
 
-               rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
-               if (rc != 0)
-                       GOTO(out_put, rc);
+       memset(ldata, 0, sizeof(*ldata));
+       rc = mdd_linkea_prepare(env, sobj, mdo2fid(spobj), lname,
+                               mdo2fid(tpobj), lname, 1, 0, ldata);
+       if (rc)
+               RETURN(rc);
 
-               if (is_dir) {
-                       rc = __mdd_index_delete_only(env, child, dotdot,
-                                                    handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
+       /*
+        * Then check whether the file should be migrated. If the file has
+        * multiple links, it only needs to be migrated once all of its
+        * entries have been migrated to the remote MDT.
+        */
+       if (S_ISDIR(attr->la_mode) || attr->la_nlink < 2)
+               RETURN(0);
 
-                       rc = __mdd_index_insert_only(env, child,
-                                        mdd_object_fid(mdd_tobj), S_IFDIR,
-                                        dotdot, handle);
-                       if (rc != 0)
-                               GOTO(out_put, rc);
-               }
+       /* If there are still links locally, don't migrate this file */
+       LASSERT(ldata->ld_leh != NULL);
 
-               rc = mdd_linkea_update_child(env, mdd_sobj, mdd_tobj,
-                                            child, name,
-                                            strlen(name), handle);
+       /*
+        * If linkEA is overflow, it means there are some unknown name entries
+        * under unknown parents, which will prevent the migration.
+        */
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(-EOVERFLOW);
 
-out_put:
-               mdd_write_unlock(env, child);
-               mdd_object_put(env, child);
-               rc = mdd_trans_stop(env, mdd, rc, handle);
-               if (rc != 0)
-                       GOTO(out, rc);
-next:
-               result = iops->next(env, it);
-               if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
-                       GOTO(out, rc = -EINTR);
+       rc = mdd_fld_lookup(env, mdd, mdo2fid(sobj), &source_mdt_index);
+       if (rc)
+               RETURN(rc);
 
-               if (result == -ESTALE)
-                       goto next;
-       } while (result == 0);
-out:
-       iops->put(env, it);
-       iops->fini(env, it);
-out_ent:
-       OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+       rc = mdd_iterate_linkea(env, sobj, NULL, lname, mdo2fid(tpobj), ldata,
+                               &source_mdt_index, NULL,
+                               mdd_is_link_on_source_mdt);
        RETURN(rc);
 }
 
-static int mdd_declare_update_linkea(const struct lu_env *env,
-                                    struct mdd_object *mdd_pobj,
-                                    struct mdd_object *mdd_sobj,
-                                    struct mdd_object *mdd_tobj,
-                                    const struct lu_name *child_name,
-                                    struct linkea_data *ldata,
-                                    struct thandle *handle)
+/**
+ * Declare the updates performed by mdd_dir_layout_delete() on \a obj.
+ *
+ * If \a lmv_buf holds no buffer, declare deleting the dotdot entry of
+ * \a obj. Otherwise, if \a obj is remote, run
+ * mdd_dir_declare_delete_stripe() over its stripes; if it is not remote,
+ * declare setting the XATTR_NAME_LMV".del" xattr with \a lmu_buf.
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ */
+static int mdd_dir_declare_layout_delete(const struct lu_env *env,
+                                        struct mdd_object *obj,
+                                        const struct lu_buf *lmv_buf,
+                                        const struct lu_buf *lmu_buf,
+                                        struct thandle *handle)
 {
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 1);
+       int rc;
+
+       if (!lmv_buf->lb_buf)
+               rc = mdo_declare_index_delete(env, obj, dotdot, handle);
+       else if (mdd_object_remote(obj))
+               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+                                            mdd_dir_declare_delete_stripe);
+       else
+               rc = mdo_declare_xattr_set(env, obj, lmu_buf,
+                                          XATTR_NAME_LMV".del", 0, handle);
+
+       return rc;
 }
 
-static int mdd_update_linkea(const struct lu_env *env,
-                            struct mdd_object *mdd_pobj,
-                            struct mdd_object *mdd_sobj,
-                            struct mdd_object *mdd_tobj,
-                            const struct lu_name *child_name,
-                            struct linkea_data *ldata,
-                            struct thandle *handle)
+/**
+ * Delete the directory layout of \a obj while holding its write lock.
+ *
+ * If \a lmv_buf holds no buffer, the dotdot entry of \a obj is deleted.
+ * Otherwise, if \a obj is remote, mdd_dir_delete_stripe() is run over its
+ * stripes; if it is not remote, the XATTR_NAME_LMV".del" xattr is set with
+ * \a lmu_buf.
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ */
+static int mdd_dir_layout_delete(const struct lu_env *env,
+                                struct mdd_object *obj,
+                                const struct lu_buf *lmv_buf,
+                                const struct lu_buf *lmu_buf,
+                                struct thandle *handle)
 {
-       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                         child_name, ldata, handle, 0);
+       int rc;
+
+       ENTRY;
+
+       mdd_write_lock(env, obj, MOR_SRC_PARENT);
+       if (!lmv_buf->lb_buf)
+               /* normal dir */
+               rc = __mdd_index_delete_only(env, obj, dotdot, handle);
+       else if (mdd_object_remote(obj))
+               /* striped, but remote */
+               rc = mdd_dir_iterate_stripes(env, obj, lmv_buf, lmu_buf, handle,
+                                            mdd_dir_delete_stripe);
+       else
+               rc = mdo_xattr_set(env, obj, lmu_buf, XATTR_NAME_LMV".del", 0,
+                                  handle);
+       mdd_write_unlock(env, obj);
+
+       RETURN(rc);
 }
 
-static int mdd_declare_migrate_update_name(const struct lu_env *env,
-                                          struct mdd_object *mdd_pobj,
-                                          struct mdd_object *mdd_sobj,
-                                          struct mdd_object *mdd_tobj,
-                                          const struct lu_name *lname,
-                                          struct lu_attr *la,
-                                          struct lu_attr *parent_la,
-                                          struct linkea_data *ldata,
-                                          struct thandle *handle)
+static int mdd_declare_migrate_create(const struct lu_env *env,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     struct mdd_object *tobj,
+                                     const struct lu_name *lname,
+                                     struct lu_attr *attr,
+                                     struct lu_buf *sbuf,
+                                     struct linkea_data *ldata,
+                                     struct md_op_spec *spec,
+                                     struct dt_allocation_hint *hint,
+                                     struct thandle *handle)
 {
-       struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct lu_attr *la_flag = MDD_ENV_VAR(env, tattr);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct lmv_mds_md_v1 *lmv = sbuf->lb_buf;
        int rc;
 
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               return rc;
-
-       /* delete entry from source dir */
-       rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
-       if (rc != 0)
-               return rc;
+       if (S_ISDIR(attr->la_mode)) {
+               struct lu_buf lmu_buf = { NULL };
 
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
-                                              mdd_tobj, lname, ldata, handle);
-               if (rc != 0)
-                       return rc;
-       }
+               if (lmv) {
+                       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
 
-       if (S_ISREG(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
-                                          handle);
-               if (rc != 0)
-                       return rc;
+                       lmu->lum_stripe_count = 0;
+                       lmu_buf.lb_buf = lmu;
+                       lmu_buf.lb_len = sizeof(*lmu);
+               }
 
-               handle->th_complex = 1;
-               rc = mdo_declare_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-               if (rc < 0)
+               rc = mdd_dir_declare_layout_delete(env, sobj, sbuf, &lmu_buf,
+                                               handle);
+               if (rc)
                        return rc;
-       }
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_del(env, mdd_pobj, handle);
-               if (rc != 0)
-                       return rc;
+               if (lmv) {
+                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+                                                  handle);
+                       if (rc)
+                               return rc;
+               }
        }
 
-       /* new name */
-       rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
-                                     mdd_object_type(mdd_tobj),
-                                     lname->ln_name, handle);
-       if (rc != 0)
-               return rc;
-
-       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
-       if (rc != 0)
+       rc = mdd_declare_create(env, mdo2mdd(&tpobj->mod_obj), tpobj, tobj,
+                               lname, attr, handle, spec, ldata, NULL, NULL,
+                               hint);
+       if (rc)
                return rc;
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               rc = mdo_declare_ref_add(env, mdd_pobj, handle);
-               if (rc != 0)
+       if (S_ISDIR(attr->la_mode) && mdd_dir_is_empty(env, sobj) != 0) {
+               if (!lmv) {
+                       /*
+                        * if sobj is not striped, fake a 1-stripe LMV, which
+                        * will be used to generate a compound LMV for tobj.
+                        */
+                       LASSERT(sizeof(info->mti_key) >
+                               lmv_mds_md_size(1, LMV_MAGIC_V1));
+                       lmv = (typeof(lmv))info->mti_key;
+                       memset(lmv, 0, sizeof(*lmv));
+                       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1);
+                       lmv->lmv_stripe_count = cpu_to_le32(1);
+                       fid_le_to_cpu(&lmv->lmv_stripe_fids[0], mdo2fid(sobj));
+                       sbuf->lb_buf = lmv;
+                       sbuf->lb_len = lmv_mds_md_size(1, LMV_MAGIC_V1);
+
+                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
+                                                  XATTR_NAME_LMV".add", 0,
+                                                  handle);
+                       sbuf->lb_buf = NULL;
+                       sbuf->lb_len = 0;
+               } else {
+                       rc = mdo_declare_xattr_set(env, tobj, sbuf,
+                                                  XATTR_NAME_LMV".add", 0,
+                                                  handle);
+               }
+               if (rc)
                        return rc;
        }
 
-       /* delete old object */
-       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-       if (rc != 0)
+       /*
+        * tobj mode will be used in lod_declare_xattr_set(), but it's not
+        * created yet, copy from sobj.
+        */
+       tobj->mod_obj.mo_lu.lo_header->loh_attr &= ~S_IFMT;
+       tobj->mod_obj.mo_lu.lo_header->loh_attr |=
+               sobj->mod_obj.mo_lu.lo_header->loh_attr & S_IFMT;
+
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle,
+                               mdo_declare_xattr_set);
+       if (rc)
                return rc;
 
-       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
-               /* delete old object */
-               rc = mdo_declare_ref_del(env, mdd_sobj, handle);
-               if (rc != 0)
+       if (S_ISREG(attr->la_mode)) {
+               handle->th_complex = 1;
+
+               rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               if (rc)
                        return rc;
-               /* set nlink to 0 */
-               rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
-               if (rc != 0)
+
+               /*
+                * target is not initialized because its LOV is copied from
+                * source in mdd_migrate_create(), declare via sobj.
+                */
+               rc = mdo_declare_xattr_set(env, sobj, NULL, XATTR_NAME_FID, 0,
+                                          handle);
+               if (rc)
                        return rc;
        }
 
-       rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
-       if (rc)
-               return rc;
-
-       rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
-       if (rc != 0)
-               return rc;
+       if (!S_ISDIR(attr->la_mode)) {
+               rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+                                       ldata, NULL, handle,
+                                       mdd_declare_update_link);
+               if (rc)
+                       return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
-                                        handle);
+               if (lmv) {
+                       rc = mdo_declare_xattr_del(env, sobj, XATTR_NAME_LMV,
+                                                  handle);
+                       if (rc)
+                               return rc;
+               }
+       }
 
        return rc;
 }
 
-static int mdd_migrate_update_name(const struct lu_env *env,
-                                  struct mdd_object *mdd_pobj,
-                                  struct mdd_object *mdd_sobj,
-                                  struct mdd_object *mdd_tobj,
-                                  const struct lu_name *lname,
-                                  struct md_attr *ma)
+/**
+ * Create target, migrate xattrs and update links.
+ *
+ * Create target according to \a spec, and then migrate xattrs, if it's
+ * directory, migrate source stripes to target, else update fid to target
+ * for links.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tpobj    target parent object
+ * \param[in] sobj     source object
+ * \param[in] tobj     target object
+ * \param[in] lname    file name
+ * \param[in] attr     source attributes
+ * \param[in] sbuf     source LMV buf
+ * \param[in] ldata    source linkea
+ * \param[in] spec     migrate create spec
+ * \param[in] hint     target creation hint
+ * \param[in] handle   transaction handle
+ *
+ * \retval     0 on success
+ * \retval     -errno on failure
+ **/
+static int mdd_migrate_create(const struct lu_env *env,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *lname,
+                             struct lu_attr *attr,
+                             const struct lu_buf *sbuf,
+                             struct linkea_data *ldata,
+                             struct md_op_spec *spec,
+                             struct dt_allocation_hint *hint,
+                             struct thandle *handle)
 {
-       struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
-       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
-       struct linkea_data      *ldata = &mdd_env_info(env)->mti_link_data;
-       struct thandle          *handle;
-       int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
-       const char              *name = lname->ln_name;
-       int                     rc;
+       int rc;
+
        ENTRY;
 
-       /* update time for parent */
-       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-       p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
-       p_la->la_valid = LA_CTIME;
+       /*
+        * directory will migrate sobj stripes to tobj:
+        * 1. delete stripes from sobj.
+        * 2. add stripes to tobj, see lod_dir_declare_layout_add().
+        * 3. create/attach stripes for tobj, see lod_xattr_set_lmv().
+        */
+       if (S_ISDIR(attr->la_mode)) {
+               struct lu_buf lmu_buf = { NULL };
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               RETURN(rc);
+               if (sbuf->lb_buf) {
+                       struct mdd_thread_info *info = mdd_env_info(env);
+                       struct lmv_user_md *lmu = (typeof(lmu))info->mti_key;
 
-       ldata->ld_buf = NULL;
-       rc = mdd_links_read(env, mdd_sobj, ldata);
-       if (rc != 0 && rc != -ENOENT && rc != -ENODATA)
-               RETURN(rc);
+                       lmu->lum_stripe_count = 0;
+                       lmu_buf.lb_buf = lmu;
+                       lmu_buf.lb_len = sizeof(*lmu);
+               }
 
-       handle = mdd_trans_create(env, mdd);
-       if (IS_ERR(handle))
-               RETURN(PTR_ERR(handle));
+               rc = mdd_dir_layout_delete(env, sobj, sbuf, &lmu_buf, handle);
+               if (rc)
+                       RETURN(rc);
 
-       rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                            lname, so_attr, p_la, ldata,
-                                            handle);
-       if (rc != 0) {
-               /* If the migration can not be fit in one transaction, just
-                * leave it in the original MDT */
-               if (rc == -E2BIG)
-                       GOTO(stop_trans, rc = 0);
-               else
-                       GOTO(stop_trans, rc);
+               /*
+                * delete LMV so that later when destroying sobj it won't delete
+                * stripes again.
+                */
+               if (sbuf->lb_buf) {
+                       mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+                       rc = mdo_xattr_del(env, sobj, XATTR_NAME_LMV, handle);
+                       mdd_write_unlock(env, sobj);
+                       if (rc)
+                               RETURN(rc);
+               }
        }
 
-       CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
-              mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
-              PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-              PFID(mdd_object_fid(mdd_tobj)));
+       /* don't set nlink from sobj */
+       attr->la_valid &= ~LA_NLINK;
 
-       rc = mdd_trans_start(env, mdd, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       /* Revert IMMUTABLE flag */
-       la_flag->la_valid = LA_FLAGS;
-       la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
-       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       /* Remove source name from source directory */
-       rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
+       rc = mdd_create_object(env, tpobj, tobj, attr, spec, NULL, NULL, hint,
+                               handle);
+       if (rc)
+               RETURN(rc);
 
-       if (ldata->ld_buf != NULL) {
-               rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                      lname, ldata, handle);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
+       mdd_write_lock(env, tobj, MOR_TGT_CHILD);
+       rc = mdd_iterate_xattrs(env, sobj, tobj, true, handle, mdo_xattr_set);
+       mdd_write_unlock(env, tobj);
+       if (rc)
+               RETURN(rc);
 
-               /*  linkea update might decrease the source object
-                *  nlink, let's get the attr again after ref_del */
-               rc = mdd_la_get(env, mdd_sobj, so_attr);
-               if (rc != 0)
-                       GOTO(stop_trans, rc);
-       }
+       if (S_ISREG(attr->la_mode)) {
+               /* delete LOV to avoid deleting OST objs when destroying sobj */
+               mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+               rc = mdo_xattr_del(env, sobj, XATTR_NAME_LOV, handle);
+               mdd_write_unlock(env, sobj);
+               if (rc)
+                       RETURN(rc);
 
-       if (S_ISREG(so_attr->la_mode)) {
-               if (so_attr->la_nlink == 1) {
-                       rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
-                                          handle);
-                       if (rc != 0 && rc != -ENODATA)
-                               GOTO(stop_trans, rc);
-
-                       rc = mdo_xattr_set(env, mdd_tobj, NULL,
-                                          XATTR_NAME_FID,
-                                          LU_XATTR_REPLACE, handle);
-                       if (rc < 0)
-                               GOTO(stop_trans, rc);
-               }
+               /* for regular file, update OST objects XATTR_NAME_FID */
+               rc = mdo_xattr_set(env, tobj, NULL, XATTR_NAME_FID, 0, handle);
+               if (rc)
+                       RETURN(rc);
        }
 
-       /* Insert new fid with target name into target dir */
-       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj),
-                               mdd_object_type(mdd_tobj), name, handle);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
-       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+       if (!S_ISDIR(attr->la_mode))
+               rc = mdd_iterate_linkea(env, sobj, tobj, lname, mdo2fid(tpobj),
+                                       ldata, NULL, handle, mdd_update_link);
 
-       mdd_sobj->mod_flags |= DEAD_OBJ;
-       rc = mdd_mark_orphan_object(env, mdd_sobj, handle, false);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       RETURN(rc);
+}
 
-       rc = mdd_orphan_insert(env, mdd_sobj, handle);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+static int mdd_declare_migrate_update(const struct lu_env *env,
+                                     struct mdd_object *spobj,
+                                     struct mdd_object *tpobj,
+                                     struct mdd_object *sobj,
+                                     struct mdd_object *tobj,
+                                     const struct lu_name *lname,
+                                     struct lu_attr *attr,
+                                     struct lu_attr *spattr,
+                                     struct lu_attr *tpattr,
+                                     struct linkea_data *ldata,
+                                     bool do_create,
+                                     bool do_destroy,
+                                     struct md_attr *ma,
+                                     struct thandle *handle)
+{
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       int rc;
 
-       mdo_ref_del(env, mdd_sobj, handle);
-       if (is_dir)
-               mdo_ref_del(env, mdd_sobj, handle);
+       rc = mdo_declare_index_delete(env, spobj, lname->ln_name, handle);
+       if (rc)
+               return rc;
 
-       /* Get the attr again after ref_del */
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_del(env, spobj, handle);
+               if (rc)
+                       return rc;
+       }
 
-       ma->ma_attr = *so_attr;
-       ma->ma_valid |= MA_INODE;
+       rc = mdo_declare_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+                                     lname->ln_name, handle);
+       if (rc)
+               return rc;
 
-       rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
-       if (rc != 0)
-               GOTO(out_unlock, rc);
+       rc = mdd_declare_links_add(env, do_create ? tobj : sobj, handle, ldata);
+       if (rc)
+               return rc;
 
-       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, mdd_tobj,
-                              mdo2fid(mdd_pobj), mdo2fid(mdd_sobj),
-                              mdo2fid(mdd_pobj), lname, lname, handle);
-       if (rc != 0) {
-               CWARN("%s: changelog for migrate %s "DFID
-                     "under "DFID" failed: rc = %d\n",
-                     mdd2obd_dev(mdd)->obd_name, lname->ln_name,
-                     PFID(mdd_object_fid(mdd_sobj)),
-                     PFID(mdd_object_fid(mdd_pobj)), rc);
-               /* Sigh, there are no easy way to migrate back the object, so
-                * let's reset the result to 0 for now XXX */
-               rc = 0;
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_add(env, tpobj, handle);
+               if (rc)
+                       return rc;
        }
-out_unlock:
-       mdd_write_unlock(env, mdd_sobj);
 
-stop_trans:
-       rc = mdd_trans_stop(env, mdd, rc, handle);
-
-       RETURN(rc);
-}
-
-static int mdd_fld_lookup(const struct lu_env *env, struct mdd_device *mdd,
-                         const struct lu_fid *fid, __u32 *mdt_index)
-{
-       struct lu_seq_range *range = &mdd_env_info(env)->mti_range;
-       struct seq_server_site *ss;
-       int rc;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, spobj, la, handle);
+       if (rc)
+               return rc;
 
-       ss = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site;
+       if (tpobj != spobj) {
+               rc = mdo_declare_attr_set(env, tpobj, la, handle);
+               if (rc)
+                       return rc;
+       }
 
-       range->lsr_flags = LU_SEQ_RANGE_MDT;
-       rc = fld_server_lookup(env, ss->ss_server_fld, fid->f_seq, range);
-       if (rc != 0)
-               return rc;
+       if (do_create && do_destroy) {
+               rc = mdo_declare_ref_del(env, sobj, handle);
+               if (rc)
+                       return rc;
 
-       *mdt_index = range->lsr_index;
+               rc = mdo_declare_destroy(env, sobj, handle);
+               if (rc)
+                       return rc;
+       }
 
-       return 0;
+       return rc;
 }
+
 /**
- * Check whether we should migrate the file/dir
- * return val
- *     < 0  permission check failed or other error.
- *     = 0  the file can be migrated.
- *     > 0  the file does not need to be migrated, mostly
- *          for multiple link file
+ * migrate dirent from \a spobj to \a tpobj, and destroy \a sobj
  **/
-static int mdd_migrate_sanity_check(const struct lu_env *env,
-                                   struct mdd_object *pobj,
-                                   const struct lu_attr *pattr,
-                                   struct mdd_object *sobj,
-                                   struct lu_attr *sattr)
+static int mdd_migrate_update(const struct lu_env *env,
+                             struct mdd_object *spobj,
+                             struct mdd_object *tpobj,
+                             struct mdd_object *sobj,
+                             struct mdd_object *tobj,
+                             const struct lu_name *lname,
+                             struct lu_attr *attr,
+                             struct lu_attr *spattr,
+                             struct lu_attr *tpattr,
+                             struct linkea_data *ldata,
+                             bool do_create,
+                             bool do_destroy,
+                             struct md_attr *ma,
+                             struct thandle *handle)
 {
-       struct mdd_thread_info  *info = mdd_env_info(env);
-       struct linkea_data      *ldata = &info->mti_link_data;
-       struct mdd_device       *mdd = mdo2mdd(&pobj->mod_obj);
-       int                     mgr_easize;
-       struct lu_buf           *mgr_buf;
-       int                     count;
-       int                     rc;
-       __u64 mdt_index;
+       struct mdd_thread_info *info = mdd_env_info(env);
+       const struct lu_fid *fid = mdo2fid(do_create ? tobj : sobj);
+       struct lu_attr *la = &info->mti_la_for_fix;
+       int rc;
+
        ENTRY;
 
-       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_V1);
-       mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
-       if (mgr_buf->lb_buf == NULL)
-               RETURN(-ENOMEM);
+       CDEBUG(D_INFO, "update %s "DFID"/"DFID" to "DFID"/"DFID"\n",
+              lname->ln_name, PFID(mdo2fid(spobj)),
+              PFID(mdo2fid(sobj)), PFID(mdo2fid(tpobj)),
+              PFID(fid));
 
-       rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV);
-       if (rc > 0) {
-               union lmv_mds_md *lmm = mgr_buf->lb_buf;
-
-               /* If the object has migrateEA, it means IMMUTE flag
-                * is being set by previous migration process, so it
-                * needs to override the IMMUTE flag, otherwise the
-                * following sanity check will fail */
-               if (le32_to_cpu(lmm->lmv_md_v1.lmv_hash_type) &
-                                               LMV_HASH_FLAG_MIGRATION) {
-                       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
-
-                       sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
-                       CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
-                              mdd2obd_dev(mdd)->obd_name,
-                              PFID(mdd_object_fid(sobj)));
-               }
-       }
+       rc = __mdd_index_delete(env, spobj, lname->ln_name,
+                               S_ISDIR(attr->la_mode), handle);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
-                                    sobj, sattr, NULL, NULL);
-       if (rc != 0)
+       rc = __mdd_index_insert(env, tpobj, fid, mdd_object_type(sobj),
+                               lname->ln_name, handle);
+       if (rc)
                RETURN(rc);
 
-       /* Then it will check if the file should be migrated. If the file
-        * has mulitple links, we only need migrate the file if all of its
-        * entries has been migrated to the remote MDT */
-       if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
-               RETURN(0);
+       rc = mdd_links_write(env, do_create ? tobj : sobj, ldata, handle);
+       if (rc)
+               RETURN(rc);
 
-       rc = mdd_links_read(env, sobj, ldata);
-       if (rc != 0) {
-               /* For multiple links files, if there are no linkEA data at all,
-                * means the file might be created before linkEA is enabled, and
-                * all of its links should not be migrated yet, otherwise it
-                * should have some linkEA there */
-               if (rc == -ENOENT || rc == -ENODATA)
-                       RETURN(1);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       mdd_write_lock(env, spobj, MOR_SRC_PARENT);
+       rc = mdd_update_time(env, spobj, spattr, la, handle);
+       mdd_write_unlock(env, spobj);
+       if (rc)
                RETURN(rc);
+
+       if (tpobj != spobj) {
+               la->la_valid = LA_CTIME | LA_MTIME;
+               mdd_write_lock(env, tpobj, MOR_TGT_PARENT);
+               rc = mdd_update_time(env, tpobj, tpattr, la, handle);
+               mdd_write_unlock(env, tpobj);
+               if (rc)
+                       RETURN(rc);
        }
 
-       mdt_index = mdd->mdd_md_dev.md_lu_dev.ld_site->ld_seq_site->ss_node_id;
-       /* If there are still links locally, then the file will not be
-        * migrated. */
-       LASSERT(ldata->ld_leh != NULL);
+       /*
+        * there are three situations we shouldn't destroy source:
+        * 1. if source is not dir, and it happens to be located on the same MDT
+        *    as target parent.
+        * 2. if source is not dir, and has link on the same MDT where source is
+        *    located.
+        * 3. if source is dir, and it's a normal, non-empty dir.
+        *
+        * the first two situations are equivalent to !do_create, and the 3rd
+        * is equivalent to !do_destroy, so the below condition is actually
+        * !(!do_create || !do_destroy).
+        *
+        * NB, if user has opened source dir before migration, he will get
+        * -ENOENT error when closing it later, because source is likely to be
+        *  remote, which can't be moved to orphan list, but except this error
+        *  message, this won't cause any inconsistency or trouble.
+        */
+       if (do_create && do_destroy) {
+               mdd_write_lock(env, sobj, MOR_SRC_CHILD);
+               mdo_ref_del(env, sobj, handle);
+               rc = mdo_destroy(env, sobj, handle);
+               mdd_write_unlock(env, sobj);
+       }
 
-       /* If the linkEA is overflow, then means there are some unknown name
-        * entries under unknown parents, that will prevent the migration. */
-       if (unlikely(ldata->ld_leh->leh_overflow_time))
-               RETURN(1);
+       RETURN(rc);
+}
 
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
-       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
-               struct lu_name          lname;
-               struct lu_fid           fid;
-               __u32                   parent_mdt_index;
+/**
+ * Migrate directory or file.
+ *
+ * migrate source to target in following steps:
+ *   1. create target, append source stripes after target's if it's directory,
+ *      migrate xattrs and update fid of source links.
+ *   2. update namespace: migrate dirent from source parent to target parent,
+ *      update file linkea, and destroy source if it's not needed any more.
+ *
+ * \param[in] env      execution environment
+ * \param[in] md_pobj  parent master object
+ * \param[in] md_sobj  source object
+ * \param[in] lname    file name
+ * \param[in] md_tobj  target object
+ * \param[in] spec     target creation spec
+ * \param[in] ma       used to update \a pobj mtime and ctime
+ *
+ * \retval             0 on success
+ * \retval             -errno on failure
+ */
+static int mdd_migrate(const struct lu_env *env, struct md_object *md_pobj,
+                      struct md_object *md_sobj, const struct lu_name *lname,
+                      struct md_object *md_tobj, struct md_op_spec *spec,
+                      struct md_attr *ma)
+{
+       struct mdd_device *mdd = mdo2mdd(md_pobj);
+       struct mdd_thread_info *info = mdd_env_info(env);
+       struct mdd_object *pobj = md2mdd_obj(md_pobj);
+       struct mdd_object *sobj = md2mdd_obj(md_sobj);
+       struct mdd_object *tobj = md2mdd_obj(md_tobj);
+       struct mdd_object *spobj = NULL;
+       struct mdd_object *tpobj = NULL;
+       struct lu_attr *spattr = &info->mti_pattr;
+       struct lu_attr *tpattr = &info->mti_tpattr;
+       struct lu_attr *attr = &info->mti_cattr;
+       struct linkea_data *ldata = &info->mti_link_data;
+       struct dt_allocation_hint *hint = &info->mti_hint;
+       struct lu_fid *fid = &info->mti_fid2;
+       struct lu_buf pbuf = { NULL };
+       struct lu_buf sbuf = { NULL };
+       struct lmv_mds_md_v1 *plmv;
+       struct thandle *handle;
+       bool do_create = true;
+       bool do_destroy = true;
+       int rc;
+       ENTRY;
 
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
-                                   &lname, &fid);
-               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
-                                                        ldata->ld_reclen);
+       rc = mdd_la_get(env, sobj, attr);
+       if (rc)
+               RETURN(rc);
 
-               rc = mdd_fld_lookup(env, mdd, &fid, &parent_mdt_index);
-               if (rc != 0)
-                       RETURN(rc);
+       /* locate source and target stripe on pobj, which are the real parent */
+       rc = mdd_stripe_get(env, pobj, &pbuf, XATTR_NAME_LMV);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       plmv = pbuf.lb_buf;
+       if (plmv) {
+               __u32 hash_type = le32_to_cpu(plmv->lmv_hash_type);
+               __u32 count = le32_to_cpu(plmv->lmv_stripe_count);
+               int index;
 
-               /* Migrate the object only if none of its parents are on the
-                * current MDT. */
-               if (parent_mdt_index != mdt_index)
-                       continue;
+               /* locate target parent stripe */
+               if (hash_type & LMV_HASH_FLAG_MIGRATION) {
+                       /*
+                        * fail check here to make sure top dir migration
+                        * succeed.
+                        */
+                       if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
+                               GOTO(out, rc = -EIO);
+                       hash_type &= ~LMV_HASH_FLAG_MIGRATION;
+                       count = le32_to_cpu(plmv->lmv_migrate_offset);
+               }
+               index = lmv_name_to_stripe_index(hash_type, count,
+                                                lname->ln_name,
+                                                lname->ln_namelen);
+               if (index < 0)
+                       GOTO(out, rc = index);
+
+               fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+               tpobj = mdd_object_find(env, mdd, fid);
+               if (IS_ERR(tpobj))
+                       GOTO(out, rc = PTR_ERR(tpobj));
+
+               /* locate source parent stripe */
+               if (le32_to_cpu(plmv->lmv_hash_type) &
+                   LMV_HASH_FLAG_MIGRATION) {
+                       hash_type = le32_to_cpu(plmv->lmv_migrate_hash);
+                       count = le32_to_cpu(plmv->lmv_stripe_count) -
+                               le32_to_cpu(plmv->lmv_migrate_offset);
+
+                       index = lmv_name_to_stripe_index(hash_type, count,
+                                                        lname->ln_name,
+                                                        lname->ln_namelen);
+                       if (index < 0) {
+                               mdd_object_put(env, tpobj);
+                               GOTO(out, rc = index);
+                       }
 
-               CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
-                      PFID(mdd_object_fid(sobj)), lname.ln_namelen,
-                      lname.ln_name, PFID(&fid));
-               rc = 1;
-               break;
+                       index += le32_to_cpu(plmv->lmv_migrate_offset);
+                       fid_le_to_cpu(fid, &plmv->lmv_stripe_fids[index]);
+                       spobj = mdd_object_find(env, mdd, fid);
+                       if (IS_ERR(spobj)) {
+                               mdd_object_put(env, tpobj);
+                               GOTO(out, rc = PTR_ERR(spobj));
+                       }
+               } else {
+                       spobj = tpobj;
+                       mdd_object_get(spobj);
+               }
+       } else {
+               tpobj = pobj;
+               spobj = pobj;
+               mdd_object_get(tpobj);
+               mdd_object_get(spobj);
        }
 
-       RETURN(rc);
-}
+       rc = mdd_la_get(env, spobj, spattr);
+       if (rc)
+               GOTO(out, rc);
 
-static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
-                      struct md_object *sobj, const struct lu_name *lname,
-                      struct md_object *tobj, struct md_attr *ma)
-{
-       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
-       struct mdd_device       *mdd = mdo2mdd(pobj);
-       struct mdd_object       *mdd_sobj = md2mdd_obj(sobj);
-       struct mdd_object       *mdd_tobj = md2mdd_obj(tobj);
-       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
-       struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
-       bool                    created = false;
-       int                     rc;
+       rc = mdd_la_get(env, tpobj, tpattr);
+       if (rc)
+               GOTO(out, rc);
 
-       ENTRY;
-       /* If the file will being migrated, it will check whether
-        * the file is being opened by someone else right now */
-       mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
-       if (mdd_sobj->mod_count > 0) {
-               CDEBUG(D_OTHER,
-                      "%s: "DFID"%s is already opened count %d: rc = %d\n",
-                      mdd2obd_dev(mdd)->obd_name,
-                      PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
-                      mdd_sobj->mod_count, -EBUSY);
-               mdd_read_unlock(env, mdd_sobj);
-               GOTO(put, rc = -EBUSY);
-       }
-       mdd_read_unlock(env, mdd_sobj);
+       if (S_ISDIR(attr->la_mode)) {
+               struct lmv_user_md_v1 *lmu = spec->u.sp_ea.eadata;
 
-       rc = mdd_la_get(env, mdd_sobj, so_attr);
-       if (rc != 0)
-               GOTO(put, rc);
+               LASSERT(lmu);
 
-       rc = mdd_la_get(env, mdd_pobj, pattr);
-       if (rc != 0)
-               GOTO(put, rc);
+               /*
+                * if user use default value '0' for stripe_count, we need to
+                * adjust it to '1' to create a 1-stripe directory.
+                */
+               if (lmu->lum_stripe_count == 0) {
+                       /* eadata is from request, don't alter it */
+                       info->mti_lmu = *lmu;
+                       info->mti_lmu.lum_stripe_count = cpu_to_le32(1);
+                       spec->u.sp_ea.eadata = &info->mti_lmu;
+                       lmu = spec->u.sp_ea.eadata;
+               }
 
-       rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
-       if (rc != 0) {
-               if (rc > 0)
-                       rc = 0;
-               GOTO(put, rc);
+               rc = mdd_stripe_get(env, sobj, &sbuf, XATTR_NAME_LMV);
+               if (rc == -ENODATA) {
+                       if (mdd_dir_is_empty(env, sobj) == 0) {
+                               /*
+                                * if sobj is empty, and target is not striped,
+                                * create target as a normal directory.
+                                */
+                               if (le32_to_cpu(lmu->lum_stripe_count) == 1) {
+                                       info->mti_lmu = *lmu;
+                                       info->mti_lmu.lum_stripe_count = 0;
+                                       spec->u.sp_ea.eadata = &info->mti_lmu;
+                                       lmu = spec->u.sp_ea.eadata;
+                               }
+                       } else {
+                               /*
+                                * sobj is not striped dir, if it's not empty,
+                                * it will be migrated to be a stripe of target,
+                                * don't destroy it after migration.
+                                */
+                               do_destroy = false;
+                       }
+               } else if (rc) {
+                       GOTO(out, rc);
+               } else {
+                       struct lmv_mds_md_v1 *lmv = sbuf.lb_buf;
+
+                       if (le32_to_cpu(lmv->lmv_hash_type) &
+                           LMV_HASH_FLAG_MIGRATION) {
+                               __u32 lum_stripe_count = lmu->lum_stripe_count;
+                               __u32 lmv_hash_type = lmv->lmv_hash_type &
+                                       cpu_to_le32(LMV_HASH_TYPE_MASK);
+
+                               if (!lum_stripe_count)
+                                       lum_stripe_count = cpu_to_le32(1);
+
+                               /* TODO: check specific MDTs */
+                               if (lmv->lmv_migrate_offset !=
+                                   lum_stripe_count ||
+                                   lmv->lmv_master_mdt_index !=
+                                   lmu->lum_stripe_offset ||
+                                   (lmv_hash_type != 0 &&
+                                    lmv_hash_type != lmu->lum_hash_type)) {
+                                       CERROR("%s: \'"DNAME"\' migration was "
+                                               "interrupted, run \'lfs migrate "
+                                               "-m %d -c %d -H %d "DNAME"\' to "
+                                               "finish migration.\n",
+                                               mdd2obd_dev(mdd)->obd_name,
+                                               PNAME(lname),
+                                               le32_to_cpu(
+                                                   lmv->lmv_master_mdt_index),
+                                               le32_to_cpu(
+                                                   lmv->lmv_migrate_offset),
+                                               le32_to_cpu(lmv_hash_type),
+                                               PNAME(lname));
+                                       GOTO(out, rc = -EPERM);
+                               }
+                               GOTO(out, rc = -EALREADY);
+                       }
+               }
+       } else if (!mdd_object_remote(tpobj)) {
+               /*
+                * if source is already on MDT where target parent is located,
+                * no need to create, just update namespace.
+                */
+               do_create = false;
+       } else if (S_ISLNK(attr->la_mode)) {
+               lu_buf_check_and_alloc(&sbuf, attr->la_size + 1);
+               if (!sbuf.lb_buf)
+                       GOTO(out, rc = -ENOMEM);
+               rc = mdd_readlink(env, &sobj->mod_obj, &sbuf);
+               if (rc <= 0) {
+                       rc = rc ?: -EFAULT;
+                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdo2fid(sobj)), rc);
+                       GOTO(out, rc);
+               }
+               spec->u.sp_symname = sbuf.lb_buf;
+       } else if (S_ISREG(attr->la_mode)) {
+               spec->sp_cr_flags |= MDS_OPEN_DELAY_CREATE;
+               spec->sp_cr_flags &= ~MDS_OPEN_HAS_EA;
        }
 
-       /* Sigh, it is impossible to finish all of migration in a single
-        * transaction, for example migrating big directory entries to the
-        * new MDT, it needs insert all of name entries of children in the
-        * new directory.
-        *
-        * So migration will be done in multiple steps and transactions.
-        *
-        * 1. create an orphan object on the remote MDT in one transaction.
-        * 2. migrate extend attributes to the new target file/directory.
-        * 3. For directory, migrate the entries to the new MDT and update
-        * linkEA of each children. Because we can not migrate all entries
-        * in a single transaction, so the migrating directory will become
-        * a striped directory during migration, so once the process is
-        * interrupted, the directory is still accessible. (During lookup,
-        * client will locate the name by searching both original and target
-        * object).
-        * 4. Finally, update the name/FID to point to the new file/directory
-        * in a separate transaction.
+       /*
+        * if sobj has link on the same MDT, no need to create, just update
+        * namespace, and it will be a remote file on target parent, which is
+        * similar to rename.
         */
+       rc = migrate_linkea_prepare(env, mdd, spobj, tpobj, sobj, lname, attr,
+                                   ldata);
+       if (rc > 0)
+               do_create = false;
+       else if (rc)
+               GOTO(out, rc);
 
-       /* step 1: Check whether the orphan object has been created, and create
-        * orphan object on the remote MDT if needed */
-       if (!mdd_object_exists(mdd_tobj)) {
-               rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
-                                       lname, so_attr);
-               if (rc != 0)
-                       GOTO(put, rc);
-               created = true;
+       rc = mdd_migrate_sanity_check(env, mdd, spobj, tpobj, sobj, tobj,
+                                     spattr, tpattr, attr);
+       if (rc)
+               GOTO(out, rc);
+
+       mdd_object_make_hint(env, NULL, tobj, attr, spec, hint);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out, rc = PTR_ERR(handle));
+
+       if (do_create) {
+               rc = mdd_declare_migrate_create(env, tpobj, sobj, tobj, lname,
+                                               attr, &sbuf, ldata, spec, hint,
+                                               handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
        }
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 2: migrate xattr */
-       rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = mdd_declare_migrate_update(env, spobj, tpobj, sobj, tobj, lname,
+                                       attr, spattr, tpattr, ldata, do_create,
+                                       do_destroy, ma, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       /* step 3: migrate name entries to the orphan object */
-       if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
-               rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
-               if (rc != 0)
-                       GOTO(put, rc);
-               if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
-                                                 OBD_FAIL_MDS_REINT_NET_REP)))
-                       GOTO(put, rc = 0);
-       } else {
-               OBD_FAIL_TIMEOUT(OBD_FAIL_MIGRATE_DELAY, cfs_fail_val);
+       rc = mdd_declare_changelog_store(env, mdd, CL_MIGRATE, lname, NULL,
+                                        handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       if (do_create) {
+               rc = mdd_migrate_create(env, tpobj, sobj, tobj, lname, attr,
+                                       &sbuf, ldata, spec, hint, handle);
+               if (rc)
+                       GOTO(stop_trans, rc);
        }
 
-       LASSERT(mdd_object_exists(mdd_tobj));
-       /* step 4: update name entry to the new object */
-       rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
-                                    ma);
-       if (rc != 0)
-               GOTO(put, rc);
+       rc = mdd_migrate_update(env, spobj, tpobj, sobj, tobj, lname, attr,
+                               spattr, tpattr, ldata, do_create, do_destroy,
+                               ma, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
 
-       /* newly created target was not locked, don't cache its attributes */
-       if (created)
-               mdd_invalidate(env, tobj);
-put:
-       RETURN(rc);
+       rc = mdd_changelog_ns_store(env, mdd, CL_MIGRATE, 0, tobj,
+                                   mdo2fid(spobj), mdo2fid(sobj),
+                                   mdo2fid(tpobj), lname, lname, handle);
+       if (rc)
+               GOTO(stop_trans, rc);
+
+       EXIT;
+stop_trans:
+       rc = mdd_trans_stop(env, mdd, rc, handle);
+out:
+       if (spobj && !IS_ERR(spobj))
+               mdd_object_put(env, spobj);
+       if (tpobj && !IS_ERR(tpobj))
+               mdd_object_put(env, tpobj);
+       lu_buf_free(&sbuf);
+       lu_buf_free(&pbuf);
+       return rc;
 }
 
 const struct md_dir_operations mdd_dir_ops = {
index 7277832..5ecce16 100644 (file)
@@ -190,11 +190,13 @@ struct mdd_thread_info {
        * then mti_ent::lde_name will be mti_key. */
        struct lu_dirent          mti_ent;
        char                      mti_key[NAME_MAX + 16];
+       char                      mti_name[NAME_MAX + 1];
        struct lu_buf             mti_buf[4];
        struct lu_buf             mti_big_buf; /* biggish persistent buf */
        struct lu_buf             mti_link_buf; /* buf for link ea */
        struct lu_buf             mti_xattr_buf;
        struct obdo               mti_oa;
+       struct lmv_user_md        mti_lmu;
        struct dt_allocation_hint mti_hint;
        struct dt_object_format   mti_dof;
        struct linkea_data        mti_link_data;
@@ -347,8 +349,8 @@ int mdd_declare_create_object_internal(const struct lu_env *env,
                                       struct thandle *handle,
                                       const struct md_op_spec *spec,
                                       struct dt_allocation_hint *hint);
-int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
-                  struct lu_buf *lmm_buf);
+int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf, const char *name);
 
 /* mdd_trans.c */
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
index 3e48fcc..a623314 100644 (file)
@@ -1231,7 +1231,7 @@ int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
        if (la_copy->la_valid & LA_SIZE) {
                struct lu_buf *lov_buf = mdd_buf_get(env, NULL, 0);
 
-               rc = mdd_get_lov_ea(env, mdd_obj, lov_buf);
+               rc = mdd_stripe_get(env, mdd_obj, lov_buf, XATTR_NAME_LOV);
                if (rc) {
                        rc = 0;
                } else {
@@ -1488,7 +1488,7 @@ static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
 
        /* get EA of victim file */
        memset(buf_vic, 0, sizeof(*buf_vic));
-       rc = mdd_get_lov_ea(env, vic, buf_vic);
+       rc = mdd_stripe_get(env, vic, buf_vic, XATTR_NAME_LOV);
        if (rc < 0) {
                if (rc == -ENODATA)
                        rc = 0;
@@ -1502,7 +1502,7 @@ static int mdd_xattr_merge(const struct lu_env *env, struct md_object *md_obj,
 
        /* save EA of target file for restore */
        memset(buf, 0, sizeof(*buf));
-       rc = mdd_get_lov_ea(env, obj, buf);
+       rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -1714,7 +1714,7 @@ static int mdd_xattr_split(const struct lu_env *env, struct md_object *md_obj,
 
        /* get EA of mirrored file */
        memset(buf_save, 0, sizeof(*buf));
-       rc = mdd_get_lov_ea(env, obj, buf_save);
+       rc = mdd_stripe_get(env, obj, buf_save, XATTR_NAME_LOV);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -1982,58 +1982,50 @@ stop:
 }
 
 /*
- * read lov EA of an object
- * return the lov EA in an allocated lu_buf
+ * read lov/lmv EA of an object
+ * return the lov/lmv EA in an allocated lu_buf
  */
-int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
-                  struct lu_buf *lmm_buf)
+int mdd_stripe_get(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf, const char *name)
 {
-       struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       int              rc, bufsize;
+       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
+       int rc;
+
        ENTRY;
 
-repeat:
-       rc = mdo_xattr_get(env, obj, buf, XATTR_NAME_LOV);
+       if (buf->lb_buf == NULL) {
+               buf = lu_buf_check_and_alloc(buf, 4096);
+               if (buf->lb_buf == NULL)
+                       RETURN(-ENOMEM);
+       }
 
+repeat:
+       rc = mdo_xattr_get(env, obj, buf, name);
        if (rc == -ERANGE) {
                /* mti_big_buf is allocated but is too small
                 * we need to increase it */
                buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
                                             buf->lb_len * 2);
                if (buf->lb_buf == NULL)
-                       GOTO(out, rc = -ENOMEM);
+                       RETURN(-ENOMEM);
                goto repeat;
-       }
-
-       if (rc < 0)
+       } else if (rc < 0) {
                RETURN(rc);
-
-       if (rc == 0)
+       } else if (rc == 0) {
                RETURN(-ENODATA);
-
-       bufsize = rc;
-       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
-               /* mti_big_buf was not allocated, so we have to
-                * allocate it based on the ea size */
-               buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
-                                            bufsize);
-               if (buf->lb_buf == NULL)
-                       GOTO(out, rc = -ENOMEM);
-               goto repeat;
        }
 
-       lu_buf_alloc(lmm_buf, bufsize);
+       lu_buf_alloc(lmm_buf, rc);
        if (lmm_buf->lb_buf == NULL)
-               GOTO(out, rc = -ENOMEM);
+               RETURN(-ENOMEM);
 
-       memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
-       rc = 0;
-       EXIT;
+       /*
+        * we don't use lmm_buf directly, because we don't know xattr size, so
+        * by using mti_big_buf we can avoid calling mdo_xattr_get() twice.
+        */
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, rc);
 
-out:
-       if (rc < 0)
-               lu_buf_free(lmm_buf);
-       return rc;
+       RETURN(0);
 }
 
 static int mdd_xattr_hsm_replace(const struct lu_env *env,
@@ -2252,11 +2244,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1,
        mdd_write_lock(env, fst_o, MOR_TGT_CHILD);
        mdd_write_lock(env, snd_o, MOR_TGT_CHILD);
 
-       rc = mdd_get_lov_ea(env, fst_o, fst_buf);
+       rc = mdd_stripe_get(env, fst_o, fst_buf, XATTR_NAME_LOV);
        if (rc < 0 && rc != -ENODATA)
                GOTO(stop, rc);
 
-       rc = mdd_get_lov_ea(env, snd_o, snd_buf);
+       rc = mdd_stripe_get(env, snd_o, snd_buf, XATTR_NAME_LOV);
        if (rc < 0 && rc != -ENODATA)
                GOTO(stop, rc);
 
@@ -2812,7 +2804,7 @@ mdd_layout_change(const struct lu_env *env, struct md_object *o,
        if (IS_ERR(handle))
                RETURN(PTR_ERR(handle));
 
-       rc = mdd_get_lov_ea(env, obj, buf);
+       rc = mdd_stripe_get(env, obj, buf, XATTR_NAME_LOV);
        if (rc < 0) {
                if (rc == -ENODATA)
                        rc = -EINVAL;
index 540f093..8607b23 100644 (file)
@@ -269,18 +269,13 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
 {
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_name *lname = &info->mti_name;
-       char *name = NULL;
+       char *filename = info->mti_filename;
        struct mdt_object *parent;
        u32 mode;
        int rc = 0;
 
        LASSERT(!info->mti_cross_ref);
 
-       OBD_ALLOC(name, NAME_MAX + 1);
-       if (name == NULL)
-               return -ENOMEM;
-       lname->ln_name = name;
-
        /*
         * We may want to allow this to mount a completely separate
         * fileset from the MDT in the future, but keeping it to
@@ -316,8 +311,9 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                        break;
                }
 
-               strncpy(name, s1, lname->ln_namelen);
-               name[lname->ln_namelen] = '\0';
+               strncpy(filename, s1, lname->ln_namelen);
+               filename[lname->ln_namelen] = '\0';
+               lname->ln_name = filename;
 
                parent = mdt_object_find(info->mti_env, mdt, fid);
                if (IS_ERR(parent)) {
@@ -342,8 +338,6 @@ static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
                }
        }
 
-       OBD_FREE(name, NAME_MAX + 1);
-
        return rc;
 }
 
@@ -941,6 +935,8 @@ int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
                return -EINVAL;
        }
 
+       LASSERT(buf->lb_buf);
+
        rc = mo_xattr_get(info->mti_env, next, buf, name);
        if (rc > 0) {
 
@@ -2242,7 +2238,7 @@ static int mdt_reint(struct tgt_session_info *tsi)
 }
 
 /* this should sync the whole device */
-static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
+int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
 {
         struct dt_device *dt = mdt->mdt_bottom;
         int rc;
index ffff9d6..b1068af 100644 (file)
@@ -225,7 +225,8 @@ struct mdt_device {
                                   mo_acl:1,
                                   mo_cos:1,
                                   mo_evict_tgt_nids:1,
-                                  mo_dom_read_open:1;
+                                  mo_dom_read_open:1,
+                                  mo_migrate_hsm_allowed:1;
                unsigned int       mo_dom_lock;
        } mdt_opts;
         /* mdt state flags */
@@ -487,6 +488,7 @@ struct mdt_thread_info {
 
         /* Ops object filename */
         struct lu_name             mti_name;
+       char                       mti_filename[NAME_MAX + 1];
        /* per-thread values, can be re-used, may be vmalloc'd */
        void                      *mti_big_lmm;
        void                      *mti_big_acl;
@@ -494,7 +496,7 @@ struct mdt_thread_info {
        int                        mti_big_aclsize;
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
-       struct ldlm_enqueue_info   mti_einfo;
+       struct ldlm_enqueue_info   mti_einfo[2];
        /* einfo used by mdt_remote_object_lock_try() */
        struct ldlm_enqueue_info   mti_remote_einfo;
        struct tg_reply_data      *mti_reply_data;
@@ -839,6 +841,7 @@ int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *,
 void mdt_reconstruct_open(struct mdt_thread_info *, struct mdt_lock_handle *);
 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
                      struct md_layout_change *spec);
+int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt);
 
 struct lu_buf *mdt_buf(const struct lu_env *env, void *area, ssize_t len);
 const struct lu_buf *mdt_buf_const(const struct lu_env *env,
index d929b67..8b53d43 100644 (file)
@@ -1428,8 +1428,52 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
        if (rc < 0)
                RETURN(rc);
 
+       spec->no_create = !!req_is_replay(mdt_info_req(info));
+
+       rc = mdt_dlmreq_unpack(info);
+
+       RETURN(rc);
+}
+
+static int mdt_migrate_unpack(struct mdt_thread_info *info)
+{
+       struct lu_ucred *uc = mdt_ucred(info);
+       struct mdt_rec_rename *rec;
+       struct lu_attr *attr = &info->mti_attr.ma_attr;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct req_capsule *pill = info->mti_pill;
+       struct md_op_spec *spec = &info->mti_spec;
+       int rc;
+
+       ENTRY;
+
+       CLASSERT(sizeof(*rec) == sizeof(struct mdt_rec_reint));
+       rec = req_capsule_client_get(pill, &RMF_REC_REINT);
+       if (rec == NULL)
+               RETURN(-EFAULT);
+
+       /* This prior initialization is needed for old_init_ucred_reint() */
+       uc->uc_fsuid = rec->rn_fsuid;
+       uc->uc_fsgid = rec->rn_fsgid;
+       uc->uc_cap   = rec->rn_cap;
+       uc->uc_suppgids[0] = rec->rn_suppgid1;
+       uc->uc_suppgids[1] = rec->rn_suppgid2;
+
+       attr->la_uid = rec->rn_fsuid;
+       attr->la_gid = rec->rn_fsgid;
+       rr->rr_fid1 = &rec->rn_fid1;
+       rr->rr_fid2 = &rec->rn_fid2;
+       attr->la_ctime = rec->rn_time;
+       attr->la_mtime = rec->rn_time;
+       /* rename_tgt contains the mode already */
+       attr->la_mode = rec->rn_mode;
+       attr->la_valid = LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_MODE;
+
+       rc = mdt_name_unpack(pill, &RMF_NAME, &rr->rr_name, 0);
+       if (rc < 0)
+               RETURN(rc);
+
        if (rec->rn_bias & MDS_CLOSE_MIGRATE) {
-               req_capsule_extend(info->mti_pill, &RQF_MDS_REINT_MIGRATE);
                rc = mdt_close_handle_unpack(info);
                if (rc)
                        RETURN(rc);
@@ -1656,7 +1700,7 @@ static reint_unpacker mdt_reint_unpackers[REINT_MAX] = {
        [REINT_OPEN]     = mdt_open_unpack,
        [REINT_SETXATTR] = mdt_setxattr_unpack,
        [REINT_RMENTRY]  = mdt_rmentry_unpack,
-       [REINT_MIGRATE]  = mdt_rename_unpack,
+       [REINT_MIGRATE]  = mdt_migrate_unpack,
        [REINT_RESYNC]   = mdt_resync_unpack,
 };
 
index ed1d9a8..a149533 100644 (file)
@@ -894,6 +894,34 @@ mdt_dom_read_open_seq_write(struct file *file, const char __user *buffer,
 }
 LPROC_SEQ_FOPS(mdt_dom_read_open);
 
+static int mdt_migrate_hsm_allowed_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device *obd = m->private;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+
+       seq_printf(m, "%u\n",  (mdt->mdt_opts.mo_migrate_hsm_allowed != 0));
+       return 0;
+}
+
+static ssize_t
+mdt_migrate_hsm_allowed_seq_write(struct file *file, const char __user *buffer,
+                                 size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct obd_device *obd = m->private;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       bool val;
+       int rc;
+
+       rc = kstrtobool_from_user(buffer, count, &val);
+       if (rc)
+               return rc;
+
+       mdt->mdt_opts.mo_migrate_hsm_allowed = val;
+       return count;
+}
+LPROC_SEQ_FOPS(mdt_migrate_hsm_allowed);
+
 LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status);
 LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports);
 LPROC_SEQ_FOPS_RO_TYPE(mdt, target_instance);
@@ -973,8 +1001,10 @@ static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
          .fops =       &mdt_sync_count_fops                    },
        { .name =       "dom_lock",
          .fops =       &mdt_dom_lock_fops                      },
        { .name =       "dom_read_open",
          .fops =       &mdt_dom_read_open_fops                 },
+       { .name =       "migrate_hsm_allowed",
+         .fops =       &mdt_migrate_hsm_allowed_fops           },
        { NULL }
 };
 
index ff7718c..821c525 100644 (file)
@@ -519,7 +519,7 @@ static int mdt_create(struct mdt_thread_info *info)
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
-               struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+               struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
                bool cos_incompat;
 
                rc = mdt_object_striped(info, child);
@@ -571,7 +571,7 @@ static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
        int do_vbr = ma->ma_attr.la_valid &
                        (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
        __u64 lockpart = MDS_INODELOCK_UPDATE;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        bool cos_incompat;
        int rc;
        ENTRY;
@@ -867,7 +867,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
        struct mdt_object *mc;
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
-       struct ldlm_enqueue_info *einfo = &info->mti_einfo;
+       struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
        __u64 lock_ibits;
        bool cos_incompat = false, discard = false;
        int no_name = 0;
@@ -1304,473 +1304,815 @@ static void mdt_rename_unlock(struct lustre_handle *lh)
        EXIT;
 }
 
-/* Update object linkEA */
-struct mdt_lock_list {
-       struct mdt_object       *mll_obj;
-       struct mdt_lock_handle  mll_lh;
-       struct list_head        mll_list;
+static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info,
+                                               const struct lu_fid *fid,
+                                               int idx)
+{
+       struct mdt_object *dir;
+       int rc;
+
+       ENTRY;
+
+       dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
+       if (IS_ERR(dir))
+               RETURN(dir);
+
+       /* check early, the real version will be saved after locking */
+       rc = mdt_version_get_check(info, dir, idx);
+       if (rc)
+               GOTO(out_put, rc);
+
+       RETURN(dir);
+out_put:
+       mdt_object_put(info->mti_env, dir);
+       return ERR_PTR(rc);
+}
+
+/*
+ * In case obj is a remote object on its parent, revoke its LOOKUP lock;
+ * we don't actually check whether it is remote here, we just revoke.
+ */
+static int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
+                                        struct mdt_object *pobj,
+                                        struct mdt_object *obj)
+{
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       int rc;
+
+       mdt_lock_handle_init(lh);
+       mdt_lock_reg_init(lh, LCK_EX);
+
+       if (mdt_object_remote(pobj)) {
+               rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
+                                           &lh->mlh_rreg_lh, LCK_EX,
+                                           MDS_INODELOCK_LOOKUP, false);
+       } else {
+               struct ldlm_res_id *res = &info->mti_res_id;
+               union ldlm_policy_data *policy = &info->mti_policy;
+               __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
+                                LDLM_FL_COS_INCOMPAT;
+
+               fid_build_reg_res_name(mdt_object_fid(obj), res);
+               memset(policy, 0, sizeof(*policy));
+               policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+               rc = mdt_fid_lock(info->mti_mdt->mdt_namespace, &lh->mlh_reg_lh,
+                                 LCK_EX, policy, res, dlmflags, NULL);
+       }
+
+       if (rc != ELDLM_OK)
+               return rc;
+
+       /*
+        * TODO, currently we don't save this lock because there is no place to
+        * hold this lock handle, but to avoid race we need to save this lock.
+        */
+       mdt_object_unlock(info, NULL, lh, 1);
+
+       return 0;
+}
+
+/*
+ * an operation may take locks on parents of links or on directory stripes;
+ * they are grouped in different lists.
+ */
+struct mdt_sub_lock {
+       struct mdt_object      *msl_obj;
+       struct mdt_lock_handle  msl_lh;
+       struct list_head        msl_linkage;
 };
 
 static void mdt_unlock_list(struct mdt_thread_info *info,
-                           struct list_head *list, int rc)
+                           struct list_head *list, int decref)
 {
-       struct mdt_lock_list *mll;
-       struct mdt_lock_list *mll2;
+       struct mdt_sub_lock *msl;
+       struct mdt_sub_lock *tmp;
 
-       list_for_each_entry_safe(mll, mll2, list, mll_list) {
-               mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
-               list_del(&mll->mll_list);
-               OBD_FREE_PTR(mll);
+       list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
+               mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
+               list_del(&msl->msl_linkage);
+               OBD_FREE_PTR(msl);
        }
 }
 
-static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
-                                     struct mdt_object *obj,
-                                     struct mdt_object *pobj,
-                                     struct list_head *lock_list)
+/*
+ * lock parents of links, and also check that the total number of locks
+ * doesn't exceed RS_MAX_LOCKS.
+ *
+ * \retval     0 on success, and locks can be saved in ptlrpc_reply_state
+ * \retval     1 on success, but total lock count may exceed RS_MAX_LOCKS
+ * \retval     -ve negative errno upon error
+ */
+static int mdt_lock_links(struct mdt_thread_info *info,
+                         struct mdt_object *pobj,
+                         const struct md_attr *ma,
+                         struct mdt_object *obj,
+                         struct list_head *link_locks)
 {
-       struct lu_buf           *buf = &info->mti_big_buf;
-       struct linkea_data      ldata = { NULL };
-       int                     count;
-       int                     retry_count;
-       int                     rc;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct lu_buf *buf = &info->mti_big_buf;
+       struct lu_name *lname = &info->mti_name;
+       struct linkea_data ldata = { NULL };
+       bool blocked = false;
+       int retries = 5;
+       int local_lnkp_cnt = 0;
+       int rc;
+
        ENTRY;
 
        if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
                RETURN(0);
 
-       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
 
        ldata.ld_buf = buf;
        rc = mdt_links_read(info, obj, &ldata);
-       if (rc != 0) {
+       if (rc) {
                if (rc == -ENOENT || rc == -ENODATA)
                        rc = 0;
                RETURN(rc);
        }
 
-       /* ignore the migrating parent(@pobj) */
-       retry_count = ldata.ld_leh->leh_reccount - 1;
-
-again:
-       LASSERT(ldata.ld_leh != NULL);
-       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
-       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
-               struct mdt_device *mdt = info->mti_mdt;
-               struct mdt_object *mdt_pobj;
-               struct mdt_lock_list *mll;
-               struct lu_name name;
-               struct lu_fid  fid;
+repeat:
+       for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
+            linkea_next_entry(&ldata)) {
+               struct mdt_object *lnkp;
+               struct mdt_sub_lock *msl;
+               struct lu_fid fid;
                __u64 ibits;
 
-               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
-                                   &name, &fid);
-               mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
-               if (IS_ERR(mdt_pobj)) {
-                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
-                             mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
-                       goto next;
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
+                                   &fid);
+
+               /* check if it's also linked to parent */
+               if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
+                       CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
+                              PFID(&fid), PNAME(lname));
+                       /* in case link is remote object, revoke LOOKUP lock */
+                       rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+                       continue;
                }
 
-               if (!mdt_object_exists(mdt_pobj)) {
-                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
-                             mdt_obd_name(mdt), PFID(&fid));
-                       mdt_object_put(info->mti_env, mdt_pobj);
-                       goto next;
+               lnkp = NULL;
+
+               /* check if it's linked to a stripe of parent */
+               if (ma->ma_valid & MA_LMV) {
+                       struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+                       struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
+                       int j = 0;
+
+                       for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
+                               fid_le_to_cpu(stripe_fid,
+                                             &lmv->lmv_stripe_fids[j]);
+                               if (lu_fid_eq(stripe_fid, &fid)) {
+                                       CDEBUG(D_INFO, "skip stripe "DFID
+                                              ", reovke "DNAME"\n",
+                                              PFID(&fid), PNAME(lname));
+                                       lnkp = mdt_object_find(info->mti_env,
+                                                              mdt, &fid);
+                                       if (IS_ERR(lnkp))
+                                               GOTO(out, rc = PTR_ERR(lnkp));
+                                       break;
+                               }
+                       }
+
+                       if (lnkp) {
+                               rc = mdt_revoke_remote_lookup_lock(info, lnkp,
+                                                                  obj);
+                               mdt_object_put(info->mti_env, lnkp);
+                               continue;
+                       }
                }
 
-               /* Check if the object already exists in the list */
-               list_for_each_entry(mll, lock_list, mll_list) {
-                       if (mll->mll_obj == mdt_pobj) {
-                               mdt_object_put(info->mti_env, mdt_pobj);
-                               goto next;
+               /* Check if it's already locked */
+               list_for_each_entry(msl, link_locks, msl_linkage) {
+                       if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
+                               CDEBUG(D_INFO,
+                                      DFID" was locked, revoke "DNAME"\n",
+                                      PFID(&fid), PNAME(lname));
+                               lnkp = msl->msl_obj;
+                               break;
                        }
                }
 
-               if (mdt_pobj == pobj) {
-                       CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
-                              mdt_obd_name(mdt), PFID(&fid));
-                       mdt_object_put(info->mti_env, mdt_pobj);
-                       goto next;
+               if (lnkp) {
+                       rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+                       continue;
                }
 
-               OBD_ALLOC_PTR(mll);
-               if (mll == NULL) {
-                       mdt_object_put(info->mti_env, mdt_pobj);
-                       GOTO(out, rc = -ENOMEM);
+               CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
+                      PFID(&fid), PNAME(lname));
+
+               lnkp = mdt_object_find(info->mti_env, mdt, &fid);
+               if (IS_ERR(lnkp)) {
+                       CWARN("%s: cannot find obj "DFID": %ld\n",
+                             mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
+                       continue;
+               }
+
+               if (!mdt_object_exists(lnkp)) {
+                       CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
+                             PFID(&fid), PNAME(lname));
+                       mdt_object_put(info->mti_env, lnkp);
+                       continue;
                }
 
-               /* Since this needs to lock all of objects in linkea, to avoid
-                * deadlocks, because it does not follow parent-child order as
-                * other MDT operation, let's use try_lock here and if the lock
-                * cannot be gotten because of conflicting locks, then drop all
-                * current locks, send an AST to the client, and start again. */
-               mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+               if (!mdt_object_remote(lnkp))
+                       local_lnkp_cnt++;
+
+               OBD_ALLOC_PTR(msl);
+               if (msl == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               /*
+                * we can't follow parent-child lock order like other MD
+                * operations, so use lock_try here to avoid deadlock. If the
+                * lock cannot be taken, drop all locks taken so far, revoke
+                * the blocked one, continue processing the remaining entries,
+                * and at the end of the loop restart from the beginning.
+                */
+               mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
                ibits = 0;
-               rc = mdt_object_lock_try(info, mdt_pobj, &mll->mll_lh, &ibits,
+               rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
                                         MDS_INODELOCK_UPDATE, true);
                if (!(ibits & MDS_INODELOCK_UPDATE)) {
-                       mdt_unlock_list(info, lock_list, 0);
+                       blocked = true;
 
-                       CDEBUG(D_INFO, "%s: busy lock on "DFID" %s retry %d\n",
-                              mdt_obd_name(mdt), PFID(&fid), name.ln_name,
-                              retry_count);
+                       CDEBUG(D_INFO, "busy lock on "DFID" "DNAME" retry %d\n",
+                              PFID(&fid), PNAME(lname), retries);
 
-                       if (retry_count == 0) {
-                               mdt_object_put(info->mti_env, mdt_pobj);
-                               OBD_FREE_PTR(mll);
-                               GOTO(out, rc = -EBUSY);
-                       }
+                       mdt_unlock_list(info, link_locks, 1);
 
-                       mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
-                       rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+                       mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
+                       rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
                                             MDS_INODELOCK_UPDATE);
-                       if (rc != 0) {
-                               mdt_object_put(info->mti_env, mdt_pobj);
-                               OBD_FREE_PTR(mll);
+                       if (rc) {
+                               mdt_object_put(info->mti_env, lnkp);
+                               OBD_FREE_PTR(msl);
                                GOTO(out, rc);
                        }
 
-                       if (mdt_object_remote(mdt_pobj)) {
+                       if (mdt_object_remote(lnkp)) {
                                struct ldlm_lock *lock;
 
-                               /* For remote object, Set lock to cb_atomic,
+                               /*
+                                * for remote object, set lock cb_atomic,
                                 * so lock can be released in blocking_ast()
-                                * immediately, then the next try_lock will
-                                * have better chance to succeds */
-                               lock =
-                               ldlm_handle2lock(&mll->mll_lh.mlh_rreg_lh);
+                                * immediately, then the next lock_try will
+                                * have better chance of success.
+                                */
+                               lock = ldlm_handle2lock(
+                                               &msl->msl_lh.mlh_rreg_lh);
                                LASSERT(lock != NULL);
                                lock_res_and_lock(lock);
                                ldlm_set_atomic_cb(lock);
                                unlock_res_and_lock(lock);
                                LDLM_LOCK_PUT(lock);
                        }
-                       mdt_object_unlock_put(info, mdt_pobj, &mll->mll_lh, rc);
-                       OBD_FREE_PTR(mll);
-                       retry_count--;
-                       goto again;
+
+                       mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
+                       OBD_FREE_PTR(msl);
+                       continue;
+               }
+
+               INIT_LIST_HEAD(&msl->msl_linkage);
+               msl->msl_obj = lnkp;
+               list_add_tail(&msl->msl_linkage, link_locks);
+
+               rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
+       }
+
+       if (blocked) {
+               rc = -EBUSY;
+               if (--retries > 0) {
+                       mdt_unlock_list(info, link_locks, rc);
+                       blocked = false;
+                       local_lnkp_cnt = 0;
+                       goto repeat;
                }
-               rc = 0;
-               INIT_LIST_HEAD(&mll->mll_list);
-               mll->mll_obj = mdt_pobj;
-               list_add_tail(&mll->mll_list, lock_list);
-next:
-               ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
-                                                        ldata.ld_reclen);
        }
+
+       EXIT;
 out:
-       if (rc != 0)
-               mdt_unlock_list(info, lock_list, rc);
-       RETURN(rc);
+       if (rc)
+               mdt_unlock_list(info, link_locks, rc);
+       else if (local_lnkp_cnt > RS_MAX_LOCKS - 6)
+               /*
+                * parent may have 3 local objects: master object and 2 stripes
+                * (if it's being migrated too); source may have 2 local
+                * objects: master and 1 stripe; target has 1 local object.
+                */
+               rc = 1;
+       return rc;
 }
 
-/* migrate files from one MDT to another MDT */
-static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
-                                     struct mdt_lock_handle *lhc)
+static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
+                                 struct mdt_object *obj,
+                                 const struct md_attr *ma,
+                                 struct list_head *slave_locks)
 {
-       struct mdt_reint_record *rr = &info->mti_rr;
-       struct md_attr          *ma = &info->mti_attr;
-       struct mdt_object       *msrcdir;
-       struct mdt_object       *mold;
-       struct mdt_object       *mnew = NULL;
-       struct mdt_lock_handle  *lh_dirp;
-       struct mdt_lock_handle  *lh_childp;
-       struct mdt_lock_handle  *lh_tgtp = NULL;
-       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
-       struct list_head        lock_list;
-       __u64                   lock_ibits;
-       struct ldlm_lock        *lease = NULL;
-       bool                    lock_open_sem = false;
-       int                     rc;
-       ENTRY;
+       struct mdt_device *mdt = info->mti_mdt;
+       const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_object *slave;
+       struct mdt_sub_lock *msl;
+       int i;
+       int rc;
 
-       CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
-              PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+       ENTRY;
 
-       /* 1: lock the source dir. */
-       msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
-       if (IS_ERR(msrcdir)) {
-               CDEBUG(D_OTHER, "%s: cannot find source dir "DFID" : rc = %d\n",
-                       mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
-                       (int)PTR_ERR(msrcdir));
-               RETURN(PTR_ERR(msrcdir));
-       }
+       LASSERT(mdt_object_remote(obj));
+       LASSERT(ma->ma_valid & MA_LMV);
+       LASSERT(lmv);
 
-       lh_dirp = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
-       rc = mdt_reint_object_lock(info, msrcdir, lh_dirp, MDS_INODELOCK_UPDATE,
-                                  true);
-       if (rc)
-               GOTO(out_put_parent, rc);
+       if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
+               RETURN(-EINVAL);
 
-       if (!mdt_object_remote(msrcdir)) {
-               rc = mdt_version_get_check_save(info, msrcdir, 0);
-               if (rc)
-                       GOTO(out_unlock_parent, rc);
-       }
+       if (le32_to_cpu(lmv->lmv_stripe_count) < 1)
+               RETURN(0);
 
-       /* 2: sanity check and find the object to be migrated. */
-       fid_zero(old_fid);
-       rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
-       if (rc != 0)
-               GOTO(out_unlock_parent, rc);
+       for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
 
-       if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
-               GOTO(out_unlock_parent, rc = -EINVAL);
+               slave = mdt_object_find(info->mti_env, mdt, fid);
+               if (IS_ERR(slave))
+                       GOTO(out, rc = PTR_ERR(slave));
 
-       if (!fid_is_md_operative(old_fid))
-               GOTO(out_unlock_parent, rc = -EPERM);
+               OBD_ALLOC_PTR(msl);
+               if (!msl) {
+                       mdt_object_put(info->mti_env, slave);
+                       GOTO(out, rc = -ENOMEM);
+               }
 
-       if (lu_fid_eq(old_fid, &info->mti_mdt->mdt_md_root_fid))
-               GOTO(out_unlock_parent, rc = -EPERM);
+               mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
+               rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
+                                          MDS_INODELOCK_UPDATE, true);
+               if (rc) {
+                       OBD_FREE_PTR(msl);
+                       mdt_object_put(info->mti_env, slave);
+                       GOTO(out, rc);
+               }
 
-       mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
-       if (IS_ERR(mold))
-               GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+               INIT_LIST_HEAD(&msl->msl_linkage);
+               msl->msl_obj = slave;
+               list_add_tail(&msl->msl_linkage, slave_locks);
 
-       if (!mdt_object_exists(mold)) {
-               LU_OBJECT_DEBUG(D_INODE, info->mti_env,
-                               &mold->mot_obj,
-                               "object does not exist");
-               GOTO(out_put_child, rc = -ENOENT);
        }
+       EXIT;
 
-       if (mdt_object_remote(mold)) {
-               CDEBUG(D_OTHER, "%s: source "DFID" is on the remote MDT\n",
-                      mdt_obd_name(info->mti_mdt), PFID(old_fid));
-               GOTO(out_put_child, rc = -EREMOTE);
-       }
+out:
+       if (rc)
+               mdt_unlock_list(info, slave_locks, rc);
+       return rc;
+}
 
-       if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
-           !mdt_object_remote(msrcdir)) {
-               CDEBUG(D_OTHER, "%s: parent "DFID" is still on the same"
-                      " MDT, which should be migrated first:"
-                      " rc = %d\n", mdt_obd_name(info->mti_mdt),
-                      PFID(mdt_object_fid(msrcdir)), -EPERM);
-               GOTO(out_put_child, rc = -EPERM);
+static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
+                                            struct mdt_object *obj,
+                                            struct mdt_lock_handle *lh,
+                                            struct ldlm_enqueue_info *einfo,
+                                            struct list_head *slave_locks,
+                                            int decref)
+{
+       if (mdt_object_remote(obj)) {
+               mdt_unlock_list(info, slave_locks, decref);
+               mdt_object_unlock(info, obj, lh, decref);
+       } else {
+               mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
        }
+}
 
-       rc = mdt_remote_permission(info);
-       if (rc != 0)
-               GOTO(out_put_child, rc);
+/* lock parent and its stripes */
+static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
+                                  struct mdt_object *obj,
+                                  const struct md_attr *ma,
+                                  struct mdt_lock_handle *lh,
+                                  struct ldlm_enqueue_info *einfo,
+                                  struct list_head *slave_locks)
+{
+       int rc;
 
-       /* 3: iterate the linkea of the object and lock all of the objects */
-       INIT_LIST_HEAD(&lock_list);
-       rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
-       if (rc != 0)
-               GOTO(out_put_child, rc);
+       if (mdt_object_remote(obj)) {
+               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+                                           &lh->mlh_rreg_lh, LCK_PW,
+                                           MDS_INODELOCK_UPDATE, false);
+               if (rc != ELDLM_OK)
+                       return rc;
 
-       if (info->mti_spec.sp_migrate_close) {
-               struct close_data *data;
-               struct mdt_body  *repbody;
-               bool lease_broken = false;
+               /*
+                * if obj is remote and striped, lock its stripes explicitly
+                * because it's not striped in LOD layer on this MDT.
+                */
+               if (ma->ma_valid & MA_LMV) {
+                       rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
+                       if (rc)
+                               mdt_object_unlock(info, obj, lh, rc);
+               }
+       } else {
+               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
+                                           einfo, true);
+       }
 
-               if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
-                                     RCL_CLIENT) ||
-                   !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
-                                     RCL_CLIENT))
-                       GOTO(out_lease, rc = -EPROTO);
+       return rc;
+}
 
-               data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
-               if (data == NULL)
-                       GOTO(out_lease, rc = -EPROTO);
+/*
+ * in migration, object may be remote, and we need to take a full lock on it
+ * and its stripes if it's a directory; besides, object may be a remote object
+ * on its parent, so revoke its LOOKUP lock where its parent is located.
+ */
+static int mdt_migrate_object_lock(struct mdt_thread_info *info,
+                                  struct mdt_object *pobj,
+                                  struct mdt_object *obj,
+                                  struct mdt_lock_handle *lh,
+                                  struct ldlm_enqueue_info *einfo,
+                                  struct list_head *slave_locks)
+{
+       int rc;
 
-               lease = ldlm_handle2lock(&data->cd_handle);
-               if (lease == NULL)
-                       GOTO(out_lease, rc = -ESTALE);
+       if (mdt_object_remote(obj)) {
+               /* don't bother to check if pobj and obj are on the same MDT. */
+               rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+               if (rc)
+                       return rc;
 
-               /* try to hold open_sem so that nobody else can open the file */
-               if (!down_write_trylock(&mold->mot_open_sem)) {
-                       ldlm_lock_cancel(lease);
-                       GOTO(out_lease, rc = -EBUSY);
-               }
+               rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
+                                           &lh->mlh_rreg_lh, LCK_EX,
+                                           MDS_INODELOCK_FULL, false);
+               if (rc != ELDLM_OK)
+                       return rc;
 
-               lock_open_sem = true;
-               /* Check if the lease open lease has already canceled */
-               lock_res_and_lock(lease);
-               lease_broken = ldlm_is_cancel(lease);
-               unlock_res_and_lock(lease);
+               /*
+                * if obj is remote and striped, lock its stripes explicitly
+                * because it's not striped in LOD layer on this MDT.
+                */
+               if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
+                       struct md_attr *ma = &info->mti_attr;
 
-               LDLM_DEBUG(lease, DFID " lease broken? %d",
-                          PFID(mdt_object_fid(mold)), lease_broken);
+                       ma->ma_lmv = info->mti_big_lmm;
+                       ma->ma_lmv_size = info->mti_big_lmmsize;
+                       ma->ma_valid = 0;
+                       rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
+                       if (rc) {
+                               mdt_object_unlock(info, obj, lh, rc);
+                               return rc;
+                       }
 
-               /* Cancel server side lease. Client side counterpart should
-                * have been cancelled. It's okay to cancel it now as we've
-                * held mot_open_sem. */
-               ldlm_lock_cancel(lease);
+                       if (ma->ma_valid & MA_LMV) {
+                               rc = mdt_lock_remote_slaves(info, obj, ma,
+                                                           slave_locks);
+                               if (rc)
+                                       mdt_object_unlock(info, obj, lh, rc);
+                       }
+               }
+       } else {
+               if (mdt_object_remote(pobj)) {
+                       rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
+                       if (rc)
+                               return rc;
+               }
 
-               if (lease_broken)
-                       GOTO(out_lease, rc = -EAGAIN);
-out_lease:
-               rc = mdt_close_internal(info, mdt_info_req(info), NULL);
-               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-               repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
-               if (rc != 0)
-                       GOTO(out_unlock_list, rc);
+               rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
+                                           einfo, true);
        }
 
-       /* 4: lock of the object migrated object */
-       lh_childp = &info->mti_lh[MDT_LH_OLD];
-       mdt_lock_reg_init(lh_childp, LCK_EX);
-       lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
-                    MDS_INODELOCK_LAYOUT;
-       if (mdt_object_remote(msrcdir)) {
-               /* Enqueue lookup lock from the parent MDT */
-               rc = mdt_remote_object_lock(info, msrcdir, mdt_object_fid(mold),
-                                           &lh_childp->mlh_rreg_lh,
-                                           lh_childp->mlh_rreg_mode,
-                                           MDS_INODELOCK_LOOKUP, false);
-               if (rc != ELDLM_OK)
-                       GOTO(out_unlock_list, rc);
+       return rc;
+}
 
-               lock_ibits &= ~MDS_INODELOCK_LOOKUP;
-       }
+/*
+ * lookup source by name, if parent is striped directory, we need to find the
+ * corresponding stripe where source is located, and then lookup there.
+ *
+ * besides, if parent is migrating too, and file is already in target stripe,
+ * this should be a redo of 'lfs migrate' on client side.
+ */
+static int mdt_migrate_lookup(struct mdt_thread_info *info,
+                             struct mdt_object *pobj,
+                             const struct md_attr *ma,
+                             const struct lu_name *lname,
+                             struct mdt_object **spobj,
+                             struct mdt_object **sobj)
+{
+       const struct lu_env *env = info->mti_env;
+       struct lu_fid *fid = &info->mti_tmp_fid1;
+       struct mdt_object *stripe;
+       int rc;
 
-       rc = mdt_reint_object_lock(info, mold, lh_childp, lock_ibits, true);
-       if (rc != 0)
-               GOTO(out_unlock_child, rc);
+       if (ma->ma_valid & MA_LMV) {
+               /* if parent is striped, lookup on corresponding stripe */
+               struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
+               __u32 hash_type = le32_to_cpu(lmv->lmv_hash_type);
+               __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
+               bool is_migrating = le32_to_cpu(lmv->lmv_hash_type) &
+                                   LMV_HASH_FLAG_MIGRATION;
+
+               if (is_migrating) {
+                       hash_type = le32_to_cpu(lmv->lmv_migrate_hash);
+                       stripe_count -= le32_to_cpu(lmv->lmv_migrate_offset);
+               }
 
-       /* Migration is incompatible with HSM. */
-       ma->ma_need = MA_HSM;
-       ma->ma_valid = 0;
-       rc = mdt_attr_get_complex(info, mold, ma);
-       if (rc != 0)
-               GOTO(out_unlock_child, rc);
-
-       if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0) {
-               rc = -ENOSYS;
-               CDEBUG(D_OTHER,
-                      "%s: cannot migrate HSM archived file "DFID": rc = %d\n",
-                      mdt_obd_name(info->mti_mdt), PFID(old_fid), rc);
-               GOTO(out_unlock_child, rc);
-       }
+               rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+                                             lname->ln_name,
+                                             lname->ln_namelen);
+               if (rc < 0)
+                       return rc;
 
-       ma->ma_need = MA_LMV;
-       ma->ma_valid = 0;
-       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
-       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
-       rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
-       if (rc != 0)
-               GOTO(out_unlock_child, rc);
-
-       if ((ma->ma_valid & MA_LMV)) {
-               struct lmv_mds_md_v1 *lmm1;
-
-               lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
-               lmm1 = &ma->ma_lmv->lmv_md_v1;
-               if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) {
-                       CDEBUG(D_OTHER, "%s: can not migrate striped dir "DFID
-                              ": rc = %d\n", mdt_obd_name(info->mti_mdt),
-                              PFID(mdt_object_fid(mold)), -EPERM);
-                       GOTO(out_unlock_child, rc = -EPERM);
-               }
+               if (le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION)
+                       rc += le32_to_cpu(lmv->lmv_migrate_offset);
+
+               fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
+
+               stripe = mdt_object_find(env, info->mti_mdt, fid);
+               if (IS_ERR(stripe))
+                       return PTR_ERR(stripe);
+
+               fid_zero(fid);
+               rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
+                               &info->mti_spec);
+               if (rc == -ENOENT && is_migrating) {
+                       /*
+                        * if parent is migrating, and lookup child failed on
+                        * source stripe, lookup again on target stripe, if it
+                        * exists, it means previous migration was interrupted,
+                        * and current file was migrated already.
+                        */
+                       mdt_object_put(env, stripe);
+
+                       hash_type = le32_to_cpu(lmv->lmv_hash_type);
+                       stripe_count = le32_to_cpu(lmv->lmv_migrate_offset);
+
+                       rc = lmv_name_to_stripe_index(hash_type, stripe_count,
+                                                     lname->ln_name,
+                                                     lname->ln_namelen);
+                       if (rc < 0)
+                               return rc;
 
-               if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
-                       GOTO(out_unlock_child, rc = -EINVAL);
+                       fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
 
-               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
-                                      &lmm1->lmv_stripe_fids[1]);
-               if (IS_ERR(mnew))
-                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
-
-               if (!mdt_object_remote(mnew)) {
-                       CDEBUG(D_OTHER,
-                              "%s: "DFID" being migrated is on this MDT:"
-                              " rc  = %d\n", mdt_obd_name(info->mti_mdt),
-                              PFID(rr->rr_fid2), -EPERM);
-                       GOTO(out_put_new, rc = -EPERM);
-               }
+                       stripe = mdt_object_find(env, info->mti_mdt, fid);
+                       if (IS_ERR(stripe))
+                               return PTR_ERR(stripe);
 
-               lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
-               mdt_lock_reg_init(lh_tgtp, LCK_EX);
-               rc = mdt_remote_object_lock(info, mnew,
-                                           mdt_object_fid(mnew),
-                                           &lh_tgtp->mlh_rreg_lh,
-                                           lh_tgtp->mlh_rreg_mode,
-                                           MDS_INODELOCK_UPDATE, false);
-               if (rc != 0) {
-                       lh_tgtp = NULL;
-                       GOTO(out_put_new, rc);
+                       fid_zero(fid);
+                       rc = mdo_lookup(env, mdt_object_child(stripe), lname,
+                                       fid, &info->mti_spec);
+                       mdt_object_put(env, stripe);
+                       return rc ?: -EALREADY;
+               } else if (rc) {
+                       mdt_object_put(env, stripe);
+                       return rc;
                }
        } else {
-               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
-                                      rr->rr_fid2);
-               if (IS_ERR(mnew))
-                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
-               if (!mdt_object_remote(mnew)) {
-                       CDEBUG(D_OTHER, "%s: Migration "DFID" is on this MDT:"
-                              " rc = %d\n", mdt_obd_name(info->mti_mdt),
-                              PFID(rr->rr_fid2), -EXDEV);
-                       GOTO(out_put_new, rc = -EXDEV);
-               }
+               fid_zero(fid);
+               rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
+                               &info->mti_spec);
+               if (rc)
+                       return rc;
+
+               stripe = pobj;
+               mdt_object_get(env, stripe);
        }
 
-       /* 5: migrate it */
-       mdt_reint_init_ma(info, ma);
+       *spobj = stripe;
 
-       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                      OBD_FAIL_MDS_REINT_RENAME_WRITE);
+       *sobj = mdt_object_find(env, info->mti_mdt, fid);
+       if (IS_ERR(*sobj)) {
+               mdt_object_put(env, stripe);
+               rc = PTR_ERR(*sobj);
+               *spobj = NULL;
+               *sobj = NULL;
+       }
 
-       rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
-                        mdt_object_child(mold), &rr->rr_name,
-                        mdt_object_child(mnew), ma);
-       if (rc != 0)
-               GOTO(out_unlock_new, rc);
+       return rc;
+}
 
-out_unlock_new:
-       if (lh_tgtp != NULL)
-               mdt_object_unlock(info, mnew, lh_tgtp, rc);
-out_put_new:
-       if (mnew)
-               mdt_object_put(info->mti_env, mnew);
-out_unlock_child:
-       mdt_object_unlock(info, mold, lh_childp, rc);
-out_unlock_list:
-       /* we don't really modify linkea objects, so we can safely decref these
-        * locks, and this can avoid saving them as COS locks, which may prevent
-        * subsequent migrate. */
-       mdt_unlock_list(info, &lock_list, 1);
-       if (lease != NULL) {
-               ldlm_reprocess_all(lease->l_resource);
-               LDLM_LOCK_PUT(lease);
+/* end lease and close file for regular file */
+static int mdd_migrate_close(struct mdt_thread_info *info,
+                            struct mdt_object *obj)
+{
+       struct close_data *data;
+       struct mdt_body *repbody;
+       struct ldlm_lock *lease;
+       int rc;
+       int rc2;
+
+       rc = -EPROTO;
+       if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
+                                     RCL_CLIENT) ||
+           !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
+                                     RCL_CLIENT))
+               goto close;
+
+       data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
+       if (!data)
+               goto close;
+
+       rc = -ESTALE;
+       lease = ldlm_handle2lock(&data->cd_handle);
+       if (!lease)
+               goto close;
+
+       /* check if the lease was already canceled */
+       lock_res_and_lock(lease);
+       rc = ldlm_is_cancel(lease);
+       unlock_res_and_lock(lease);
+
+       if (rc) {
+               rc = -EAGAIN;
+               LDLM_DEBUG(lease, DFID" lease broken",
+                          PFID(mdt_object_fid(obj)));
        }
 
-       if (lock_open_sem)
-               up_write(&mold->mot_open_sem);
-out_put_child:
-       mdt_object_put(info->mti_env, mold);
-out_unlock_parent:
-       mdt_object_unlock(info, msrcdir, lh_dirp, rc);
-out_put_parent:
-       mdt_object_put(info->mti_env, msrcdir);
+       /*
+        * cancel server side lease, client side counterpart should have been
+        * cancelled, it's okay to cancel it now as we've held mot_open_sem.
+        */
+       ldlm_lock_cancel(lease);
+       ldlm_reprocess_all(lease->l_resource);
+       LDLM_LOCK_PUT(lease);
 
-       RETURN(rc);
+close:
+       rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+
+       return rc ?: rc2;
 }
 
-static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info,
-                                               const struct lu_fid *fid,
-                                               int idx)
+/*
+ * migrate file in below steps:
+ *  1. lock parent and its stripes
+ *  2. lookup source by name
+ *  3. lock parents of source links if source is not directory
+ *  4. reject if source is in HSM
+ *  5. take source open_sem and close file if source is regular file
+ *  6. lock source and its stripes if it's directory
+ *  7. lock target so subsequent change to it can trigger COS
+ *  8. migrate file
+ *  9. unlock above locks
+ * 10. sync device if source has links
+ */
+static int mdt_reint_migrate_internal(struct mdt_thread_info *info)
 {
-       struct mdt_object *dir;
+       const struct lu_env *env = info->mti_env;
+       struct mdt_device *mdt = info->mti_mdt;
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_attr *ma = &info->mti_attr;
+       struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
+       struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
+       struct mdt_object *pobj;
+       struct mdt_object *spobj = NULL;
+       struct mdt_object *sobj = NULL;
+       struct mdt_object *tobj;
+       struct mdt_lock_handle *lhp;
+       struct mdt_lock_handle *lhs;
+       struct mdt_lock_handle *lht;
+       LIST_HEAD(parent_slave_locks);
+       LIST_HEAD(child_slave_locks);
+       LIST_HEAD(link_locks);
+       bool open_sem_locked = false;
+       bool do_sync = false;
        int rc;
        ENTRY;
 
-       dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
-       if (IS_ERR(dir))
-               RETURN(dir);
+       CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+              PNAME(&rr->rr_name), PFID(rr->rr_fid2));
 
-       /* check early, the real version will be saved after locking */
-       rc = mdt_version_get_check(info, dir, idx);
+       /* don't allow migrate . or .. */
+       if (lu_name_is_dot_or_dotdot(&rr->rr_name))
+               RETURN(-EBUSY);
+
+       rc = mdt_remote_permission(info);
        if (rc)
-               GOTO(out_put, rc);
+               RETURN(rc);
 
-       RETURN(dir);
-out_put:
-       mdt_object_put(info->mti_env, dir);
-       return ERR_PTR(rc);
+       /* pobj is master object of parent */
+       pobj = mdt_object_find_check(info, rr->rr_fid1, 0);
+       if (IS_ERR(pobj))
+               RETURN(PTR_ERR(pobj));
+
+       if (unlikely(!info->mti_big_lmm)) {
+               info->mti_big_lmmsize = lmv_mds_md_size(64, LMV_MAGIC);
+               OBD_ALLOC(info->mti_big_lmm, info->mti_big_lmmsize);
+               if (!info->mti_big_lmm)
+                       GOTO(put_parent, rc = -ENOMEM);
+       }
+
+       ma->ma_lmv = info->mti_big_lmm;
+       ma->ma_lmv_size = info->mti_big_lmmsize;
+       ma->ma_valid = 0;
+       rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
+       if (rc)
+               GOTO(put_parent, rc);
+
+       /* lock parent object */
+       lhp = &info->mti_lh[MDT_LH_PARENT];
+       mdt_lock_reg_init(lhp, LCK_PW);
+       rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
+                                    &parent_slave_locks);
+       if (rc)
+               GOTO(put_parent, rc);
+
+       /*
+        * if pobj is a striped directory, spobj is the stripe in which name is
+        * located, i.e. the real parent; it needs no lock of its own because
+        * we've already taken a full lock on pobj.
+        */
+       rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
+       if (rc)
+               GOTO(unlock_parent, rc);
+
+       /* lock parents of source links, and revoke LOOKUP lock of links */
+       rc = mdt_lock_links(info, pobj, ma, sobj, &link_locks);
+       if (rc < 0)
+               GOTO(put_source, rc);
+
+       /*
+        * RS_MAX_LOCKS is the limit on the number of locks that can be saved
+        * along with one request; if the total lock count exceeds this limit,
+        * we drop all locks after migration and sync the device at the end.
+        */
+       do_sync = rc;
+
+       /* reject files with HSM flags unless migrating them is allowed */
+       if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
+               ma->ma_need = MA_HSM;
+               ma->ma_valid = 0;
+               rc = mdt_attr_get_complex(info, sobj, ma);
+               if (rc)
+                       GOTO(unlock_links, rc);
+
+               if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
+                       GOTO(unlock_links, rc = -EOPNOTSUPP);
+       }
+
+       /* end lease and close file for regular file */
+       if (info->mti_spec.sp_migrate_close) {
+               /* try to hold open_sem so that nobody else can open the file */
+               if (!down_write_trylock(&sobj->mot_open_sem)) {
+                       /* close anyway */
+                       mdd_migrate_close(info, sobj);
+                       GOTO(unlock_links, rc = -EBUSY);
+               } else {
+                       open_sem_locked = true;
+                       rc = mdd_migrate_close(info, sobj);
+                       if (rc)
+                               GOTO(unlock_open_sem, rc);
+               }
+       }
+
+       /* lock source */
+       lhs = &info->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lhs, LCK_EX);
+       rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
+                                    &child_slave_locks);
+       if (rc)
+               GOTO(unlock_open_sem, rc);
+
+       /* lock target */
+       tobj = mdt_object_find(env, mdt, rr->rr_fid2);
+       if (IS_ERR(tobj))
+               GOTO(unlock_source, rc = PTR_ERR(tobj));
+
+       lht = &info->mti_lh[MDT_LH_NEW];
+       mdt_lock_reg_init(lht, LCK_EX);
+       rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
+       if (rc)
+               GOTO(put_target, rc);
+
+       /* Don't do lookup sanity check. We know name doesn't exist. */
+       info->mti_spec.sp_cr_lookup = 0;
+       info->mti_spec.sp_feat = &dt_directory_features;
+
+       rc = mdo_migrate(env, mdt_object_child(pobj),
+                        mdt_object_child(sobj), &rr->rr_name,
+                        mdt_object_child(tobj), &info->mti_spec, ma);
+       EXIT;
+
+       mdt_object_unlock(info, tobj, lht, rc);
+put_target:
+       mdt_object_put(env, tobj);
+unlock_source:
+       mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
+                                 &child_slave_locks, rc);
+unlock_open_sem:
+       if (open_sem_locked)
+               up_write(&sobj->mot_open_sem);
+unlock_links:
+       mdt_unlock_list(info, &link_locks, rc);
+put_source:
+       mdt_object_put(env, sobj);
+       mdt_object_put(env, spobj);
+unlock_parent:
+       mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
+                                 &parent_slave_locks, rc);
+put_parent:
+       mdt_object_put(env, pobj);
+
+       if (!rc && do_sync)
+               mdt_device_sync(env, mdt);
+
+       return rc;
 }
 
 static int mdt_object_lock_save(struct mdt_thread_info *info,
@@ -2258,7 +2600,7 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
        if (!req_is_replay(req)) {
                rc = mdt_rename_lock(info, &rename_lh);
                if (rc != 0) {
-                       CERROR("%s: can't lock FS for rename: rc  = %d\n",
+                       CERROR("%s: can't lock FS for rename: rc = %d\n",
                               mdt_obd_name(info->mti_mdt), rc);
                        RETURN(rc);
                }
@@ -2267,7 +2609,7 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
        if (rename)
                rc = mdt_reint_rename_internal(info, lhc);
        else
-               rc = mdt_reint_migrate_internal(info, lhc);
+               rc = mdt_reint_migrate_internal(info);
 
        if (lustre_handle_is_used(&rename_lh))
                mdt_rename_unlock(&rename_lh);
index a268b88..006995f 100644 (file)
@@ -3418,6 +3418,12 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
        osd_trans_declare_op(env, oh, OSD_OT_INSERT,
                             osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
 
+       /* will help to find FID->ino mapping at dt_insert() */
+       rc = osd_idc_find_and_init(env, osd_obj2dev(osd_dt_obj(dt)),
+                                  osd_dt_obj(dt));
+       if (rc != 0)
+               RETURN(rc);
+
        if (!attr)
                RETURN(0);
 
@@ -3427,10 +3433,6 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                RETURN(rc);
 
-       /* will help to find FID->ino mapping at dt_insert() */
-       rc = osd_idc_find_and_init(env, osd_obj2dev(osd_dt_obj(dt)),
-                                  osd_dt_obj(dt));
-
        RETURN(rc);
 }
 
@@ -5971,19 +5973,24 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
 
        idc = osd_idc_find(env, osd, fid);
        if (unlikely(idc == NULL)) {
-               /*
-                * this dt_insert() wasn't declared properly, so
-                * FID is missing in OI cache. we better do not
-                * lookup FID in FLDB/OI and don't risk to deadlock,
-                * but in some special cases (lfsck testing, etc)
-                * it's much simpler than fixing a caller
-                */
-               CERROR("%s: "DFID" wasn't declared for insert\n",
-                      osd_name(osd), PFID(fid));
-               dump_stack();
                idc = osd_idc_find_or_init(env, osd, fid);
-               if (IS_ERR(idc))
+               if (IS_ERR(idc)) {
+                       /*
+                        * this dt_insert() wasn't declared properly, so
+                        * FID is missing in OI cache. we better do not
+                        * lookup FID in FLDB/OI and don't risk to deadlock,
+                        * but in some special cases (lfsck testing, etc)
+                        * it's much simpler than fixing a caller.
+                        *
+                        * normally this error should be placed after the first
+                        * find, but migrate may attach source stripes to
+                        * target, which doesn't create stripes.
+                        */
+                       CERROR("%s: "DFID" wasn't declared for insert\n",
+                              osd_name(osd), PFID(fid));
+                       dump_stack();
                        RETURN(PTR_ERR(idc));
+               }
        }
 
        if (idc->oic_remote) {
index 7d69754..2231182 100644 (file)
@@ -1022,11 +1022,12 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                 * lookup FID in FLDB/OI and don't risk to deadlock,
                 * but in some special cases (lfsck testing, etc)
                 * it's much simpler than fixing a caller */
-               CERROR("%s: "DFID" wasn't declared for insert\n",
-                      osd_name(osd), PFID(fid));
                idc = osd_idc_find_or_init(env, osd, fid);
-               if (IS_ERR(idc))
+               if (IS_ERR(idc)) {
+                       CERROR("%s: "DFID" wasn't declared for insert\n",
+                              osd_name(osd), PFID(fid));
                        RETURN(PTR_ERR(idc));
+               }
        }
 
        CLASSERT(sizeof(zde->lzd_reg) == 8);
index 35c36c9..f6ac3a7 100644 (file)
@@ -969,7 +969,17 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
                return rc;
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
-       if (rc == 0)
+       if (rc)
+               return rc;
+
+       /*
+        * only migration deletes LMV, and it must be done immediately because
+        * LMV is used when deleting sub stripes; if the update were delayed,
+        * destroying the master object later would delete sub stripes again.
+        */
+       if (!strcmp(name, XATTR_NAME_LMV))
+               rc = __osd_sa_xattr_update(env, obj, oh);
+       else
                rc = __osd_sa_xattr_schedule_update(env, obj, oh);
        return rc;
 }
index 02b3c96..056d130 100644 (file)
@@ -1823,13 +1823,17 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_layout_version));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding1) == 20, "found %lld\n",
-                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding1));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 24, "found %lld\n",
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset) == 20, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash) == 24, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 28, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding2));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 8, "found %lld\n",
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2));
        LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding3) == 32, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));
index 904ff71..0510b08 100644 (file)
@@ -26,6 +26,9 @@ CLIENTS=${CLIENTS:-$HOSTNAME}
 RACERDIRS=${RACERDIRS:-"$DIR $DIR2"}
 echo RACERDIRS=$RACERDIRS
 
+#LU-4684
+RACER_ENABLE_MIGRATION=false
+
 if ((MDSCOUNT > 1 &&
      $(lustre_version_code $SINGLEMDS) >= $(version_code 2.8.0))); then
        RACER_ENABLE_REMOTE_DIRS=${RACER_ENABLE_REMOTE_DIRS:-true}
index c6ef161..90edbf2 100755 (executable)
@@ -2109,27 +2109,27 @@ test_110f () {
 run_test 110f "remove remote directory: drop slave rep"
 
 test_110g () {
-       [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.6.57) ]] ||
-               { skip "Need MDS version at least 2.6.57"; return 0; }
+       [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.11.0) ]] ||
+               { skip "Need MDS version at least 2.11.0"; return 0; }
 
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
-       local remote_dir=$DIR/$tdir/remote_dir
-       local mdtidx=1
-
-       mkdir -p $remote_dir
-
-       createmany -o $remote_dir/f 100
 
-       #define OBD_FAIL_MIGRATE_NET_REP                0x1800
-       do_facet mds$mdtidx lctl set_param fail_loc=0x1800
-       $LFS migrate -m $mdtidx $remote_dir || error "migrate failed"
-       do_facet mds$mdtidx lctl set_param fail_loc=0x0
+       mkdir -p $DIR/$tdir
+       touch $DIR/$tdir/$tfile
 
-       for file in $(find $remote_dir); do
-               mdt_index=$($LFS getstripe -m $file)
-               [ $mdt_index == $mdtidx ] ||
-                       error "$file is not on MDT${mdtidx}"
-       done
+       # OBD_FAIL_MDS_REINT_NET_REP    0x119
+       do_facet mds1 $LCTL set_param fail_loc=0x119
+       $LFS migrate -m 1 $DIR/$tdir &
+       migrate_pid=$!
+       sleep 5
+       do_facet mds1 $LCTL set_param fail_loc=0
+       wait $migrate_pid
+
+       local mdt_index
+       mdt_index=$($LFS getstripe -m $DIR/$tdir)
+       [ $mdt_index == 1 ] || error "$tdir is not on MDT1"
+       mdt_index=$($LFS getstripe -m $DIR/$tdir/$tfile)
+       [ $mdt_index == 1 ] || error "$tfile is not on MDT1"
 
        rm -rf $DIR/$tdir || error "rmdir failed"
 }
index a7fd2ef..4025f38 100644 (file)
@@ -4522,8 +4522,8 @@ test_29c()
 
        cancel_lru_locks mdc
        if [ $MDSCOUNT -ge 2 ]; then
-               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
-                       error "(3.1) Migrate failure"
+               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null &&
+                       error "(3.1) Migrate should fail"
 
                echo "The object with linkEA overflow should NOT be migrated"
                local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
@@ -4537,8 +4537,8 @@ test_29c()
        unlinkmany $DIR/$tdir/foo/ttttttttttt 100 || error "(4) Fail to unlink"
 
        if [ $MDSCOUNT -ge 2 ]; then
-               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null ||
-                       error "(5.1) Migrate failure"
+               $LFS migrate -m 1 $DIR/$tdir/guard 2>/dev/null &&
+                       error "(5.1) Migrate should fail"
 
                # The overflow timestamp is still there, so migration will fail.
                local newfid=$($LFS path2fid $DIR/$tdir/guard/f0)
index 2876399..3005fea 100755 (executable)
@@ -14886,6 +14886,8 @@ run_test 229 "getstripe/stat/rm/attr changes work on released files"
 test_230a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local MDTIDX=1
 
@@ -14912,6 +14914,8 @@ run_test 230a "Create remote directory and files under the remote directory"
 test_230b() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local MDTIDX=1
        local mdt_index
@@ -15075,9 +15079,11 @@ test_230b() {
 run_test 230b "migrate directory"
 
 test_230c() {
-       [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
+       [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
        remote_mds_nodsh && skip "remote MDS with nodsh"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local MDTIDX=1
        local mdt_index
@@ -15092,31 +15098,45 @@ test_230c() {
        createmany -o $migrate_dir/f 10 ||
                error "create files under ${migrate_dir} failed"
 
-       #failed after migrating 5 entries
+       # fail after migrating top dir, and this will fail only once, so one
+       # sub file migration will fail, others succeed.
        #OBD_FAIL_MIGRATE_ENTRIES       0x1801
-       do_facet mds1 lctl set_param fail_loc=0x20001801
-       do_facet mds1 lctl  set_param fail_val=5
+       do_facet mds1 lctl set_param fail_loc=0x1801
        local t=$(ls $migrate_dir | wc -l)
        $LFS migrate --mdt-index $MDTIDX $migrate_dir &&
-               error "migrate should fail after 5 entries"
-
-       mkdir $migrate_dir/dir &&
-               error "mkdir succeeds under migrating directory"
-       touch $migrate_dir/file &&
-               error "touch file succeeds under migrating directory"
+               error "migrate should fail"
+
+       # add new dir/file should succeed
+       mkdir $migrate_dir/dir ||
+               error "mkdir failed under migrating directory"
+       touch $migrate_dir/file ||
+               error "touch file failed under migrating directory"
+       # add file with existing name should fail
+       $OPENFILE -f O_CREAT:O_EXCL $migrate_dir/f1 &&
+               error "open(O_CREAT|O_EXCL) f1 should fail"
+       $MULTIOP $migrate_dir/f1 m &&
+               error "create f1 should fail"
+       $MULTIOP $migrate_dir/f3 m &&
+               error "create f3 should fail"
 
        local u=$(ls $migrate_dir | wc -l)
+       u=$((u - 2))
        [ "$u" == "$t" ] || error "$u != $t during migration"
 
        for file in $(find $migrate_dir); do
                stat $file || error "stat $file failed"
        done
 
-       do_facet mds1 lctl set_param fail_loc=0
-       do_facet mds1 lctl set_param fail_val=0
+       # resume migration with different options should fail
+       $LFS migrate -m 0 $migrate_dir &&
+               error "migrate -m 0 $migrate_dir should fail"
+
+       $LFS migrate -m $MDTIDX -c 2 $migrate_dir &&
+               error "migrate -c 2 $migrate_dir should fail"
 
+       # resume migration should succeed
        $LFS migrate -m $MDTIDX $migrate_dir ||
-               error "migrate open files should failed with open files"
+               error "migrate $migrate_dir failed"
 
        echo "Finish migration, then checking.."
        for file in $(find $migrate_dir); do
@@ -15127,20 +15147,35 @@ test_230c() {
 
        rm -rf $DIR/$tdir || error "rm dir failed after migration"
 }
-run_test 230c "check directory accessiblity if migration is failed"
+run_test 230c "check directory accessiblity if migration failed"
 
 test_230d() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
-       local MDTIDX=1
-       local mdt_index
        local migrate_dir=$DIR/$tdir/migrate_dir
+       local old_index
+       local new_index
+       local old_count
+       local new_count
+       local new_hash
+       local mdt_index
        local i
        local j
 
+       old_index=$((RANDOM % MDSCOUNT))
+       old_count=$((MDSCOUNT - old_index))
+       new_index=$((RANDOM % MDSCOUNT))
+       new_count=$((MDSCOUNT - new_index))
+       new_hash="all_char"
+
+       [ $old_count -gt 1 ] && old_count=$((old_count - RANDOM % old_count))
+       [ $new_count -gt 1 ] && new_count=$((new_count - RANDOM % new_count))
+
        test_mkdir $DIR/$tdir
-       test_mkdir -i0 -c1 $migrate_dir
+       test_mkdir -i $old_index -c $old_count $migrate_dir
 
        for ((i=0; i<100; i++)); do
                test_mkdir -i0 -c1 $migrate_dir/dir_${i}
@@ -15148,14 +15183,23 @@ test_230d() {
                        error "create files under remote dir failed $i"
        done
 
-       $LFS migrate -m $MDTIDX $migrate_dir ||
+       echo -n "Migrate from MDT$old_index "
+       [ $old_count -gt 1 ] && echo -n "... MDT$((old_index + old_count - 1)) "
+       echo -n "to MDT$new_index"
+       [ $new_count -gt 1 ] && echo -n " ... MDT$((new_index + new_count - 1))"
+       echo
+
+       echo "$LFS migrate -m$new_index -c$new_count -H $new_hash $migrate_dir"
+       $LFS migrate -m $new_index -c $new_count -H $new_hash $migrate_dir ||
                error "migrate remote dir error"
 
        echo "Finish migration, then checking.."
        for file in $(find $migrate_dir); do
                mdt_index=$($LFS getstripe -m $file)
-               [ $mdt_index == $MDTIDX ] ||
-                       error "$file is not on MDT${MDTIDX}"
+               if [ $mdt_index -lt $new_index ] ||
+                  [ $mdt_index -gt $((new_index + new_count - 1)) ]; then
+                       error "$file is on MDT$mdt_index"
+               fi
        done
 
        rm -rf $DIR/$tdir || error "rm dir failed after migration"
@@ -15165,6 +15209,8 @@ run_test 230d "check migrate big directory"
 test_230e() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local i
        local j
@@ -15211,6 +15257,8 @@ run_test 230e "migrate mulitple local link files"
 test_230f() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local a_fid
        local ln_fid
@@ -15260,6 +15308,8 @@ run_test 230f "migrate mulitple remote link files"
 test_230g() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        mkdir -p $DIR/$tdir/migrate_dir
 
@@ -15272,8 +15322,8 @@ run_test 230g "migrate dir to non-exist MDT"
 test_230h() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
-       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.64) ] &&
-               skip "Need MDS version at least 2.7.64"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        local mdt_index
 
@@ -15285,21 +15335,19 @@ test_230h() {
        $LFS migrate -m1 $DIR/$tdir/.. &&
                error "migrating mountpoint2 should fail"
 
-       $LFS migrate -m1 $DIR/$tdir/migrate_dir/.. ||
-               error "migrating $tdir fail"
-
-       mdt_index=$($LFS getstripe -m $DIR/$tdir)
-       [ $mdt_index == 1 ] || error "$mdt_index != 1 after migration"
-
-       mdt_index=$($LFS getstripe -m $DIR/$tdir/migrate_dir)
-       [ $mdt_index == 1 ] || error "$mdt_index != 1 after migration"
+       # same as mv: migrating ".." should fail
+       $LFS migrate -m1 $DIR/$tdir/migrate_dir/.. &&
+               error "migrating $tdir/migrate_dir/.. should fail"
 
+       true
 }
 run_test 230h "migrate .. and root"
 
 test_230i() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
        [ $MDSCOUNT -lt 2 ] && skip_env "needs >= 2 MDTs"
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.11.52) ] &&
+               skip "Need MDS version at least 2.11.52"
 
        mkdir -p $DIR/$tdir/migrate_dir
 
@@ -17764,7 +17812,7 @@ test_300k() {
 
        #define OBD_FAIL_LARGE_STRIPE   0x1703
        $LCTL set_param fail_loc=0x1703
-       $LFS setdirstripe -i 0 -c512 $DIR/$tdir/striped_dir ||
+       $LFS setdirstripe -i 0 -c192 $DIR/$tdir/striped_dir ||
                error "set striped dir error"
        $LCTL set_param fail_loc=0
 
index 86750a8..a9cb7f1 100644 (file)
@@ -1924,8 +1924,10 @@ static int llapi_semantic_traverse(char *path, int size, DIR *parent,
                        rc = 0;
                        if (sem_init) {
                                rc = sem_init(path, d, NULL, data, dent);
-                               if (rc < 0 && ret == 0)
+                               if (rc < 0 && ret == 0) {
                                        ret = rc;
+                                       break;
+                               }
                        }
                        if (sem_fini && rc == 0)
                                sem_fini(path, d, NULL, data, dent);
@@ -4434,11 +4436,18 @@ migrate:
                        sync();
                        retry = true;
                        goto migrate;
+               } else if (errno == EALREADY) {
+                       if (param->fp_verbose & VERBOSE_DETAIL)
+                               fprintf(stdout,
+                                       "%s was migrated to MDT%d already\n",
+                                       path, lmu->lum_stripe_offset);
+                       ret = 0;
+               } else {
+                       ret = -errno;
+                       fprintf(stderr, "%s migrate failed: %s (%d)\n",
+                               path, strerror(-ret), ret);
+                       goto out;
                }
-               ret = -errno;
-               fprintf(stderr, "%s migrate failed: %s (%d)\n",
-                       path, strerror(-ret), ret);
-               goto out;
        } else if (param->fp_verbose & VERBOSE_DETAIL) {
                fprintf(stdout, "migrate %s to MDT%d stripe count %d\n",
                        path, lmu->lum_stripe_offset, lmu->lum_stripe_count);
index 91afeb5..8bd3da7 100644 (file)
@@ -837,7 +837,8 @@ check_lmv_mds_md_v1(void)
        CHECK_MEMBER(lmv_mds_md_v1, lmv_master_mdt_index);
        CHECK_MEMBER(lmv_mds_md_v1, lmv_hash_type);
        CHECK_MEMBER(lmv_mds_md_v1, lmv_layout_version);
-       CHECK_MEMBER(lmv_mds_md_v1, lmv_padding1);
+       CHECK_MEMBER(lmv_mds_md_v1, lmv_migrate_offset);
+       CHECK_MEMBER(lmv_mds_md_v1, lmv_migrate_hash);
        CHECK_MEMBER(lmv_mds_md_v1, lmv_padding2);
        CHECK_MEMBER(lmv_mds_md_v1, lmv_padding3);
        CHECK_MEMBER(lmv_mds_md_v1, lmv_pool_name[LOV_MAXPOOLNAME]);
index 8cb87c0..f2bbe03 100644 (file)
@@ -1844,13 +1844,17 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_layout_version));
        LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_layout_version));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding1) == 20, "found %lld\n",
-                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding1));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding1));
-       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 24, "found %lld\n",
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset) == 20, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_offset));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_offset));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash) == 24, "found %lld\n",
+                (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_migrate_hash));
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_migrate_hash));
+       LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding2) == 28, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding2));
-       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 8, "found %lld\n",
+       LASSERTF((int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct lmv_mds_md_v1 *)0)->lmv_padding2));
        LASSERTF((int)offsetof(struct lmv_mds_md_v1, lmv_padding3) == 32, "found %lld\n",
                 (long long)(int)offsetof(struct lmv_mds_md_v1, lmv_padding3));