Whamcloud - gitweb
LU-2430 mdd: add lfs mv to migrate inode. 62/6662/46
authorwang di <di.wang@intel.com>
Sun, 10 Aug 2014 09:58:24 +0000 (02:58 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 28 Feb 2014 04:02:37 +0000 (04:02 +0000)
Add lfs mv to migrate the individual inode from one MDT to
another MDT, and this function will only migrate inode layout
on MDT but not touch data object on OST.

lfs mv -M 1 /mnt/lustre/test1 #move test1 to MDT1.

The directory will be migrated from top to the bottom, i.e.
migrating parent first, then migrating the child.

Add migrate into sanity 17n to check on-disk format.

Add sanity 230(c,d,e,f) sanityn 80 to for migration.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: Ib4456a1db8909bd96260c67fff48922081948dcd
Reviewed-on: http://review.whamcloud.com/6662
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
75 files changed:
lustre/fid/fid_store.c
lustre/include/dt_object.h
lustre/include/lclient.h
lustre/include/liblustre.h
lustre/include/linux/lustre_intent.h
lustre/include/lu_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/include/lustre/lustreapi.h
lustre/include/lustre_fid.h
lustre/include/lustre_linkea.h
lustre/include/lustre_lmv.h
lustre/include/lustre_mdc.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_bookmark.c
lustre/lfsck/lfsck_layout.c
lustre/lfsck/lfsck_lib.c
lustre/liblustre/file.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/namei.c
lustre/llite/rw.c
lustre/llite/statahead.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_locks.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_capa.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/mgs/mgs_nids.c
lustre/obdclass/linkea.c
lustre/obdclass/llog_osd.c
lustre/obdclass/local_storage.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_objects.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_io.c
lustre/osd-ldiskfs/osd_quota.c
lustre/osd-zfs/osd_io.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_precreate.c
lustre/osp/osp_trans.c
lustre/ptlrpc/wiretest.c
lustre/target/out_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/tests/Makefile.am
lustre/tests/racer/dir_migrate.sh [new file with mode: 0755]
lustre/tests/racer/dir_remote.sh
lustre/tests/racer/racer.sh
lustre/tests/recovery-small.sh
lustre/tests/sanity.sh
lustre/tests/sanityn.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 5ae8307..8e86091 100644 (file)
@@ -121,8 +121,11 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
+       /* Store ranges in le format. */
+       range_cpu_to_le(&info->sti_space, &seq->lss_space);
+
        rc = dt_declare_record_write(env, seq->lss_obj,
-                                    sizeof(struct lu_seq_range), 0, th);
+                                    seq_store_buf(info), 0, th);
        if (rc)
                GOTO(exit, rc);
 
@@ -138,9 +141,6 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
        if (rc)
                GOTO(exit, rc);
 
-       /* Store ranges in le format. */
-       range_cpu_to_le(&info->sti_space, &seq->lss_space);
-
        rc = dt_record_write(env, seq->lss_obj, seq_store_buf(info), &pos, th);
        if (rc) {
                CERROR("%s: Can't write space data, rc %d\n",
index acd0fd0..64c87a2 100644 (file)
@@ -487,17 +487,17 @@ struct dt_body_operations {
         ssize_t (*dbo_read)(const struct lu_env *env, struct dt_object *dt,
                             struct lu_buf *buf, loff_t *pos,
                             struct lustre_capa *capa);
-        /**
-         * precondition: dt_object_exists(dt);
-         */
-        ssize_t (*dbo_declare_write)(const struct lu_env *env,
-                                     struct dt_object *dt,
-                                     const loff_t size, loff_t pos,
-                                     struct thandle *handle);
-        ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
-                             const struct lu_buf *buf, loff_t *pos,
-                             struct thandle *handle, struct lustre_capa *capa,
-                             int ignore_quota);
+       /**
+        * precondition: dt_object_exists(dt);
+        */
+       ssize_t (*dbo_declare_write)(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    const struct lu_buf *buf, loff_t pos,
+                                    struct thandle *handle);
+       ssize_t (*dbo_write)(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, loff_t *pos,
+                            struct thandle *handle, struct lustre_capa *capa,
+                            int ignore_quota);
         /*
          * methods for zero-copy IO
          */
@@ -1038,18 +1038,19 @@ static inline int dt_trans_cb_add(struct thandle *th,
 
 
 static inline int dt_declare_record_write(const struct lu_env *env,
-                                          struct dt_object *dt,
-                                          int size, loff_t pos,
-                                          struct thandle *th)
-{
-        int rc;
-
-        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
-        LASSERT(th != NULL);
-        LASSERT(dt->do_body_ops);
-        LASSERT(dt->do_body_ops->dbo_declare_write);
-        rc = dt->do_body_ops->dbo_declare_write(env, dt, size, pos, th);
-        return rc;
+                                         struct dt_object *dt,
+                                         const struct lu_buf *buf,
+                                         loff_t pos,
+                                         struct thandle *th)
+{
+       int rc;
+
+       LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+       LASSERT(th != NULL);
+       LASSERT(dt->do_body_ops);
+       LASSERT(dt->do_body_ops->dbo_declare_write);
+       rc = dt->do_body_ops->dbo_declare_write(env, dt, buf, pos, th);
+       return rc;
 }
 
 static inline int dt_declare_create(const struct lu_env *env,
index c61c81b..3344160 100644 (file)
@@ -455,11 +455,20 @@ struct cl_client_cache {
        wait_queue_head_t       ccc_unstable_waitq; /* Signaled on BRW commit */
 };
 
+enum {
+       LUSTRE_OPC_MKDIR    = 0,
+       LUSTRE_OPC_SYMLINK  = 1,
+       LUSTRE_OPC_MKNOD    = 2,
+       LUSTRE_OPC_CREATE   = 3,
+       LUSTRE_OPC_ANY      = 5
+};
+
 enum op_cli_flags {
        CLI_SET_MEA     = 1 << 0,
        CLI_RM_ENTRY    = 1 << 1,
        CLI_HASH64      = 1 << 2,
        CLI_API32       = 1 << 3,
+       CLI_MIGRATE     = 1 << 4,
 };
 
 #endif /*LCLIENT_H */
index 882fd18..e0ba5be 100644 (file)
@@ -229,6 +229,21 @@ struct lookup_intent {
        } d;
 };
 
+static inline int it_disposition(const struct lookup_intent *it, int flag)
+{
+       return it->d.lustre.it_disposition & flag;
+}
+
+static inline void it_set_disposition(struct lookup_intent *it, int flag)
+{
+       it->d.lustre.it_disposition |= flag;
+}
+
+static inline void it_clear_disposition(struct lookup_intent *it, int flag)
+{
+       it->d.lustre.it_disposition &= ~flag;
+}
+
 static inline void intent_init(struct lookup_intent *it, int op, int flags)
 {
         memset(it, 0, sizeof(*it));
index 588f211..3a5c972 100644 (file)
@@ -59,4 +59,19 @@ struct lookup_intent {
        } d;
 };
 
+static inline int it_disposition(const struct lookup_intent *it, int flag)
+{
+       return it->d.lustre.it_disposition & flag;
+}
+
+static inline void it_set_disposition(struct lookup_intent *it, int flag)
+{
+       it->d.lustre.it_disposition |= flag;
+}
+
+static inline void it_clear_disposition(struct lookup_intent *it, int flag)
+{
+       it->d.lustre.it_disposition &= ~flag;
+}
+
 #endif
index bb43780..efffc61 100644 (file)
@@ -896,8 +896,8 @@ struct lu_rdpg {
 };
 
 enum lu_xattr_flags {
-        LU_XATTR_REPLACE = (1 << 0),
-        LU_XATTR_CREATE  = (1 << 1)
+       LU_XATTR_REPLACE = (1 << 0),
+       LU_XATTR_CREATE  = (1 << 1)
 };
 
 /** @} helpers */
index 48e326d..869e682 100644 (file)
@@ -1584,6 +1584,7 @@ enum obdo_flags {
 #define LOV_MAGIC         LOV_MAGIC_V1
 #define LOV_MAGIC_JOIN_V1 0x0BD20BD0
 #define LOV_MAGIC_V3      0x0BD30BD0
+#define LOV_MAGIC_MIGRATE 0x0BD40BD0
 
 /*
  * magic for fully defined striping
@@ -2163,7 +2164,7 @@ typedef enum {
        REINT_OPEN     = 6,
        REINT_SETXATTR = 7,
        REINT_RMENTRY  = 8,
-//      REINT_WRITE    = 9,
+       REINT_MIGRATE  = 9,
         REINT_MAX
 } mds_reint_t, mdt_reint_t;
 
@@ -2494,6 +2495,7 @@ enum mds_op_bias {
        MDS_CREATE_VOLATILE     = 1 << 10,
        MDS_OWNEROVERRIDE       = 1 << 11,
        MDS_HSM_RELEASE         = 1 << 12,
+       MDS_RENAME_MIGRATE      = 1 << 13,
 };
 
 /* instance of mdt_reint_rec */
@@ -2703,11 +2705,13 @@ extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 /* lmv structures */
 #define LMV_MAGIC_V1   0x0CD10CD0    /* normal stripe lmv magic */
 #define LMV_USER_MAGIC 0x0CD20CD0    /* default lmv magic*/
+#define LMV_MAGIC_MIGRATE      0x0CD30CD0    /* migrate stripe lmv magic */
 #define LMV_MAGIC      LMV_MAGIC_V1
 
 enum lmv_hash_type {
        LMV_HASH_TYPE_ALL_CHARS = 1,
        LMV_HASH_TYPE_FNV_1A_64 = 2,
+       LMV_HASH_TYPE_MIGRATION = 3,
 };
 
 #define LMV_HASH_NAME_ALL_CHARS        "all_char"
@@ -2764,7 +2768,8 @@ extern void lustre_swab_lmv_mds_md(union lmv_mds_md *lmm);
 static inline int lmv_mds_md_size(int stripe_count, unsigned int lmm_magic)
 {
        switch (lmm_magic) {
-       case LMV_MAGIC_V1: {
+       case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE: {
                struct lmv_mds_md_v1 *lmm1;
 
                return sizeof(*lmm1) + stripe_count *
@@ -2779,6 +2784,7 @@ static inline int lmv_mds_md_stripe_count_get(const union lmv_mds_md *lmm)
 {
        switch (le32_to_cpu(lmm->lmv_magic)) {
        case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE:
                return le32_to_cpu(lmm->lmv_md_v1.lmv_stripe_count);
        case LMV_USER_MAGIC:
                return le32_to_cpu(lmm->lmv_user_md.lum_stripe_count);
@@ -2792,6 +2798,7 @@ static inline int lmv_mds_md_stripe_count_set(union lmv_mds_md *lmm,
 {
        switch (le32_to_cpu(lmm->lmv_magic)) {
        case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE:
                lmm->lmv_md_v1.lmv_stripe_count = cpu_to_le32(stripe_count);
                break;
        case LMV_USER_MAGIC:
@@ -3901,6 +3908,7 @@ enum update_type {
        OUT_INDEX_LOOKUP        = 9,
        OUT_INDEX_INSERT        = 10,
        OUT_INDEX_DELETE        = 11,
+       OUT_WRITE               = 12,
        OUT_LAST
 };
 
index 4aa218d..2cc6c46 100644 (file)
@@ -267,6 +267,7 @@ struct ost_id {
 #define LL_IOC_GET_LEASE               _IO('f', 244)
 #define LL_IOC_HSM_IMPORT              _IOWR('f', 245, struct hsm_user_import)
 #define LL_IOC_LMV_SET_DEFAULT_STRIPE  _IOWR('f', 246, struct lmv_user_md)
+#define LL_IOC_MIGRATE                 _IOR('f', 247, int)
 
 #define LL_STATFS_LMV          1
 #define LL_STATFS_LOV          2
index 8f7d103..7d11033 100644 (file)
@@ -161,7 +161,8 @@ struct find_param {
                                 exclude_stripecount:1,
                                 check_layout:1,
                                 exclude_layout:1,
-                                get_default_lmv:1; /* Get default LMV */
+                                get_default_lmv:1, /* Get default LMV */
+                                migrate:1;
 
        int                      verbose;
        int                      quiet;
@@ -247,6 +248,7 @@ extern void llapi_ping_target(char *obd_type, char *obd_name,
 
 extern int llapi_search_rootpath(char *pathname, const char *fsname);
 extern int llapi_nodemap_exists(const char *name);
+extern int llapi_mv(char *path, struct find_param *param);
 
 struct mntent;
 #define HAVE_LLAPI_IS_LUSTRE_MNT
index 5d44838..d0f86e7 100644 (file)
@@ -326,6 +326,12 @@ static inline void lu_last_id_fid(struct lu_fid *fid, __u64 seq, __u32 ost_idx)
        fid->f_ver = 0;
 }
 
+static inline bool fid_is_md_operative(const struct lu_fid *fid)
+{
+       return fid_is_mdt0(fid) || fid_is_igif(fid) ||
+              fid_is_norm(fid) || fid_is_root(fid);
+}
+
 /* seq client type */
 enum lu_cli_type {
        LUSTRE_SEQ_METADATA = 1,
index 1e07f3a..170cc04 100644 (file)
@@ -46,6 +46,8 @@ int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf);
 int linkea_init(struct linkea_data *ldata);
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid);
+int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
+                     const struct lu_fid *pfid);
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
index ce0bfa6..fc15717 100644 (file)
@@ -98,8 +98,10 @@ static inline void lmv_cpu_to_le(union lmv_mds_md *lmv_dst,
 {
        switch (lmv_src->lmv_magic) {
        case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE: {
                lmv1_cpu_to_le(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
                break;
+       }
        default:
                break;
        }
@@ -110,10 +112,13 @@ static inline void lmv_le_to_cpu(union lmv_mds_md *lmv_dst,
 {
        switch (le32_to_cpu(lmv_src->lmv_magic)) {
        case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE: {
                lmv1_le_to_cpu(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
                break;
+       }
        default:
                break;
        }
 }
+
 #endif
index 3c7b6f0..4f69667 100644 (file)
@@ -188,7 +188,7 @@ struct mdc_cache_waiter {
 };
 
 /* mdc/mdc_locks.c */
-int it_disposition(struct lookup_intent *it, int flag);
+int it_disposition(const struct lookup_intent *it, int flag);
 void it_clear_disposition(struct lookup_intent *it, int flag);
 void it_set_disposition(struct lookup_intent *it, int flag);
 int it_open_error(int phase, struct lookup_intent *it);
index 82f6322..270e302 100644 (file)
@@ -302,6 +302,9 @@ struct md_dir_operations {
                          struct md_object *cobj, const struct lu_name *lname,
                          struct md_attr *ma, int no_name);
 
+       int (*mdo_migrate)(const struct lu_env *env, struct md_object *pobj,
+                          const struct lu_fid *lf, const struct lu_name *lname,
+                          struct md_object *tobj, struct md_attr *ma);
         /** This method is used to compare a requested layout to an existing
          * layout (struct lov_mds_md_v1/3 vs struct lov_mds_md_v1/3) */
         int (*mdo_lum_lmm_cmp)(const struct lu_env *env,
@@ -744,6 +747,17 @@ static inline int mdo_rename(const struct lu_env *env,
                                           ma);
 }
 
+static inline int mdo_migrate(const struct lu_env *env,
+                            struct md_object *pobj,
+                            const struct lu_fid *lf,
+                            const struct lu_name *lname,
+                            struct md_object *tobj,
+                            struct md_attr *ma)
+{
+       LASSERT(pobj->mo_dir_ops->mdo_migrate);
+       return pobj->mo_dir_ops->mdo_migrate(env, pobj, lf, lname, tobj, ma);
+}
+
 static inline int mdo_is_subdir(const struct lu_env *env,
                                 struct md_object *mo,
                                 const struct lu_fid *fid,
index ac80412..3be4697 100644 (file)
@@ -975,9 +975,6 @@ struct md_op_data {
        /* Various operation flags. */
        enum mds_op_bias        op_bias;
 
-       /* Operation type */
-       __u32                   op_opc;
-
        /* Used by readdir */
        __u64                   op_hash_offset;
 
@@ -1192,14 +1189,6 @@ struct obd_ops {
          * Also, add a wrapper function in include/linux/obd_class.h. */
 };
 
-enum {
-        LUSTRE_OPC_MKDIR    = (1 << 0),
-        LUSTRE_OPC_SYMLINK  = (1 << 1),
-        LUSTRE_OPC_MKNOD    = (1 << 2),
-        LUSTRE_OPC_CREATE   = (1 << 3),
-        LUSTRE_OPC_ANY      = (1 << 4)
-};
-
 /* lmv structures */
 struct lustre_md {
        struct mdt_body         *body;
index d33d85d..80be071 100644 (file)
@@ -518,6 +518,10 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OUT_UPDATE_NET                0x1700
 #define OBD_FAIL_OUT_UPDATE_NET_REP    0x1701
 
+/* MIGRATE */
+#define OBD_FAIL_MIGRATE_NET_REP               0x1800
+#define OBD_FAIL_MIGRATE_ENTRIES               0x1801
+#define OBD_FAIL_MIGRATE_LINKEA                        0x1802
 
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   CFS_FAIL_PRECHECK(id)
index 27f846a..cf0e552 100644 (file)
@@ -117,7 +117,10 @@ int lfsck_bookmark_store(const struct lu_env *env, struct lfsck_instance *lfsck)
                RETURN(rc);
        }
 
-       rc = dt_declare_record_write(env, obj, len, 0, handle);
+       rc = dt_declare_record_write(env, obj,
+                                    lfsck_buf_get(env,
+                                    &lfsck->li_bookmark_disk, len),
+                                    0, handle);
        if (rc != 0) {
                CERROR("%s: fail to declare trans for storing lfsck_bookmark: "
                       "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
index b01122e..d2f7ae7 100644 (file)
@@ -780,7 +780,8 @@ static int lfsck_layout_store(const struct lu_env *env,
                RETURN(rc);
        }
 
-       rc = dt_declare_record_write(env, obj, size, pos, handle);
+       rc = dt_declare_record_write(env, obj, lfsck_buf_get(env, lo, size),
+                                    pos, handle);
        if (rc != 0) {
                CERROR("%s: fail to declare trans for storing lfsck_layout(1): "
                       "rc = %d\n", lfsck_lfsck2name(lfsck), rc);
@@ -922,7 +923,10 @@ lfsck_layout_lastid_create(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop, rc);
 
-       rc = dt_declare_record_write(env, obj, sizeof(lastid), pos, th);
+       rc = dt_declare_record_write(env, obj,
+                                    lfsck_buf_get(env, &lastid,
+                                                  sizeof(lastid)),
+                                    pos, th);
        if (rc != 0)
                GOTO(stop, rc);
 
@@ -1042,8 +1046,10 @@ lfsck_layout_lastid_store(const struct lu_env *env,
                        continue;
                }
 
+               lastid = cpu_to_le64(lls->lls_lastid);
                rc = dt_declare_record_write(env, lls->lls_lastid_obj,
-                                            sizeof(lastid), pos, th);
+                                            lfsck_buf_get(env, &lastid,
+                                            sizeof(lastid)), pos, th);
                if (rc != 0)
                        goto stop;
 
@@ -1051,7 +1057,6 @@ lfsck_layout_lastid_store(const struct lu_env *env,
                if (rc != 0)
                        goto stop;
 
-               lastid = cpu_to_le64(lls->lls_lastid);
                dt_write_lock(env, lls->lls_lastid_obj, 0);
                rc = dt_record_write(env, lls->lls_lastid_obj,
                                     lfsck_buf_get(env, &lastid,
index 4f05c19..5b5491a 100644 (file)
@@ -395,7 +395,8 @@ static int lfsck_create_lpf_local(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 5a. update bookmark */
-       rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+       rc = dt_declare_record_write(env, bk_obj,
+                                    lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);
 
@@ -584,7 +585,8 @@ static int lfsck_create_lpf_remote(const struct lu_env *env,
                GOTO(stop, rc);
 
        /* 8a. update bookmark locally. */
-       rc = dt_declare_record_write(env, bk_obj, len, 0, th);
+       rc = dt_declare_record_write(env, bk_obj,
+                                    lfsck_buf_get(env, bk, len), 0, th);
        if (rc != 0)
                GOTO(stop, rc);
 
index 13350c2..e27dcb5 100644 (file)
@@ -95,7 +95,6 @@ void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
         else
                 fid_zero(&op_data->op_fid2);
 
-        op_data->op_opc = opc;
         op_data->op_name = name;
         op_data->op_mode = mode;
         op_data->op_namelen = namelen;
index 6a1e3c5..27f23e4 100644 (file)
@@ -579,6 +579,7 @@ int ll_dir_getstripe(struct inode *inode, void **plmm, int *plmm_size,
                        lustre_swab_lov_user_md_v3((struct lov_user_md_v3 *)lmm);
                break;
        case LMV_MAGIC:
+       case LMV_MAGIC_MIGRATE:
                if (LOV_MAGIC != cpu_to_le32(LOV_MAGIC))
                        lustre_swab_lmv_mds_md((union lmv_mds_md *)lmm);
                break;
@@ -597,8 +598,7 @@ out:
        return rc;
 }
 
-static int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi,
-                                const struct lu_fid *fid)
+int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid)
 {
        struct md_op_data       *op_data;
        int                     rc;
@@ -1739,6 +1739,38 @@ out_rmdir:
                OBD_FREE_PTR(copy);
                RETURN(rc);
        }
+       case LL_IOC_MIGRATE: {
+               char            *buf = NULL;
+               const char      *filename;
+               int             namelen = 0;
+               int             len;
+               int             rc;
+               int             mdtidx;
+
+               rc = obd_ioctl_getdata(&buf, &len, (void __user *)arg);
+               if (rc < 0)
+                       RETURN(rc);
+
+               data = (struct obd_ioctl_data *)buf;
+               if (data->ioc_inlbuf1 == NULL || data->ioc_inlbuf2 == NULL ||
+                   data->ioc_inllen1 == 0 || data->ioc_inllen2 == 0)
+                       GOTO(migrate_free, rc = -EINVAL);
+
+               filename = data->ioc_inlbuf1;
+               namelen = data->ioc_inllen1;
+               if (namelen < 1)
+                       GOTO(migrate_free, rc = -EINVAL);
+
+               if (data->ioc_inllen2 != sizeof(mdtidx))
+                       GOTO(migrate_free, rc = -EINVAL);
+               mdtidx = *(int *)data->ioc_inlbuf2;
+
+               rc = ll_migrate(inode, file, mdtidx, filename, namelen);
+migrate_free:
+               obd_ioctl_freedata(buf, len);
+
+               RETURN(rc);
+       }
        default:
                RETURN(obd_iocontrol(cmd, sbi->ll_dt_exp, 0, NULL,
                                     (void *)arg));
index dabc5d1..f81a8ac 100644 (file)
@@ -384,7 +384,8 @@ int ll_file_release(struct inode *inode, struct file *file)
         }
 
         if (!S_ISDIR(inode->i_mode)) {
-               lov_read_and_clear_async_rc(lli->lli_clob);
+               if (lli->lli_clob != NULL)
+                       lov_read_and_clear_async_rc(lli->lli_clob);
                 lli->lli_async_rc = 0;
         }
 
@@ -2604,6 +2605,7 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
                OBD_FREE_PTR(hui);
                RETURN(rc);
        }
+
        default: {
                int err;
 
@@ -2722,9 +2724,11 @@ int ll_flush(struct file *file, fl_owner_t id)
         * failed for pages in this mapping. */
        rc = lli->lli_async_rc;
        lli->lli_async_rc = 0;
-       err = lov_read_and_clear_async_rc(lli->lli_clob);
-       if (rc == 0)
-               rc = err;
+       if (lli->lli_clob != NULL) {
+               err = lov_read_and_clear_async_rc(lli->lli_clob);
+               if (rc == 0)
+                       rc = err;
+       }
 
        /* The application has been told write failure already.
         * Do not report failure again. */
@@ -2996,6 +3000,109 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         RETURN(rc);
 }
 
+static int ll_get_fid_by_name(struct inode *parent, const char *name,
+                             int namelen, struct lu_fid *fid)
+{
+       struct md_op_data       *op_data = NULL;
+       struct mdt_body         *body;
+       struct ptlrpc_request   *req;
+       int                     rc;
+
+       op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen, 0,
+                                    LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data))
+               return PTR_ERR(op_data);
+
+       op_data->op_valid = OBD_MD_FLID;
+       rc = md_getattr_name(ll_i2sbi(parent)->ll_md_exp, op_data, &req);
+       if (rc < 0)
+               GOTO(out_free, rc);
+
+       body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               GOTO(out_req, rc = -EFAULT);
+
+       *fid = body->fid1;
+out_req:
+       ptlrpc_req_finished(req);
+out_free:
+       if (op_data != NULL)
+               ll_finish_md_op_data(op_data);
+       return rc;
+}
+
+int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
+              const char *name, int namelen)
+{
+       struct dentry         *dchild = NULL;
+       struct md_op_data     *op_data;
+       struct ptlrpc_request *request = NULL;
+       struct qstr           qstr;
+       int                    rc;
+       ENTRY;
+
+       CDEBUG(D_VFSTRACE, "migrate %s under"DFID" to MDT%d\n",
+              name, PFID(ll_inode2fid(parent)), mdtidx);
+
+       op_data = ll_prep_md_op_data(NULL, parent, NULL, name, namelen,
+                                    0, LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data))
+               RETURN(PTR_ERR(op_data));
+
+       /* Get child FID first */
+       qstr.hash = full_name_hash(name, namelen);
+       qstr.name = name;
+       qstr.len = namelen;
+       dchild = d_lookup(file->f_dentry, &qstr);
+       if (dchild != NULL && dchild->d_inode != NULL) {
+               op_data->op_fid3 = *ll_inode2fid(dchild->d_inode);
+       } else {
+               rc = ll_get_fid_by_name(parent, name, strnlen(name, namelen),
+                                       &op_data->op_fid3);
+               if (rc != 0)
+                       GOTO(out_free, rc);
+       }
+
+       if (!fid_is_sane(&op_data->op_fid3)) {
+               CERROR("%s: migrate %s , but fid "DFID" is insane\n",
+                      ll_get_fsname(parent->i_sb, NULL, 0), name,
+                      PFID(&op_data->op_fid3));
+               GOTO(out_free, rc);
+       }
+
+       rc = ll_get_mdt_idx_by_fid(ll_i2sbi(parent), &op_data->op_fid3);
+       if (rc < 0)
+               GOTO(out_free, rc);
+
+       if (rc == mdtidx) {
+               CDEBUG(D_INFO, "%s:"DFID" is already on MDT%d.\n", name,
+                      PFID(&op_data->op_fid3), mdtidx);
+               GOTO(out_free, rc = 0);
+       }
+
+       op_data->op_mds = mdtidx;
+       op_data->op_cli_flags = CLI_MIGRATE;
+       rc = md_rename(ll_i2sbi(parent)->ll_md_exp, op_data, name,
+                      strnlen(name, namelen), name, strnlen(name, namelen),
+                      &request);
+       if (rc == 0)
+               ll_update_times(request, parent);
+
+       ptlrpc_req_finished(request);
+       if (rc != 0)
+               GOTO(out_free, rc);
+
+out_free:
+       if (dchild != NULL) {
+               if (dchild->d_inode != NULL)
+                       ll_delete_inode(dchild->d_inode);
+               dput(dchild);
+       }
+
+       ll_finish_md_op_data(op_data);
+       RETURN(rc);
+}
+
 int ll_file_noflock(struct file *file, int cmd, struct file_lock *file_lock)
 {
         ENTRY;
index 59ecdfb..fb8b666 100644 (file)
@@ -729,6 +729,7 @@ extern struct inode_operations ll_dir_inode_operations;
 int ll_dir_read(struct inode *inode, struct md_op_data *op_data,
                void *cookie, filldir_t filldir);
 int ll_get_mdt_idx(struct inode *inode);
+int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
 
 struct lu_dirent *ll_dir_entry_start(struct inode *dir,
                                     struct md_op_data *op_data,
@@ -752,6 +753,9 @@ struct lookup_intent *ll_convert_intent(struct open_intent *oit,
 #endif
 struct dentry *ll_splice_alias(struct inode *inode, struct dentry *de);
 int ll_rmdir_entry(struct inode *dir, char *name, int namelen);
+int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
+                   struct qstr *name);
+void ll_update_times(struct ptlrpc_request *request, struct inode *inode);
 
 /* llite/rw.c */
 int ll_writepage(struct page *page, struct writeback_control *wbc);
@@ -819,7 +823,8 @@ int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
 int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct kstat *stat);
 struct ll_file_data *ll_file_data_get(void);
 struct posix_acl * ll_get_acl(struct inode *inode, int type);
-
+int ll_migrate(struct inode *parent, struct file *file, int mdtidx,
+              const char *name, int namelen);
 #ifdef HAVE_GENERIC_PERMISSION_4ARGS
 int ll_inode_permission(struct inode *inode, int mask, unsigned int flags);
 #else
index 49eaea8..7d2054c 100644 (file)
@@ -1333,9 +1333,32 @@ static void ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
        struct ll_inode_info *lli = ll_i2info(inode);
        struct lmv_stripe_md *lsm = md->lmv;
        int idx;
+       ENTRY;
 
-       LASSERT(lsm != NULL);
        LASSERT(S_ISDIR(inode->i_mode));
+       CDEBUG(D_INODE, "update lsm %p of "DFID"\n", lli->lli_lsm_md,
+              PFID(ll_inode2fid(inode)));
+
+       /* no striped information from request. */
+       if (lsm == NULL) {
+               if (lli->lli_lsm_md == NULL) {
+                       RETURN_EXIT;
+               } else if (lli->lli_lsm_md->lsm_md_magic == LMV_MAGIC_MIGRATE) {
+                       /* migration is done, the temporay MIGRATE layout has
+                        * been removed */
+                       CDEBUG(D_INODE, DFID" finish migration.\n",
+                              PFID(ll_inode2fid(inode)));
+                       lmv_free_memmd(lli->lli_lsm_md);
+                       lli->lli_lsm_md = NULL;
+                       RETURN_EXIT;
+               } else {
+                       /* The lustre_md from req does not include stripeEA,
+                        * see ll_md_setattr */
+                       RETURN_EXIT;
+               }
+       }
+
+       /* set the directory layout */
        if (lli->lli_lsm_md == NULL) {
                int rc;
 
@@ -1350,7 +1373,9 @@ static void ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
                /* set lsm_md to NULL, so the following free lustre_md
                 * will not free this lsm */
                md->lmv = NULL;
-               return;
+               CDEBUG(D_INODE, "Set lsm %p magic %x to "DFID"\n", lsm,
+                      lsm->lsm_md_magic, PFID(ll_inode2fid(inode)));
+               RETURN_EXIT;
        }
 
        /* Compare the old and new stripe information */
@@ -1391,6 +1416,8 @@ static void ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
 
        md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
                         md->body, ll_md_blocking_ast);
+
+       RETURN_EXIT;
 }
 
 void ll_clear_inode(struct inode *inode)
@@ -1920,7 +1947,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                        lli->lli_maxbytes = MAX_LFS_FILESIZE;
        }
 
-       if (S_ISDIR(inode->i_mode) && md->lmv != NULL)
+       if (S_ISDIR(inode->i_mode))
                ll_update_lsm_md(inode, md);
 
        if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
@@ -2534,7 +2561,6 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
        if ((opc == LUSTRE_OPC_CREATE) && (name != NULL) &&
             filename_is_volatile(name, namelen, NULL))
                op_data->op_bias |= MDS_CREATE_VOLATILE;
-       op_data->op_opc = opc;
        op_data->op_mds = 0;
        op_data->op_data = data;
 
index da02c9f..9149182 100644 (file)
@@ -58,8 +58,8 @@ static int ll_create_it(struct inode *, struct dentry *,
  * Check if we have something mounted at the named dchild.
  * In such a case there would always be dentry present.
  */
-static int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
-                           struct qstr *name)
+int ll_d_mountpoint(struct dentry *dparent, struct dentry *dchild,
+                   struct qstr *name)
 {
         int mounted = 0;
 
@@ -909,8 +909,7 @@ static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode,
        RETURN(0);
 }
 
-static void ll_update_times(struct ptlrpc_request *request,
-                            struct inode *inode)
+void ll_update_times(struct ptlrpc_request *request, struct inode *inode)
 {
         struct mdt_body *body = req_capsule_server_get(&request->rq_pill,
                                                        &RMF_MDT_BODY);
index a1d82c3..361dfca 100644 (file)
@@ -1163,6 +1163,10 @@ int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
                 * evicted to avoid hitting LBUG when truncate_inode_pages()
                 * is called later on. */
                ignore_layout = 1;
+
+       if (cl_i2info(inode)->lli_clob == NULL)
+               RETURN(0);
+
        result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
        if (result > 0) {
                wbc->nr_to_write -= result;
index 011b448..b96b13c 100644 (file)
@@ -1588,7 +1588,8 @@ int do_statahead_enter(struct inode *dir, struct dentry **dentryp,
                                               (*dentryp)->d_name.name,
                                               PFID(ll_inode2fid((*dentryp)->d_inode)),
                                               PFID(ll_inode2fid(inode)));
-                                        ll_sai_unplug(sai, entry);
+                                       ll_intent_release(&it);
+                                       ll_sai_unplug(sai, entry);
                                         RETURN(-ESTALE);
                                 } else {
                                        iput(inode);
index 363eb3a..771d381 100644 (file)
@@ -54,6 +54,7 @@
 #include <lustre_lib.h>
 #include <lustre_net.h>
 #include <lustre_dlm.h>
+#include <lustre_mdc.h>
 #include <obd_class.h>
 #include <lprocfs_status.h>
 #include "lmv_internal.h"
@@ -357,6 +358,8 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
 
                        oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
                                                        op_data->op_namelen);
+                       if (IS_ERR(oinfo))
+                               RETURN(PTR_ERR(oinfo));
                        op_data->op_fid1 = oinfo->lmo_fid;
                }
 
@@ -431,11 +434,12 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                       ldlm_blocking_callback cb_blocking,
                      __u64 extra_lock_flags)
 {
-       struct obd_device      *obd = exp->exp_obd;
-       struct lmv_obd         *lmv = &obd->u.lmv;
-       struct lmv_tgt_desc    *tgt = NULL;
-       struct mdt_body        *body;
-       int                     rc = 0;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *tgt = NULL;
+       struct mdt_body         *body;
+       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       int                     rc = 0;
        ENTRY;
 
        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
@@ -446,16 +450,15 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                fid_zero(&op_data->op_fid2);
 
        CDEBUG(D_INODE, "LOOKUP_INTENT with fid1="DFID", fid2="DFID
-              ", name='%s' -> mds #%d\n", PFID(&op_data->op_fid1),
-              PFID(&op_data->op_fid2),
+              ", name='%s' -> mds #%d lsm=%p lsm_magic=%x\n",
+              PFID(&op_data->op_fid1), PFID(&op_data->op_fid2),
               op_data->op_name ? op_data->op_name : "<NULL>",
-              tgt->ltd_idx);
+              tgt->ltd_idx, lsm, lsm == NULL ? -1 : lsm->lsm_md_magic);
 
        op_data->op_bias &= ~MDS_CROSS_REF;
 
        rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
                             flags, reqp, cb_blocking, extra_lock_flags);
-
        if (rc < 0)
                RETURN(rc);
 
@@ -464,13 +467,31 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                 * during update_inode process (see ll_update_lsm_md) */
                if (op_data->op_mea2 != NULL) {
                        rc = lmv_revalidate_slaves(exp, NULL, op_data->op_mea2,
-                                               cb_blocking, extra_lock_flags);
+                                                  cb_blocking,
+                                                  extra_lock_flags);
                        if (rc != 0)
                                RETURN(rc);
                }
                RETURN(rc);
-       }
+       } else if (it_disposition(it, DISP_LOOKUP_NEG) &&
+                  lsm != NULL && lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) {
+               /* For migrating directory, if it can not find the child in
+                * the source directory(master stripe), try the targeting
+                * directory(stripe 1) */
+               tgt = lmv_find_target(lmv, &lsm->lsm_md_oinfo[1].lmo_fid);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
 
+               ptlrpc_req_finished(*reqp);
+               CDEBUG(D_INODE, "For migrating dir, try target dir "DFID"\n",
+                      PFID(&lsm->lsm_md_oinfo[1].lmo_fid));
+
+               op_data->op_fid1 = lsm->lsm_md_oinfo[1].lmo_fid;
+               it->d.lustre.it_disposition &= ~DISP_ENQ_COMPLETE;
+               rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
+                                   flags, reqp, cb_blocking, extra_lock_flags);
+               RETURN(rc);
+       }
        /*
         * MDS has returned success. Probably name has been resolved in
         * remote inode. Let's check this.
index 78e4009..622b2e1 100644 (file)
@@ -138,7 +138,13 @@ lsm_name_to_stripe_info(const struct lmv_stripe_md *lsm, const char *name,
        stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
                                                lsm->lsm_md_stripe_count,
                                                name, namelen);
-       LASSERT(stripe_index < lsm->lsm_md_stripe_count);
+       if (stripe_index < 0)
+               return ERR_PTR(stripe_index);
+
+       LASSERTF(stripe_index < lsm->lsm_md_stripe_count,
+                "stripe_index = %d, stripe_count = %d hash_type = %x"
+                "name = %.*s\n", stripe_index, lsm->lsm_md_stripe_count,
+                lsm->lsm_md_hash_type, namelen, name);
 
        return &lsm->lsm_md_oinfo[stripe_index];
 }
index ee5f3a7..ce5d137 100644 (file)
@@ -106,6 +106,13 @@ int lmv_name_to_stripe_index(enum lmv_hash_type hashtype,
        case LMV_HASH_TYPE_FNV_1A_64:
                idx = lmv_hash_fnv1a(max_mdt_index, name, namelen);
                break;
+       /* LMV_HASH_TYPE_MIGRATION means the file is being migrated,
+        * and the file should be accessed by client, except for
+        * lookup(see lmv_intent_lookup), return -EACCES here */
+       case LMV_HASH_TYPE_MIGRATION:
+               CERROR("%.*s is being migrated: rc = %d\n", namelen,
+                      name, -EACCES);
+               return -EACCES;
        default:
                CERROR("Unknown hash type 0x%x\n", hashtype);
                return -EINVAL;
@@ -1785,6 +1792,8 @@ lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
        const struct lmv_oinfo  *oinfo;
 
        oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
+       if (IS_ERR(oinfo))
+               RETURN((void *)oinfo);
        *fid = oinfo->lmo_fid;
        *mds = oinfo->lmo_mds;
        tgt = lmv_get_target(lmv, *mds);
@@ -1801,7 +1810,8 @@ struct lmv_tgt_desc
        struct lmv_tgt_desc     *tgt;
 
        if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 ||
-           op_data->op_namelen == 0) {
+           op_data->op_namelen == 0 ||
+           lsm->lsm_md_magic == LMV_MAGIC_MIGRATE) {
                tgt = lmv_find_target(lmv, fid);
                if (IS_ERR(tgt))
                        return tgt;
@@ -2046,23 +2056,25 @@ lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
          fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
          NULL)
 
-static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
-                            int op_tgt, ldlm_mode_t mode, int bits, int flag)
+static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
+                           struct md_op_data *op_data,
+                           int op_tgt, ldlm_mode_t mode, int bits, int flag)
 {
-        struct lu_fid          *fid = md_op_data_fid(op_data, flag);
-        struct obd_device      *obd = exp->exp_obd;
-        struct lmv_obd         *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc    *tgt;
-        ldlm_policy_data_t      policy = {{0}};
-        int                     rc = 0;
-        ENTRY;
+       struct lu_fid          *fid = md_op_data_fid(op_data, flag);
+       struct obd_device      *obd = exp->exp_obd;
+       struct lmv_obd         *lmv = &obd->u.lmv;
+       ldlm_policy_data_t      policy = {{ 0 }};
+       int                     rc = 0;
+       ENTRY;
 
-        if (!fid_is_sane(fid))
-                RETURN(0);
+       if (!fid_is_sane(fid))
+               RETURN(0);
 
-       tgt = lmv_find_target(lmv, fid);
-       if (IS_ERR(tgt))
-               RETURN(PTR_ERR(tgt));
+       if (tgt == NULL) {
+               tgt = lmv_find_target(lmv, fid);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+       }
 
        if (tgt->ltd_idx != op_tgt) {
                CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
@@ -2112,6 +2124,9 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
 
                oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
                                                op_data->op_namelen);
+               if (IS_ERR(oinfo))
+                       RETURN(PTR_ERR(oinfo));
+
                op_data->op_fid2 = oinfo->lmo_fid;
        }
 
@@ -2123,7 +2138,7 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
         * Cancel UPDATE lock on child (fid1).
         */
        op_data->op_flags |= MF_MDC_CANCEL_FID2;
-       rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+       rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
                              MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
        if (rc != 0)
                RETURN(rc);
@@ -2158,32 +2173,44 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
        op_data->op_fsuid = current_fsuid();
        op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
-
-       if (op_data->op_mea1 != NULL) {
-               struct lmv_stripe_md    *lsm = op_data->op_mea1;
-               const struct lmv_oinfo  *oinfo;
-
-               oinfo = lsm_name_to_stripe_info(lsm, old, oldlen);
-               op_data->op_fid1 = oinfo->lmo_fid;
-               op_data->op_mds = oinfo->lmo_mds;
-               src_tgt = lmv_get_target(lmv, op_data->op_mds);
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
+       if (op_data->op_cli_flags & CLI_MIGRATE) {
+               LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID "DFID"\n",
+                        PFID(&op_data->op_fid3));
+               rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+               if (rc)
+                       RETURN(rc);
+               src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid3);
        } else {
-               src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
-               if (IS_ERR(src_tgt))
-                       RETURN(PTR_ERR(src_tgt));
+               if (op_data->op_mea1 != NULL) {
+                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+
+                       src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
+                                                            oldlen,
+                                                            &op_data->op_fid1,
+                                                            &op_data->op_mds);
+                       if (IS_ERR(src_tgt))
+                               RETURN(PTR_ERR(src_tgt));
+               } else {
+                       src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+                       if (IS_ERR(src_tgt))
+                               RETURN(PTR_ERR(src_tgt));
 
-               op_data->op_mds = src_tgt->ltd_idx;
-       }
+                       op_data->op_mds = src_tgt->ltd_idx;
+               }
 
-       if (op_data->op_mea2) {
-               struct lmv_stripe_md    *lsm = op_data->op_mea2;
-               const struct lmv_oinfo  *oinfo;
+               if (op_data->op_mea2) {
+                       struct lmv_stripe_md    *lsm = op_data->op_mea2;
+                       const struct lmv_oinfo  *oinfo;
 
-               oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
-               op_data->op_fid2 = oinfo->lmo_fid;
+                       oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
+                       if (IS_ERR(oinfo))
+                               RETURN(PTR_ERR(oinfo));
+
+                       op_data->op_fid2 = oinfo->lmo_fid;
+               }
        }
+       if (IS_ERR(src_tgt))
+               RETURN(PTR_ERR(src_tgt));
 
        /*
         * LOOKUP lock on src child (fid3) should also be cancelled for
@@ -2195,33 +2222,50 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
         * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
         * own target.
         */
-       rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+       rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
                              LCK_EX, MDS_INODELOCK_UPDATE,
                              MF_MDC_CANCEL_FID2);
 
+       if (rc != 0)
+               RETURN(rc);
        /*
-        * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+        * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
         */
-       if (rc == 0) {
-               rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+       if (fid_is_sane(&op_data->op_fid3)) {
+               struct lmv_tgt_desc *tgt;
+
+               tgt = lmv_find_target(lmv, &op_data->op_fid1);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+
+               /* Cancel LOOKUP lock on its parent */
+               rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
                                      LCK_EX, MDS_INODELOCK_LOOKUP,
-                                     MF_MDC_CANCEL_FID4);
+                                     MF_MDC_CANCEL_FID3);
+               if (rc != 0)
+                       RETURN(rc);
+
+               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_FULL,
+                                     MF_MDC_CANCEL_FID3);
+               if (rc != 0)
+                       RETURN(rc);
        }
 
        /*
         * Cancel all the locks on tgt child (fid4).
         */
-       if (rc == 0)
-               rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+       if (fid_is_sane(&op_data->op_fid4))
+               rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
                                      LCK_EX, MDS_INODELOCK_FULL,
                                      MF_MDC_CANCEL_FID4);
 
        CDEBUG(D_INODE, DFID":m%d to "DFID"\n", PFID(&op_data->op_fid1),
               op_data->op_mds, PFID(&op_data->op_fid2));
 
-       if (rc == 0)
-               rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
-                              new, newlen, request);
+       rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen, new, newlen,
+                      request);
+
        RETURN(rc);
 }
 
@@ -2472,6 +2516,7 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
+       struct lmv_tgt_desc     *parent_tgt = NULL;
        struct mdt_body         *body;
        int                     rc;
        ENTRY;
@@ -2489,13 +2534,18 @@ retry:
                /* For striped dir, we need to locate the parent as well */
                if (op_data->op_mea1 != NULL &&
                    op_data->op_mea1->lsm_md_stripe_count > 1) {
+                       struct lmv_tgt_desc *tmp;
+
                        LASSERT(op_data->op_name != NULL &&
                                op_data->op_namelen != 0);
-                       lmv_locate_target_for_name(lmv, op_data->op_mea1,
+                       tmp = lmv_locate_target_for_name(lmv,
+                                                  op_data->op_mea1,
                                                   op_data->op_name,
                                                   op_data->op_namelen,
                                                   &op_data->op_fid1,
                                                   &op_data->op_mds);
+                       if (IS_ERR(tmp))
+                               RETURN(PTR_ERR(tmp));
                }
        } else {
                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
@@ -2519,9 +2569,18 @@ retry:
        /*
         * Cancel FULL locks on child (fid3).
         */
-       rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
-                             MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+       parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       if (IS_ERR(parent_tgt))
+               RETURN(PTR_ERR(parent_tgt));
+
+       if (parent_tgt != tgt) {
+               rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
+                                     LCK_EX, MDS_INODELOCK_LOOKUP,
+                                     MF_MDC_CANCEL_FID3);
+       }
 
+       rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
+                             MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
        if (rc != 0)
                RETURN(rc);
 
@@ -2854,12 +2913,25 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
        }
 
        /* Unpack memmd */
-       if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1) {
-               CERROR("%s: invalid magic %x.\n", exp->exp_obd->obd_name,
-                      le32_to_cpu(lmm->lmv_magic));
-               RETURN(-EINVAL);
+       if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
+           le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_MIGRATE &&
+           le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
+               CERROR("%s: invalid lmv magic %x: rc = %d\n",
+                      exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
+                      -EIO);
+               RETURN(-EIO);
        }
 
+       if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1 ||
+           le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_MIGRATE)
+               lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
+       else
+               /**
+                * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
+                * stripecount should be 0 then.
+                */
+               lsm_size = lmv_stripe_md_size(0);
+
        lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
        if (lsm == NULL) {
                OBD_ALLOC(lsm, lsm_size);
@@ -2871,6 +2943,7 @@ int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
 
        switch (le32_to_cpu(lmm->lmv_magic)) {
        case LMV_MAGIC_V1:
+       case LMV_MAGIC_MIGRATE:
                rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
                break;
        default:
index bfa8b55..dc5201e 100644 (file)
@@ -614,6 +614,7 @@ repeat:
                info->lti_buf.lb_len = info->lti_ea_store_size;
                rc = dt_xattr_get(env, next, &info->lti_buf, name, BYPASS_CAPA);
        }
+
        /* if object is not striped or inaccessible */
        if (rc == -ENODATA || rc == -ENOENT)
                RETURN(0);
index ef9781d..5462d79 100644 (file)
@@ -629,6 +629,9 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
        int                     rc = 0;
        ENTRY;
 
+       if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_MIGRATE)
+               RETURN(0);
+
        if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                RETURN(-EINVAL);
 
@@ -861,7 +864,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                        GOTO(out_put, rc);
        }
 
-       rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
+       rc = dt_declare_xattr_set(env, dt_object_child(dt), &lmv_buf,
+                                 XATTR_NAME_LMV, 0, th);
        if (rc != 0)
                GOTO(out_put, rc);
 
@@ -1203,7 +1207,8 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                  capa);
        }
 
-       rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
+       rc = dt_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV,
+                         fl, th, capa);
 
        RETURN(rc);
 }
@@ -1352,6 +1357,11 @@ static int lod_cache_parent_lov_striping(const struct lu_env *env,
        if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0)
                GOTO(unlock, rc = 0);
 
+       CDEBUG(D_INFO, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d\n",
+              PFID(lu_object_fid(&lp->ldo_obj.do_lu)),
+              (int)v1->lmm_stripe_count,
+              (int)v1->lmm_stripe_size, (int)v1->lmm_stripe_offset);
+
        lp->ldo_def_stripenr = v1->lmm_stripe_count;
        lp->ldo_def_stripe_size = v1->lmm_stripe_size;
        lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
@@ -1681,22 +1691,33 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc = -ENOMEM);
        }
 
-       /* choose OST and generate appropriate objects */
-       rc = lod_qos_prep_create(env, lo, attr, lovea, th);
-       if (rc) {
-               /* failed to create striping, let's reset
-                * config so that others don't get confused */
-               lod_object_free_striping(env, lo);
-               GOTO(out, rc);
-       }
+       if (!dt_object_remote(next)) {
+               /* choose OST and generate appropriate objects */
+               rc = lod_qos_prep_create(env, lo, attr, lovea, th);
+               if (rc) {
+                       /* failed to create striping, let's reset
+                        * config so that others don't get confused */
+                       lod_object_free_striping(env, lo);
+                       GOTO(out, rc);
+               }
 
-       /*
-        * declare storage for striping data
-        */
-       info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
+               /*
+                * declare storage for striping data
+                */
+               info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr,
                                lo->ldo_pool ?  LOV_MAGIC_V3 : LOV_MAGIC_V1);
-       rc = dt_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
-                                 0, th);
+       } else {
+               /* LOD can not choose OST objects for remote objects, i.e.
+                * stripes must be ready before that. Right now, it can only
+                * happen during migrate, i.e. migrate process needs to create
+                * remote regular file (mdd_migrate_create), then the migrate
+                * process will provide stripeEA. */
+               LASSERT(lovea != NULL);
+               info->lti_buf = *lovea;
+       }
+
+       rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+                                 XATTR_NAME_LOV, 0, th);
        if (rc)
                GOTO(out, rc);
 
@@ -1892,7 +1913,11 @@ static int lod_declare_object_create(const struct lu_env *env,
                        rc = lod_declare_striped_object(env, dt, attr,
                                                        NULL, th);
        } else if (dof->dof_type == DFT_DIR) {
-               rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
+               /* Orphan object (like migrating object) does not have
+                * lod_dir_stripe, see lod_ah_init */
+               if (lo->ldo_dir_stripe != NULL)
+                       rc = lod_declare_dir_striping_create(env, dt, attr,
+                                                            dof, th);
        }
 out:
        RETURN(rc);
@@ -1936,7 +1961,8 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
        rc = dt_create(env, next, attr, hint, dof, th);
 
        if (rc == 0) {
-               if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
+               if (S_ISDIR(dt->do_lu.lo_header->loh_attr) &&
+                   lo->ldo_dir_stripe != NULL)
                        rc = lod_dir_striping_create(env, dt, attr, dof, th);
                else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
                        rc = lod_striping_create(env, dt, attr, dof, th);
@@ -2244,11 +2270,11 @@ static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
 
 static ssize_t lod_declare_write(const struct lu_env *env,
                                 struct dt_object *dt,
-                                const loff_t size, loff_t pos,
+                                const struct lu_buf *buf, loff_t pos,
                                 struct thandle *th)
 {
        return dt_declare_record_write(env, dt_object_child(dt),
-                                      size, pos, th);
+                                      buf, pos, th);
 }
 
 static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt,
index 3de8ca5..c55dd8c 100644 (file)
@@ -41,6 +41,7 @@
 #endif
 #include <lustre_net.h>
 #include <lustre/lustre_idl.h>
+#include <obd_class.h>
 #include <obd.h>
 #include <cl_object.h>
 #include <lclient.h>
@@ -455,7 +456,8 @@ void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
 
         /* XXX do something about time, uid, gid */
-        rec->rn_opcode   = REINT_RENAME;
+       rec->rn_opcode  = op_data->op_cli_flags & CLI_MIGRATE ?
+                                       REINT_MIGRATE : REINT_RENAME;
         rec->rn_fsuid    = op_data->op_fsuid;
         rec->rn_fsgid    = op_data->op_fsgid;
         rec->rn_cap      = op_data->op_cap;
@@ -467,7 +469,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, struct md_op_data *op_data,
         rec->rn_mode     = op_data->op_mode;
         rec->rn_bias     = op_data->op_bias;
 
-        mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
+       mdc_pack_capa(req, &RMF_CAPA1, op_data->op_capa1);
         mdc_pack_capa(req, &RMF_CAPA2, op_data->op_capa2);
 
         tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
index e0cb962..24eaee9 100644 (file)
@@ -58,24 +58,6 @@ struct mdc_getattr_args {
         struct ldlm_enqueue_info    *ga_einfo;
 };
 
-int it_disposition(struct lookup_intent *it, int flag)
-{
-        return it->d.lustre.it_disposition & flag;
-}
-EXPORT_SYMBOL(it_disposition);
-
-void it_set_disposition(struct lookup_intent *it, int flag)
-{
-        it->d.lustre.it_disposition |= flag;
-}
-EXPORT_SYMBOL(it_set_disposition);
-
-void it_clear_disposition(struct lookup_intent *it, int flag)
-{
-        it->d.lustre.it_disposition &= ~flag;
-}
-EXPORT_SYMBOL(it_clear_disposition);
-
 int it_open_error(int phase, struct lookup_intent *it)
 {
        if (it_disposition(it, DISP_OPEN_LEASE)) {
index bef36b5..4ec2946 100644 (file)
@@ -1088,6 +1088,9 @@ int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
 {
        int rc;
 
+       if (!mdd_object_exists(mdd_obj))
+               return -ENODATA;
+
        /* First try a small buf */
        LASSERT(env != NULL);
        ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
@@ -1095,9 +1098,6 @@ int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
        if (ldata->ld_buf->lb_buf == NULL)
                return -ENOMEM;
 
-       if (!mdd_object_exists(mdd_obj))
-               return -ENODATA;
-
        rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
                          BYPASS_CAPA);
        if (rc == -ERANGE) {
@@ -1113,8 +1113,11 @@ int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
                rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
                                  XATTR_NAME_LINK, BYPASS_CAPA);
        }
-       if (rc < 0)
+       if (rc < 0) {
+               lu_buf_free(ldata->ld_buf);
+               ldata->ld_buf = NULL;
                return rc;
+       }
 
        return linkea_init(ldata);
 }
@@ -1150,7 +1153,7 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
        int     ea_len;
        void    *linkea;
 
-       if (ldata != NULL && ldata->ld_lee != NULL) {
+       if (ldata != NULL && ldata->ld_leh != NULL) {
                ea_len = ldata->ld_leh->leh_len;
                linkea = ldata->ld_buf->lb_buf;
        } else {
@@ -1312,10 +1315,9 @@ out_pending:
         return rc;
 }
 
-int mdd_declare_finish_unlink(const struct lu_env *env,
-                             struct mdd_object *obj,
-                             struct md_attr *ma,
-                             struct thandle *handle)
+static int mdd_declare_finish_unlink(const struct lu_env *env,
+                                    struct mdd_object *obj,
+                                    struct thandle *handle)
 {
        int     rc;
 
@@ -1419,7 +1421,7 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                if (rc)
                        return rc;
 
-               rc = mdd_declare_finish_unlink(env, c, ma, handle);
+               rc = mdd_declare_finish_unlink(env, c, handle);
                if (rc)
                        return rc;
 
@@ -1646,6 +1648,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
        struct thandle    *handle;
        const struct lu_buf *buf;
        struct lu_attr    *attr = MDD_ENV_VAR(env, cattr);
+       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        int                rc;
        ENTRY;
 
@@ -1667,7 +1670,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
                RETURN(rc);
 
        /* calling ->ah_make_hint() is used to transfer information from parent */
-       mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
+       mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
@@ -1719,8 +1722,7 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
                                         struct mdd_object *parent,
                                         struct mdd_object *child,
                                         struct lu_attr *attr,
-                                        struct thandle *handle,
-                                        struct linkea_data *ldata)
+                                        struct thandle *handle)
 {
         int rc;
        ENTRY;
@@ -1745,19 +1747,14 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
                                              dotdot, handle);
         }
 
-       if (rc == 0)
-               mdd_declare_links_add(env, child, handle, ldata);
-
        RETURN(rc);
 }
 
 static int mdd_object_initialize(const struct lu_env *env,
                                 const struct lu_fid *pfid,
-                                const struct lu_name *lname,
                                 struct mdd_object *child,
                                 struct lu_attr *attr, struct thandle *handle,
-                                const struct md_op_spec *spec,
-                                struct linkea_data *ldata)
+                                const struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -1785,9 +1782,6 @@ static int mdd_object_initialize(const struct lu_env *env,
                         mdo_ref_del(env, child, handle);
         }
 
-       if (rc == 0)
-               mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
-
        RETURN(rc);
 }
 
@@ -1870,19 +1864,20 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
-                             struct mdd_object *p, struct mdd_object *c,
-                             const struct lu_name *name,
-                             struct lu_attr *attr,
-                             struct thandle *handle,
-                             const struct md_op_spec *spec,
-                             struct linkea_data *ldata,
-                             struct lu_buf *def_acl_buf,
-                             struct lu_buf *acl_buf)
+static int mdd_declare_object_create(const struct lu_env *env,
+                                    struct mdd_device *mdd,
+                                    struct mdd_object *p, struct mdd_object *c,
+                                    struct lu_attr *attr,
+                                    struct thandle *handle,
+                                    const struct md_op_spec *spec,
+                                    struct lu_buf *def_acl_buf,
+                                    struct lu_buf *acl_buf,
+                                    struct dt_allocation_hint *hint)
 {
        int rc;
 
-       rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec);
+       rc = mdd_declare_object_create_internal(env, p, c, attr, handle, spec,
+                                               hint);
         if (rc)
                 GOTO(out, rc);
 
@@ -1907,22 +1902,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                        GOTO(out, rc);
        }
 #endif
-
-       if (S_ISDIR(attr->la_mode)) {
-               rc = mdo_declare_ref_add(env, p, handle);
-               if (rc)
-                       GOTO(out, rc);
-        }
-
-       rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
-       if (rc)
-               GOTO(out, rc);
-
-       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-               rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
-       else
-               rc = mdo_declare_index_insert(env, p, mdo2fid(c),
-                                             name->ln_name, handle);
+       rc = mdd_declare_object_initialize(env, p, c, attr, handle);
        if (rc)
                GOTO(out, rc);
 
@@ -1940,15 +1920,58 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
        }
 
        if (S_ISLNK(attr->la_mode)) {
+               const char *target_name = spec->u.sp_symname;
+               int sym_len = strlen(target_name);
+               const struct lu_buf *buf;
+
+               buf = mdd_buf_get_const(env, target_name, sym_len);
                 rc = dt_declare_record_write(env, mdd_object_child(c),
-                                             strlen(spec->u.sp_symname), 0,
-                                             handle);
+                                            buf, 0, handle);
                 if (rc)
                         GOTO(out, rc);
         }
+out:
+       return rc;
+}
+
+static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
+                             struct mdd_object *p, struct mdd_object *c,
+                             const struct lu_name *name,
+                             struct lu_attr *attr,
+                             struct thandle *handle,
+                             const struct md_op_spec *spec,
+                             struct linkea_data *ldata,
+                             struct lu_buf *def_acl_buf,
+                             struct lu_buf *acl_buf,
+                             struct dt_allocation_hint *hint)
+{
+       int rc;
+
+       rc = mdd_declare_object_create(env, mdd, p, c, attr, handle, spec,
+                                      def_acl_buf, acl_buf, hint);
+       if (rc)
+               GOTO(out, rc);
+
+       if (S_ISDIR(attr->la_mode)) {
+               rc = mdo_declare_ref_add(env, p, handle);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE) {
+               rc = orph_declare_index_insert(env, c, attr->la_mode, handle);
+               if (rc)
+                       GOTO(out, rc);
+       } else {
+               struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
 
-       if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
-               struct lu_attr  *la = &mdd_env_info(env)->mti_la_for_fix;
+               rc = mdo_declare_index_insert(env, p, mdo2fid(c), name->ln_name,
+                                             handle);
+               if (rc)
+                       return rc;
+               rc = mdd_declare_links_add(env, c, handle, ldata);
+               if (rc)
+                       return rc;
 
                *la = *attr;
                la->la_valid = LA_CTIME | LA_MTIME;
@@ -2013,6 +2036,108 @@ static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
        RETURN(rc);
 }
 
+/**
+ * Create a metadata object and initialize it, set acl, xattr.
+ **/
+static int mdd_object_create(const struct lu_env *env, struct mdd_object *pobj,
+                            struct mdd_object *son, struct lu_attr *attr,
+                            struct md_op_spec *spec, struct lu_buf *acl_buf,
+                            struct lu_buf *def_acl_buf,
+                            struct dt_allocation_hint *hint,
+                            struct thandle *handle)
+{
+       int                     rc;
+
+       mdd_write_lock(env, son, MOR_TGT_CHILD);
+       rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec,
+                                       hint);
+       if (rc)
+               GOTO(unlock, rc);
+
+#ifdef CONFIG_FS_POSIX_ACL
+       if (def_acl_buf != NULL && def_acl_buf->lb_len > 0 &&
+           S_ISDIR(attr->la_mode)) {
+               /* set default acl */
+               rc = mdo_xattr_set(env, son, def_acl_buf,
+                                  XATTR_NAME_ACL_DEFAULT, 0,
+                                  handle, BYPASS_CAPA);
+               if (rc)
+                       GOTO(err_destroy, rc);
+       }
+       /* set its own acl */
+       if (acl_buf != NULL && acl_buf->lb_len > 0) {
+               rc = mdo_xattr_set(env, son, acl_buf,
+                                  XATTR_NAME_ACL_ACCESS,
+                                  0, handle, BYPASS_CAPA);
+               if (rc)
+                       GOTO(err_destroy, rc);
+       }
+#endif
+
+       rc = mdd_object_initialize(env, mdo2fid(pobj), son, attr, handle,
+                                  spec);
+       if (rc != 0)
+               GOTO(err_destroy, rc);
+
+       /*
+        * in case of replay we just set LOVEA provided by the client
+        * XXX: I think it would be interesting to try "old" way where
+        *      MDT calls this xattr_set(LOV) in a different transaction.
+        *      probably this way we code can be made better.
+        */
+       if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
+                               S_ISREG(attr->la_mode))) {
+               const struct lu_buf *buf;
+
+               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+                               spec->u.sp_ea.eadatalen);
+               rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
+                                  BYPASS_CAPA);
+               if (rc != 0)
+                       GOTO(err_destroy, rc);
+       }
+
+       if (S_ISLNK(attr->la_mode)) {
+               struct lu_ucred  *uc = lu_ucred_assert(env);
+               struct dt_object *dt = mdd_object_child(son);
+               const char *target_name = spec->u.sp_symname;
+               int sym_len = strlen(target_name);
+               const struct lu_buf *buf;
+               loff_t pos = 0;
+
+               buf = mdd_buf_get_const(env, target_name, sym_len);
+               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
+                                               mdd_object_capa(env, son),
+                                               uc->uc_cap &
+                                               CFS_CAP_SYS_RESOURCE_MASK);
+
+               if (rc == sym_len)
+                       rc = 0;
+               else
+                       GOTO(err_initlized, rc = -EFAULT);
+       }
+
+err_initlized:
+       if (unlikely(rc != 0)) {
+               int rc2;
+               if (S_ISDIR(attr->la_mode)) {
+                       /* Drop the reference, no need to delete "."/"..",
+                        * because the object to be destroied directly. */
+                       rc2 = mdo_ref_del(env, son, handle);
+                       if (rc2 != 0)
+                               GOTO(unlock, rc);
+               }
+               rc2 = mdo_ref_del(env, son, handle);
+               if (rc2 != 0)
+                       GOTO(unlock, rc);
+err_destroy:
+               mdo_destroy(env, son, handle);
+       }
+unlock:
+       mdd_write_unlock(env, son);
+       RETURN(rc);
+}
+
 /*
  * Create object and insert it into namespace.
  */
@@ -2032,7 +2157,8 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        struct lu_buf           def_acl_buf;
        struct linkea_data      *ldata = &info->mti_link_data;
        const char              *name = lname->ln_name;
-       int                      rc, created = 0, initialized = 0, inserted = 0;
+       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
+       int                      rc;
        ENTRY;
 
         /*
@@ -2083,26 +2209,28 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
                GOTO(out_free, rc = -EINPROGRESS);
 
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out_free, rc = PTR_ERR(handle));
+
        acl_buf.lb_buf = info->mti_xattr_buf;
        acl_buf.lb_len = sizeof(info->mti_xattr_buf);
        def_acl_buf.lb_buf = info->mti_key;
        def_acl_buf.lb_len = sizeof(info->mti_key);
        rc = mdd_acl_init(env, mdd_pobj, attr, &def_acl_buf, &acl_buf);
        if (rc < 0)
-               GOTO(out_free, rc);
-
-       mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
+               GOTO(out_stop, rc);
 
-        handle = mdd_trans_create(env, mdd);
-        if (IS_ERR(handle))
-                GOTO(out_free, rc = PTR_ERR(handle));
+       mdd_object_make_hint(env, mdd_pobj, son, attr, spec, hint);
 
        memset(ldata, 0, sizeof(*ldata));
-       mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
-                          lname, 1, 0, ldata);
+       rc = mdd_linkea_prepare(env, son, NULL, NULL,
+                               mdd_object_fid(mdd_pobj),
+                               lname, 1, 0, ldata);
 
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
-                               handle, spec, ldata, &def_acl_buf, &acl_buf);
+                               handle, spec, ldata, &def_acl_buf, &acl_buf,
+                               hint);
         if (rc)
                 GOTO(out_stop, rc);
 
@@ -2110,131 +2238,52 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(out_stop, rc);
 
-       mdd_write_lock(env, son, MOR_TGT_CHILD);
-       rc = mdd_object_create_internal(env, NULL, son, attr, handle, spec);
-       if (rc) {
-               mdd_write_unlock(env, son);
-               GOTO(cleanup, rc);
-       }
-
-       created = 1;
-
-#ifdef CONFIG_FS_POSIX_ACL
-       if (def_acl_buf.lb_len > 0 && S_ISDIR(attr->la_mode)) {
-               /* set default acl */
-               rc = mdo_xattr_set(env, son, &def_acl_buf,
-                                  XATTR_NAME_ACL_DEFAULT, 0,
-                                  handle, BYPASS_CAPA);
-               if (rc) {
-                       mdd_write_unlock(env, son);
-                       GOTO(cleanup, rc);
-               }
-       }
-       /* set its own acl */
-       if (acl_buf.lb_len > 0) {
-               rc = mdo_xattr_set(env, son, &acl_buf,
-                                  XATTR_NAME_ACL_ACCESS,
-                                  0, handle, BYPASS_CAPA);
-               if (rc) {
-                       mdd_write_unlock(env, son);
-                       GOTO(cleanup, rc);
-               }
-       }
-#endif
-
-       rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
-                                  son, attr, handle, spec, ldata);
-
-       /*
-        * in case of replay we just set LOVEA provided by the client
-        * XXX: I think it would be interesting to try "old" way where
-        *      MDT calls this xattr_set(LOV) in a different transaction.
-        *      probably this way we code can be made better.
-        */
-       if (rc == 0 && (spec->no_create ||
-                       (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
-                        S_ISREG(attr->la_mode)))) {
-               const struct lu_buf *buf;
-
-               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
-                               spec->u.sp_ea.eadatalen);
-               rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
-                                  BYPASS_CAPA);
-       }
-
-       if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-               rc = __mdd_orphan_add(env, son, handle);
-
-       mdd_write_unlock(env, son);
-
+       rc = mdd_object_create(env, mdd_pobj, son, attr, spec, &acl_buf,
+                              &def_acl_buf, hint, handle);
        if (rc != 0)
-               /*
-                * Object has no links, so it will be destroyed when last
-                * reference is released. (XXX not now.)
-                */
-               GOTO(cleanup, rc);
-
-       initialized = 1;
+               GOTO(out_stop, rc);
 
-       if (!(spec->sp_cr_flags & MDS_OPEN_VOLATILE))
+       if (unlikely(spec->sp_cr_flags & MDS_OPEN_VOLATILE)) {
+               mdd_write_lock(env, son, MOR_TGT_CHILD);
+               rc = __mdd_orphan_add(env, son, handle);
+               mdd_write_unlock(env, son);
+               if (rc != 0)
+                       GOTO(err_created, rc);
+       } else {
                rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
                                        name, S_ISDIR(attr->la_mode), handle,
                                        mdd_object_capa(env, mdd_pobj));
+               if (rc != 0)
+                       GOTO(err_created, rc);
 
-       if (rc != 0)
-               GOTO(cleanup, rc);
-
-       inserted = 1;
-
-        if (S_ISLNK(attr->la_mode)) {
-               struct lu_ucred  *uc = lu_ucred_assert(env);
-                struct dt_object *dt = mdd_object_child(son);
-                const char *target_name = spec->u.sp_symname;
-                int sym_len = strlen(target_name);
-                const struct lu_buf *buf;
-                loff_t pos = 0;
-
-                buf = mdd_buf_get_const(env, target_name, sym_len);
-               rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
-                                               mdd_object_capa(env, son),
-                                               uc->uc_cap &
-                                               CFS_CAP_SYS_RESOURCE_MASK);
-
-                if (rc == sym_len)
-                        rc = 0;
-                else
-                        GOTO(cleanup, rc = -EFAULT);
-        }
-
-       /* volatile file creation does not update parent directory times */
-       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-               GOTO(cleanup, rc = 0);
+               mdd_links_add(env, son, mdo2fid(mdd_pobj), lname, handle,
+                             ldata, 1);
 
-       /* update parent directory mtime/ctime */
-       *la = *attr;
-       la->la_valid = LA_CTIME | LA_MTIME;
-       rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
-       if (rc)
-               GOTO(cleanup, rc);
+               /* update parent directory mtime/ctime */
+               *la = *attr;
+               la->la_valid = LA_CTIME | LA_MTIME;
+               rc = mdd_update_time(env, mdd_pobj, pattr, la, handle);
+               if (rc)
+                       GOTO(err_insert, rc);
+       }
 
-        EXIT;
-cleanup:
-       if (rc != 0 && created != 0) {
+       EXIT;
+err_insert:
+       if (rc != 0) {
                int rc2;
 
-               if (inserted != 0) {
-                       if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
-                               rc2 = __mdd_orphan_del(env, son, handle);
-                       else
-                               rc2 = __mdd_index_delete(env, mdd_pobj, name,
-                                                        S_ISDIR(attr->la_mode),
-                                                        handle, BYPASS_CAPA);
-                       if (rc2 != 0)
-                               goto out_stop;
-               }
+               if (spec->sp_cr_flags & MDS_OPEN_VOLATILE)
+                       rc2 = __mdd_orphan_del(env, son, handle);
+               else
+                       rc2 = __mdd_index_delete(env, mdd_pobj, name,
+                                                S_ISDIR(attr->la_mode),
+                                                handle, BYPASS_CAPA);
+               if (rc2 != 0)
+                       goto out_stop;
 
+err_created:
                mdd_write_lock(env, son, MOR_TGT_CHILD);
-               if (initialized != 0 && S_ISDIR(attr->la_mode)) {
+               if (S_ISDIR(attr->la_mode)) {
                        /* Drop the reference, no need to delete "."/"..",
                         * because the object to be destroied directly. */
                        rc2 = mdo_ref_del(env, son, handle);
@@ -2243,7 +2292,6 @@ cleanup:
                                goto out_stop;
                        }
                }
-
                rc2 = mdo_ref_del(env, son, handle);
                if (rc2 != 0) {
                        mdd_write_unlock(env, son);
@@ -2470,7 +2518,7 @@ static int mdd_declare_rename(const struct lu_env *env,
                if (rc)
                        return rc;
 
-               rc = mdd_declare_finish_unlink(env, mdd_tobj, ma, handle);
+               rc = mdd_declare_finish_unlink(env, mdd_tobj, handle);
                if (rc)
                        return rc;
         }
@@ -2780,12 +2828,1146 @@ out_pending:
        return rc;
 }
 
+/**
+ * During migration once the parent FID has been changed,
+ * we need update the parent FID in linkea.
+ **/
+static int mdd_linkea_update_child_internal(const struct lu_env *env,
+                                           struct mdd_object *parent,
+                                           struct mdd_object *child,
+                                           const char *name, int namelen,
+                                           struct thandle *handle,
+                                           bool declare)
+{
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct linkea_data      ldata = {0};
+       struct lu_buf           *buf = &info->mti_link_buf;
+       int                     count;
+       int                     rc = 0;
+
+       ENTRY;
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdd_links_read(env, child, &ldata);
+       if (rc != 0) {
+               if (rc == -ENOENT || rc == -ENODATA)
+                       rc = 0;
+               RETURN(rc);
+       }
+
+       LASSERT(ldata.ld_leh != NULL);
+       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
+       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
+               struct mdd_device *mdd = mdo2mdd(&child->mod_obj);
+               struct lu_name lname;
+               struct lu_fid  fid;
+
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+                                   &lname, &fid);
+
+               if (strncmp(lname.ln_name, name, namelen) ||
+                   lu_fid_eq(&fid, mdd_object_fid(parent))) {
+                       ldata.ld_lee = (struct link_ea_entry *)
+                                      ((char *)ldata.ld_lee +
+                                       ldata.ld_reclen);
+                       continue;
+               }
+
+               CDEBUG(D_INFO, "%s: update "DFID" with %.*s:"DFID"\n",
+                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(child)),
+                      lname.ln_namelen, lname.ln_name,
+                      PFID(mdd_object_fid(parent)));
+               /* update to the new parent fid */
+               linkea_entry_pack(ldata.ld_lee, &lname,
+                                 mdd_object_fid(parent));
+               if (declare)
+                       rc = mdd_declare_links_add(env, child, handle, &ldata);
+               else
+                       rc = mdd_links_write(env, child, &ldata, handle);
+               break;
+       }
+       RETURN(rc);
+}
+
+static int mdd_linkea_declare_update_child(const struct lu_env *env,
+                                          struct mdd_object *parent,
+                                          struct mdd_object *child,
+                                          const char *name, int namelen,
+                                          struct thandle *handle)
+{
+       return mdd_linkea_update_child_internal(env, parent, child, name,
+                                               namelen, handle, true);
+}
+
+static int mdd_linkea_update_child(const struct lu_env *env,
+                                  struct mdd_object *parent,
+                                  struct mdd_object *child,
+                                  const char *name, int namelen,
+                                  struct thandle *handle)
+{
+       return mdd_linkea_update_child_internal(env, parent, child, name,
+                                               namelen, handle, false);
+}
+
+static int mdd_update_linkea_internal(const struct lu_env *env,
+                                     struct mdd_object *mdd_pobj,
+                                     struct mdd_object *mdd_sobj,
+                                     struct mdd_object *mdd_tobj,
+                                     const struct lu_name *child_name,
+                                     struct thandle *handle,
+                                     int declare)
+{
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct linkea_data      *ldata = &info->mti_link_data;
+       int                     count;
+       int                     rc = 0;
+       ENTRY;
+
+       rc = mdd_links_read(env, mdd_sobj, ldata);
+       if (rc != 0) {
+               if (rc == -ENOENT || rc == -ENODATA)
+                       rc = 0;
+               RETURN(rc);
+       }
+
+       if (declare)
+               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
+       else
+               rc = mdd_links_write(env, mdd_tobj, ldata, handle);
+
+       if (rc != 0)
+               RETURN(rc);
+
+       /* If it is mulitple links file, we need update the name entry for
+        * all parent */
+       LASSERT(ldata->ld_leh != NULL);
+       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
+               struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+               struct mdd_object       *pobj;
+               struct lu_name          lname;
+               struct lu_fid           fid;
+
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+                                   &lname, &fid);
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
+               pobj = mdd_object_find(env, mdd, &fid);
+               if (IS_ERR(pobj)) {
+                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+                             mdd2obd_dev(mdd)->obd_name, PFID(&fid),
+                             PTR_ERR(pobj));
+                       continue;
+               }
+
+               if (!mdd_object_exists(pobj)) {
+                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
+                             mdd2obd_dev(mdd)->obd_name, PFID(&fid));
+                       GOTO(next_put, rc);
+               }
+
+               if (pobj == mdd_pobj &&
+                   lname.ln_namelen == child_name->ln_namelen &&
+                   strncmp(lname.ln_name, child_name->ln_name,
+                           lname.ln_namelen) == 0) {
+                       CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
+                             mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
+                             PFID(&fid));
+                       GOTO(next_put, rc);
+               }
+
+               CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
+                      mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(pobj)),
+                      PNAME(&lname), PFID(mdd_object_fid(mdd_tobj)));
+
+               if (declare) {
+                       /* Remove source name from source directory */
+                       /* Insert new fid with target name into target dir */
+                       rc = mdo_declare_index_delete(env, pobj, lname.ln_name,
+                                                     handle);
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       rc = mdo_declare_index_insert(env, pobj,
+                                                     mdd_object_fid(mdd_tobj),
+                                                     lname.ln_name, handle);
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       rc = mdo_declare_ref_add(env, mdd_tobj, handle);
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+                       if (rc)
+                               GOTO(next_put, rc);
+               } else {
+                       rc = __mdd_index_delete(env, pobj, lname.ln_name,
+                                               0, handle,
+                                               mdd_object_capa(env, pobj));
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       rc = __mdd_index_insert(env, pobj,
+                                               mdd_object_fid(mdd_tobj),
+                                               lname.ln_name, 0, handle,
+                                               mdd_object_capa(env, pobj));
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       mdd_write_lock(env, mdd_tobj, MOR_SRC_CHILD);
+                       rc = mdo_ref_add(env, mdd_tobj, handle);
+                       mdd_write_unlock(env, mdd_tobj);
+                       if (rc)
+                               GOTO(next_put, rc);
+
+                       mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
+                       mdo_ref_del(env, mdd_sobj, handle);
+                       mdd_write_unlock(env, mdd_sobj);
+               }
+next_put:
+               mdd_object_put(env, pobj);
+               if (rc != 0)
+                       break;
+       }
+
+       RETURN(rc);
+}
+
+static int mdd_migrate_xattrs(const struct lu_env *env,
+                             struct mdd_object *mdd_sobj,
+                             struct mdd_object *mdd_tobj)
+{
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+       char                    *xname;
+       struct thandle          *handle;
+       struct lu_buf           xbuf;
+       int                     xlen;
+       int                     rem;
+       int                     xsize;
+       int                     list_xsize;
+       struct lu_buf           list_xbuf;
+       int                     rc;
+
+       /* retrieve xattr list from the old object */
+       list_xsize = mdo_xattr_list(env, mdd_sobj, &LU_BUF_NULL,
+                                   mdd_object_capa(env, mdd_sobj));
+       if (list_xsize == -ENODATA)
+               return 0;
+
+       if (list_xsize < 0)
+               return list_xsize;
+
+       lu_buf_check_and_alloc(&info->mti_big_buf, list_xsize);
+       if (info->mti_big_buf.lb_buf == NULL)
+               return -ENOMEM;
+
+       list_xbuf.lb_buf = info->mti_big_buf.lb_buf;
+       list_xbuf.lb_len = list_xsize;
+       rc = mdo_xattr_list(env, mdd_sobj, &list_xbuf,
+                           mdd_object_capa(env, mdd_sobj));
+       if (rc < 0)
+               return rc;
+       rc = 0;
+       rem = list_xsize;
+       xname = list_xbuf.lb_buf;
+       while (rem > 0) {
+               xlen = strnlen(xname, rem - 1) + 1;
+               if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
+                   strcmp(XATTR_NAME_LMA, xname) == 0 ||
+                   strcmp(XATTR_NAME_LMV, xname) == 0)
+                       goto next;
+
+               /* For directory, if there are default layout, migrate here */
+               if (strcmp(XATTR_NAME_LOV, xname) == 0 &&
+                   !S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu)))
+                       goto next;
+
+               xsize = mdo_xattr_get(env, mdd_sobj, &LU_BUF_NULL,
+                                     xname,
+                                     mdd_object_capa(env, mdd_sobj));
+               if (xsize == -ENODATA)
+                       goto next;
+               if (xsize < 0)
+                       GOTO(out, rc);
+
+               lu_buf_check_and_alloc(&info->mti_link_buf, xsize);
+               if (info->mti_link_buf.lb_buf == NULL)
+                       GOTO(out, rc = -ENOMEM);
+
+               xbuf.lb_len = xsize;
+               xbuf.lb_buf = info->mti_link_buf.lb_buf;
+               rc = mdo_xattr_get(env, mdd_sobj, &xbuf, xname,
+                                  mdd_object_capa(env, mdd_sobj));
+               if (rc == -ENODATA)
+                       goto next;
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               handle = mdd_trans_create(env, mdd);
+               if (IS_ERR(handle))
+                       GOTO(out, rc = PTR_ERR(handle));
+
+               rc = mdo_declare_xattr_set(env, mdd_tobj, &xbuf, xname, 0,
+                                          handle);
+               if (rc != 0)
+                       GOTO(stop_trans, rc);
+               /* Note: this transaction is part of migration, and it is not
+                * the last step of migration, so we set th_local = 1 to avoid
+                * update last rcvd for this transaction */
+               handle->th_local = 1;
+               rc = mdd_trans_start(env, mdd, handle);
+               if (rc != 0)
+                       GOTO(stop_trans, rc);
+
+               rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle,
+                                  mdd_object_capa(env, mdd_sobj));
+               if (rc == -EEXIST)
+                       GOTO(stop_trans, rc = 0);
+
+               if (rc != 0)
+                       GOTO(stop_trans, rc);
+stop_trans:
+               mdd_trans_stop(env, mdd, rc, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+next:
+               rem -= xlen;
+               memmove(xname, xname + xlen, rem);
+       }
+out:
+       return rc;
+}
+
+static int mdd_declare_migrate_create(const struct lu_env *env,
+                                     struct mdd_object *mdd_pobj,
+                                     struct mdd_object *mdd_sobj,
+                                     struct mdd_object *mdd_tobj,
+                                     struct md_op_spec *spec,
+                                     struct lu_attr *la,
+                                     union lmv_mds_md *mgr_ea,
+                                     struct thandle *handle)
+{
+       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
+       const struct lu_buf     *buf;
+       int                     rc;
+       int                     mgr_easize;
+
+       rc = mdd_declare_object_create_internal(env, mdd_pobj, mdd_tobj, la,
+                                               handle, spec, NULL);
+       if (rc != 0)
+               return rc;
+
+       rc = mdd_declare_object_initialize(env, mdd_pobj, mdd_tobj, la,
+                                          handle);
+       if (rc != 0)
+               return rc;
+
+       if (S_ISLNK(la->la_mode)) {
+               const char *target_name = spec->u.sp_symname;
+               int sym_len = strlen(target_name);
+               const struct lu_buf *buf;
+
+               buf = mdd_buf_get_const(env, target_name, sym_len);
+               rc = dt_declare_record_write(env, mdd_object_child(mdd_tobj),
+                                            buf, 0, handle);
+               if (rc != 0)
+                       return rc;
+       }
+
+       if (spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
+               buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
+                                       spec->u.sp_ea.eadatalen);
+               rc = mdo_declare_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
+                                          0, handle);
+               if (rc)
+                       return rc;
+       }
+
+       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
+       rc = mdo_declare_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV,
+                                  0, handle);
+       if (rc)
+               return rc;
+
+       la_flag->la_valid = LA_FLAGS;
+       la_flag->la_flags = LUSTRE_IMMUTABLE_FL;
+       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+
+       return rc;
+}
+
+static int mdd_migrate_create(const struct lu_env *env,
+                             struct mdd_object *mdd_pobj,
+                             struct mdd_object *mdd_sobj,
+                             struct mdd_object *mdd_tobj,
+                             struct lu_attr *la)
+{
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+       struct md_op_spec       *spec = &info->mti_spec;
+       struct lu_buf           lmm_buf = { 0 };
+       struct lu_buf           link_buf = { 0 };
+       const struct lu_buf     *buf;
+       struct thandle          *handle;
+       struct lmv_mds_md_v1    *mgr_ea;
+       struct lu_attr          *la_flag = MDD_ENV_VAR(env, la_for_fix);
+       int                     mgr_easize;
+       int                     rc;
+       ENTRY;
+
+       /* prepare spec for create */
+       memset(spec, 0, sizeof(*spec));
+       spec->sp_cr_lookup = 0;
+       spec->sp_feat = &dt_directory_features;
+       if (S_ISLNK(la->la_mode)) {
+               buf = lu_buf_check_and_alloc(
+                               &mdd_env_info(env)->mti_big_buf,
+                               la->la_size + 1);
+               link_buf = *buf;
+               link_buf.lb_len = la->la_size + 1;
+               rc = mdd_readlink(env, &mdd_sobj->mod_obj, &link_buf);
+               if (rc <= 0) {
+                       rc = rc != 0 ? rc : -EFAULT;
+                       CERROR("%s: "DFID" readlink failed: rc = %d\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdd_object_fid(mdd_sobj)), rc);
+                       RETURN(rc);
+               }
+               spec->u.sp_symname = link_buf.lb_buf;
+       } else{
+               /* retrieve lov of the old object */
+               rc = mdd_get_lov_ea(env, mdd_sobj, &lmm_buf);
+               if (rc != 0 && rc != -ENODATA)
+                       RETURN(rc);
+               if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
+                       spec->u.sp_ea.eadata = lmm_buf.lb_buf;
+                       spec->u.sp_ea.eadatalen = lmm_buf.lb_len;
+                       spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
+               }
+       }
+
+       mgr_ea = (struct lmv_mds_md_v1 *)info->mti_xattr_buf;
+       mgr_ea->lmv_magic = cpu_to_le32(LMV_MAGIC_MIGRATE);
+       mgr_ea->lmv_stripe_count = cpu_to_le32(2);
+       mgr_ea->lmv_master_mdt_index = mdd_seq_site(mdd)->ss_node_id;
+       mgr_ea->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_MIGRATION);
+       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[0], mdd_object_fid(mdd_sobj));
+       fid_cpu_to_le(&mgr_ea->lmv_stripe_fids[1], mdd_object_fid(mdd_tobj));
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               GOTO(out_free, rc = PTR_ERR(handle));
+
+       /* Note: this transaction is part of migration, and it is not
+        * the last step of migration, so we set th_local = 1 to avoid
+        * update last rcvd for this transaction */
+       handle->th_local = 1;
+       rc = mdd_declare_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                       spec, la,
+                                       (union lmv_mds_md *)info->mti_xattr_buf,
+                                       handle);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       /* create the target object */
+       rc = mdd_object_create(env, mdd_pobj, mdd_tobj, la, spec, NULL, NULL,
+                              NULL, handle);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       if (lmm_buf.lb_buf != NULL && lmm_buf.lb_len != 0) {
+               buf = mdd_buf_get_const(env, lmm_buf.lb_buf, lmm_buf.lb_len);
+               rc = mdo_xattr_set(env, mdd_tobj, buf, XATTR_NAME_LOV,
+                                  0, handle, mdd_object_capa(env, mdd_sobj));
+               if (rc != 0)
+                       GOTO(stop_trans, rc);
+       }
+
+       /* Set MIGRATE EA on the source inode, so once the migration needs
+        * to be re-done during failover, the re-do process can locate the
+        * target object which is already being created. */
+       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+       buf = mdd_buf_get_const(env, mgr_ea, mgr_easize);
+       rc = mdo_xattr_set(env, mdd_sobj, buf, XATTR_NAME_LMV, 0,
+                          handle, mdd_object_capa(env, mdd_sobj));
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       /* Set immutable flag, so any modification is disabled until
+        * the migration is done. Once the migration is interrupted,
+        * if the resume process find the migrating object has both
+        * IMMUTALBE flag and MIGRATE EA, it need to clear IMMUTABLE
+        * flag and approve the migration */
+       la_flag->la_valid = LA_FLAGS;
+       la_flag->la_flags = LUSTRE_IMMUTABLE_FL;
+       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
+                         mdd_object_capa(env, mdd_sobj));
+stop_trans:
+       if (handle != NULL)
+               mdd_trans_stop(env, mdd, rc, handle);
+out_free:
+       if (lmm_buf.lb_buf != NULL)
+               OBD_FREE(lmm_buf.lb_buf, lmm_buf.lb_len);
+       RETURN(rc);
+}
+
+static int mdd_migrate_entries(const struct lu_env *env,
+                              struct mdd_object *mdd_sobj,
+                              struct mdd_object *mdd_tobj)
+{
+       struct dt_object        *next = mdd_object_child(mdd_sobj);
+       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+       struct dt_object        *dt_tobj = mdd_object_child(mdd_tobj);
+       struct thandle          *handle;
+       struct dt_it            *it;
+       const struct dt_it_ops  *iops;
+       int                      rc;
+       int                      result;
+       struct lu_dirent        *ent;
+       ENTRY;
+
+       OBD_ALLOC(ent, NAME_MAX + sizeof(*ent) + 1);
+       if (ent == NULL)
+               RETURN(-ENOMEM);
+
+       if (!dt_try_as_dir(env, next))
+               GOTO(out_ent, rc = -ENOTDIR);
+       /*
+        * iterate directories
+        */
+       iops = &next->do_index_ops->dio_it;
+       it = iops->init(env, next, LUDA_FID | LUDA_TYPE,
+                       mdd_object_capa(env, mdd_sobj));
+       if (IS_ERR(it))
+               GOTO(out_ent, rc = PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc == 0)
+               rc = iops->next(env, it);
+       else if (rc > 0)
+               rc = 0;
+       /*
+        * At this point and across for-loop:
+        *
+        *  rc == 0 -> ok, proceed.
+        *  rc >  0 -> end of directory.
+        *  rc <  0 -> error.
+        */
+       do {
+               struct mdd_object       *child;
+               char                    *name = mdd_env_info(env)->mti_key;
+               int                     len;
+               int                     recsize;
+               int                     is_dir;
+               bool                    target_exist = false;
+
+               len = iops->key_size(env, it);
+               if (len == 0)
+                       goto next;
+
+               result = iops->rec(env, it, (struct dt_rec *)ent,
+                                  LUDA_FID | LUDA_TYPE);
+               if (result == -ESTALE)
+                       goto next;
+               if (result != 0) {
+                       rc = result;
+                       goto out;
+               }
+
+               fid_le_to_cpu(&ent->lde_fid, &ent->lde_fid);
+               recsize = le16_to_cpu(ent->lde_reclen);
+
+               /* Insert new fid with target name into target dir */
+               if ((ent->lde_namelen == 1 && ent->lde_name[0] == '.') ||
+                   (ent->lde_namelen == 2 && ent->lde_name[0] == '.' &&
+                    ent->lde_name[1] == '.'))
+                       goto next;
+
+               child = mdd_object_find(env, mdd, &ent->lde_fid);
+               if (IS_ERR(child))
+                       GOTO(out, rc = PTR_ERR(child));
+
+               is_dir = S_ISDIR(lu_object_attr(&child->mod_obj.mo_lu));
+
+               snprintf(name, ent->lde_namelen + 1, "%s", ent->lde_name);
+
+               /* Check whether the name has been inserted to the target */
+               if (dt_try_as_dir(env, dt_tobj)) {
+                       struct lu_fid *fid = &mdd_env_info(env)->mti_fid2;
+
+                       rc = dt_tobj->do_index_ops->dio_lookup(env, dt_tobj,
+                                                        (struct dt_rec *)fid,
+                                                        (struct dt_key *)name,
+                                               mdd_object_capa(env, mdd_tobj));
+                       if (unlikely(rc == 0))
+                               target_exist = true;
+               }
+
+               handle = mdd_trans_create(env, mdd);
+               if (IS_ERR(handle))
+                       GOTO(out, rc = PTR_ERR(handle));
+
+               /* Note: this transaction is part of migration, and it is not
+                * the last step of migration, so we set th_local = 1 to avoid
+                * updating last rcvd for this transaction */
+               handle->th_local = 1;
+               if (likely(!target_exist)) {
+                       rc = mdo_declare_index_insert(env, mdd_tobj,
+                                                     &ent->lde_fid,
+                                                     name, handle);
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+
+                       if (is_dir) {
+                               rc = mdo_declare_ref_add(env, mdd_tobj, handle);
+                               if (rc != 0)
+                                       GOTO(out_put, rc);
+                       }
+               }
+
+               rc = mdo_declare_index_delete(env, mdd_sobj, name, handle);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               if (is_dir) {
+                       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+
+                       /* Update .. for child */
+                       rc = mdo_declare_index_delete(env, child, dotdot,
+                                                     handle);
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+
+                       rc = mdo_declare_index_insert(env, child,
+                                                     mdd_object_fid(mdd_tobj),
+                                                             dotdot, handle);
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+               }
+
+               rc = mdd_linkea_declare_update_child(env, mdd_tobj,
+                                                    child, name,
+                                                    strlen(name),
+                                                    handle);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               rc = mdd_trans_start(env, mdd, handle);
+               if (rc != 0) {
+                       CERROR("%s: transaction start failed: rc = %d\n",
+                              mdd2obd_dev(mdd)->obd_name, rc);
+                       GOTO(out_put, rc);
+               }
+
+               if (likely(!target_exist)) {
+                       rc = __mdd_index_insert(env, mdd_tobj, &ent->lde_fid,
+                                               name, is_dir, handle,
+                                               mdd_object_capa(env, mdd_tobj));
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+
+                       if (is_dir) {
+                               rc = mdo_ref_add(env, mdd_tobj, handle);
+                               if (rc != 0)
+                                       GOTO(out_put, rc);
+
+                       }
+               }
+
+               rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle,
+                                       mdd_object_capa(env, mdd_sobj));
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               if (is_dir) {
+                       rc = __mdd_index_delete_only(env, child, dotdot, handle,
+                                                  mdd_object_capa(env, child));
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+
+                       rc = __mdd_index_insert_only(env, child,
+                                        mdd_object_fid(mdd_tobj),
+                                        dotdot, handle,
+                                        mdd_object_capa(env, child));
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+               }
+
+               rc = mdd_linkea_update_child(env, mdd_tobj, child, name,
+                                            strlen(name), handle);
+
+out_put:
+               mdd_object_put(env, child);
+               mdd_trans_stop(env, mdd, rc, handle);
+               if (rc != 0)
+                       GOTO(out, rc);
+next:
+               result = iops->next(env, it);
+               if (OBD_FAIL_CHECK(OBD_FAIL_MIGRATE_ENTRIES))
+                       GOTO(out, rc = -EINTR);
+
+               if (result == -ESTALE)
+                       goto next;
+       } while (result == 0);
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+out_ent:
+       OBD_FREE(ent, NAME_MAX + sizeof(*ent) + 1);
+       RETURN(rc);
+}
+
+static int mdd_declare_update_linkea(const struct lu_env *env,
+                                    struct mdd_object *mdd_pobj,
+                                    struct mdd_object *mdd_sobj,
+                                    struct mdd_object *mdd_tobj,
+                                    struct thandle *handle,
+                                    const struct lu_name *child_name)
+{
+       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                         child_name, handle, 1);
+}
+
+static int mdd_update_linkea(const struct lu_env *env,
+                            struct mdd_object *mdd_pobj,
+                            struct mdd_object *mdd_sobj,
+                            struct mdd_object *mdd_tobj,
+                            struct thandle *handle,
+                            const struct lu_name *child_name)
+{
+       return mdd_update_linkea_internal(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                         child_name, handle, 0);
+}
+
+static int mdd_declare_migrate_update_name(const struct lu_env *env,
+                                          struct mdd_object *mdd_pobj,
+                                          struct mdd_object *mdd_sobj,
+                                          struct mdd_object *mdd_tobj,
+                                          const struct lu_name *lname,
+                                          struct lu_attr *la,
+                                          struct lu_attr *parent_la,
+                                          struct thandle *handle)
+{
+       struct lu_attr  *la_flag = MDD_ENV_VAR(env, tattr);
+       int             rc;
+
+       /* Revert IMMUTABLE flag */
+       la_flag->la_valid = LA_FLAGS;
+       la_flag->la_flags = la->la_flags & ~LUSTRE_IMMUTABLE_FL;
+       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+       rc = mdo_declare_attr_set(env, mdd_sobj, la_flag, handle);
+       if (rc != 0)
+               return rc;
+
+       /* delete entry from source dir */
+       rc = mdo_declare_index_delete(env, mdd_pobj, lname->ln_name, handle);
+       if (rc != 0)
+               return rc;
+
+       rc = mdd_declare_update_linkea(env, mdd_pobj, mdd_sobj,
+                                      mdd_tobj, handle, lname);
+       if (rc != 0)
+               return rc;
+
+       if (S_ISREG(mdd_object_type(mdd_sobj))) {
+               rc = mdo_declare_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
+                                          handle);
+               if (rc != 0)
+                       return rc;
+       }
+
+       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+               rc = mdo_declare_ref_del(env, mdd_pobj, handle);
+               if (rc != 0)
+                       return rc;
+       }
+
+       /* new name */
+       rc = mdo_declare_index_insert(env, mdd_pobj, mdo2fid(mdd_tobj),
+                                     lname->ln_name, handle);
+       if (rc != 0)
+               return rc;
+
+       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+               rc = mdo_declare_ref_add(env, mdd_pobj, handle);
+               if (rc != 0)
+                       return rc;
+       }
+
+       /* delete old object */
+       rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+       if (rc != 0)
+               return rc;
+
+       if (S_ISDIR(mdd_object_type(mdd_sobj))) {
+               /* delete old object */
+               rc = mdo_declare_ref_del(env, mdd_sobj, handle);
+               if (rc != 0)
+                       return rc;
+               /* set nlink to 0 */
+               rc = mdo_declare_attr_set(env, mdd_sobj, la, handle);
+               if (rc != 0)
+                       return rc;
+       }
+
+       rc = mdd_declare_finish_unlink(env, mdd_sobj, handle);
+       if (rc)
+               return rc;
+
+       rc = mdo_declare_attr_set(env, mdd_pobj, parent_la, handle);
+
+       return rc;
+}
+
+static int mdd_migrate_update_name(const struct lu_env *env,
+                                  struct mdd_object *mdd_pobj,
+                                  struct mdd_object *mdd_sobj,
+                                  struct mdd_object *mdd_tobj,
+                                  const struct lu_name *lname,
+                                  struct md_attr *ma)
+{
+       struct lu_attr          *p_la = MDD_ENV_VAR(env, la_for_fix);
+       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
+       struct lu_attr          *la_flag = MDD_ENV_VAR(env, tattr);
+       struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
+       struct thandle          *handle;
+       int                     is_dir = S_ISDIR(mdd_object_type(mdd_sobj));
+       const char              *name = lname->ln_name;
+       int                     rc;
+       ENTRY;
+
+       /* update time for parent */
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       p_la->la_ctime = p_la->la_mtime = ma->ma_attr.la_ctime;
+       p_la->la_valid = LA_CTIME;
+
+       rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
+       if (rc != 0)
+               RETURN(rc);
+
+       handle = mdd_trans_create(env, mdd);
+       if (IS_ERR(handle))
+               RETURN(PTR_ERR(handle));
+
+       rc = mdd_declare_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                            lname, so_attr, p_la, handle);
+       if (rc != 0) {
+               /* If the migration can not be fit in one transaction, just
+                * leave it in the original MDT */
+               if (rc == -E2BIG)
+                       GOTO(stop_trans, rc = 0);
+               else
+                       GOTO(stop_trans, rc);
+       }
+
+       CDEBUG(D_INFO, "%s: update "DFID"/"DFID" with %s:"DFID"\n",
+              mdd2obd_dev(mdd)->obd_name, PFID(mdd_object_fid(mdd_pobj)),
+              PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
+              PFID(mdd_object_fid(mdd_tobj)));
+
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       /* Revert IMMUTABLE flag */
+       la_flag->la_valid = LA_FLAGS;
+       la_flag->la_flags = so_attr->la_flags & ~LUSTRE_IMMUTABLE_FL;
+       mdd_flags_xlate(mdd_sobj, la_flag->la_flags);
+       rc = mdo_attr_set(env, mdd_sobj, la_flag, handle,
+                         mdd_object_capa(env, mdd_pobj));
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       /* Remove source name from source directory */
+       rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
+                               mdd_object_capa(env, mdd_pobj));
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_update_linkea(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                              handle, lname);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       if (S_ISREG(so_attr->la_mode)) {
+               if (so_attr->la_nlink == 1) {
+                       rc = mdo_xattr_del(env, mdd_sobj, XATTR_NAME_LOV,
+                                          handle,
+                                          mdd_object_capa(env, mdd_sobj));
+                       if (rc != 0 && rc != -ENODATA)
+                               GOTO(stop_trans, rc);
+               }
+       }
+
+       /* Insert new fid with target name into target dir */
+       rc = __mdd_index_insert(env, mdd_pobj, mdd_object_fid(mdd_tobj), name,
+                               is_dir, handle, mdd_object_capa(env, mdd_pobj));
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
+                          NULL, 1);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
+       mdo_ref_del(env, mdd_sobj, handle);
+       if (is_dir)
+               mdo_ref_del(env, mdd_sobj, handle);
+
+       ma->ma_attr = *so_attr;
+       ma->ma_valid |= MA_INODE;
+       rc = mdd_finish_unlink(env, mdd_sobj, ma, handle);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       rc = mdd_attr_set_internal(env, mdd_pobj, p_la, handle, 0);
+       if (rc != 0)
+               GOTO(stop_trans, rc);
+
+       mdd_write_unlock(env, mdd_sobj);
+
+stop_trans:
+       mdd_trans_stop(env, mdd, rc, handle);
+
+       RETURN(rc);
+}
+
+/**
+ * Check whether we should migrate the file/dir
+ * return val
+ *     < 0  permission check failed or other error.
+ *     = 0  the file can be migrated.
+ *     > 0  the file does not need to be migrated, mostly
+ *          for multiple link file
+ **/
+static int mdd_migrate_sanity_check(const struct lu_env *env,
+                                   struct mdd_object *pobj,
+                                   const struct lu_attr *pattr,
+                                   struct mdd_object *sobj,
+                                   struct lu_attr *sattr)
+{
+       struct mdd_thread_info  *info = mdd_env_info(env);
+       struct linkea_data      *ldata = &info->mti_link_data;
+       int                     mgr_easize;
+       struct lu_buf           *mgr_buf;
+       int                     count;
+       int                     rc;
+
+       ENTRY;
+
+       mgr_easize = lmv_mds_md_size(2, LMV_MAGIC_MIGRATE);
+       mgr_buf = lu_buf_check_and_alloc(&info->mti_big_buf, mgr_easize);
+       if (mgr_buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
+
+       rc = mdo_xattr_get(env, sobj, mgr_buf, XATTR_NAME_LMV,
+                          mdd_object_capa(env, sobj));
+       if (rc > 0) {
+               union lmv_mds_md *lmm = mgr_buf->lb_buf;
+
+               /* If the object has migrateEA, it means IMMUTE flag
+                * is being set by previous migration process, so it
+                * needs to override the IMMUTE flag, otherwise the
+                * following sanity check will fail */
+               if (le32_to_cpu(lmm->lmv_md_v1.lmv_magic) ==
+                                               LMV_MAGIC_MIGRATE) {
+                       struct mdd_device *mdd = mdo2mdd(&sobj->mod_obj);
+
+                       sattr->la_flags &= ~LUSTRE_IMMUTABLE_FL;
+                       sobj->mod_flags &= ~IMMUTE_OBJ;
+                       CDEBUG(D_HA, "%s: "DFID" override IMMUTE FLAG\n",
+                              mdd2obd_dev(mdd)->obd_name,
+                              PFID(mdd_object_fid(sobj)));
+               }
+       }
+
+       rc = mdd_rename_sanity_check(env, pobj, pattr, pobj, pattr,
+                                    sobj, sattr, NULL, NULL);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Then it will check if the file should be migrated. If the file
+        * has mulitple links, we only need migrate the file if all of its
+        * entries has been migrated to the remote MDT */
+       if (!S_ISREG(sattr->la_mode) || sattr->la_nlink < 2)
+               RETURN(0);
+
+       rc = mdd_links_read(env, sobj, ldata);
+       if (rc != 0) {
+               /* For multiple links files, if there are no linkEA data at all,
+                * means the file might be created before linkEA is enabled, and
+                * all all of its links should not be migrated yet, otherwise
+                * it should have some linkEA there */
+               if (rc == -ENOENT || rc == -ENODATA)
+                       RETURN(1);
+               RETURN(rc);
+       }
+
+       /* If it is mulitple links file, we need update the name entry for
+        * all parent */
+       LASSERT(ldata->ld_leh != NULL);
+       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
+               struct mdd_device       *mdd = mdo2mdd(&sobj->mod_obj);
+               struct mdd_object       *lpobj;
+               struct lu_name          lname;
+               struct lu_fid           fid;
+
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+                                   &lname, &fid);
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
+               lpobj = mdd_object_find(env, mdd, &fid);
+               if (IS_ERR(lpobj)) {
+                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+                             mdd2obd_dev(mdd)->obd_name, PFID(&fid),
+                             PTR_ERR(lpobj));
+                       continue;
+               }
+
+               if (!mdd_object_exists(lpobj) || mdd_object_remote(lpobj)) {
+                       CDEBUG(D_INFO, DFID"%.*s: is on remote MDT.\n",
+                              PFID(&fid), lname.ln_namelen, lname.ln_name);
+                       mdd_object_put(env, lpobj);
+                       continue;
+               }
+
+               CDEBUG(D_INFO, DFID"still has local entry %.*s "DFID"\n",
+                      PFID(mdd_object_fid(sobj)), lname.ln_namelen,
+                      lname.ln_name, PFID(&fid));
+               mdd_object_put(env, lpobj);
+               rc = 1;
+               break;
+       }
+
+       RETURN(rc);
+}
+
+static int mdd_migrate(const struct lu_env *env, struct md_object *pobj,
+                      const struct lu_fid *lf, const struct lu_name *lname,
+                      struct md_object *tobj, struct md_attr *ma)
+{
+       struct mdd_object       *mdd_pobj = md2mdd_obj(pobj);
+       struct mdd_device       *mdd = mdo2mdd(pobj);
+       struct mdd_object       *mdd_sobj = NULL;
+       struct mdd_object       *mdd_tobj = NULL;
+       struct lu_attr          *so_attr = MDD_ENV_VAR(env, cattr);
+       struct lu_attr          *pattr = MDD_ENV_VAR(env, pattr);
+       int                     rc;
+
+       ENTRY;
+       /* object has to be locked by mdt, so it must exist */
+       mdd_sobj = mdd_object_find(env, mdd, lf);
+       LASSERT(mdd_sobj != NULL);
+
+       /* If the file will being migrated, it will check whether
+        * the file is being opened by someone else right now */
+       mdd_read_lock(env, mdd_sobj, MOR_SRC_CHILD);
+       if (mdd_sobj->mod_count >= 1) {
+               CERROR("%s: "DFID"%s is already opened count %d: rc = %d\n",
+                      mdd2obd_dev(mdd)->obd_name,
+                      PFID(mdd_object_fid(mdd_sobj)), lname->ln_name,
+                      mdd_sobj->mod_count, -EBUSY);
+               mdd_read_unlock(env, mdd_sobj);
+               GOTO(put, rc = -EBUSY);
+       }
+       mdd_read_unlock(env, mdd_sobj);
+
+       rc = mdd_la_get(env, mdd_sobj, so_attr, mdd_object_capa(env, mdd_sobj));
+       if (rc != 0)
+               GOTO(put, rc);
+
+       rc = mdd_la_get(env, mdd_pobj, pattr, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(put, rc);
+
+       rc = mdd_migrate_sanity_check(env, mdd_pobj, pattr, mdd_sobj, so_attr);
+       if (rc != 0) {
+               if (rc > 0)
+                       rc = 0;
+               GOTO(put, rc);
+       }
+
+       /* Sigh, it is impossible to finish all of migration in a single
+        * transaction, for example migrating big directory entries to the
+        * new MDT, it needs insert all of name entries of children in the
+        * new directory.
+        *
+        * So migration will be done in multiple steps and transactions.
+        *
+        * 1. create an orphan object on the remote MDT in one transaction.
+        * 2. migrate extend attributes to the new target file/directory.
+        * 3. For directory, migrate the entries to the new MDT and update
+        * linkEA of each children. Because we can not migrate all entries
+        * in a single transaction, so the migrating directory will become
+        * a striped directory during migration, so once the process is
+        * interrupted, the directory is still accessible. (During lookup,
+        * client will locate the name by searching both original and target
+        * object).
+        * 4. Finally, update the name/FID to point to the new file/directory
+        * in a separate transaction.
+        */
+
+       /* step 1: Check whether the orphan object has been created, and create
+        * orphan object on the remote MDT if needed */
+       mdd_tobj = md2mdd_obj(tobj);
+       if (!mdd_object_exists(mdd_tobj)) {
+               rc = mdd_migrate_create(env, mdd_pobj, mdd_sobj, mdd_tobj,
+                                       so_attr);
+               if (rc != 0)
+                       GOTO(put, rc);
+       }
+
+       /* step 2: migrate xattr */
+       rc = mdd_migrate_xattrs(env, mdd_sobj, mdd_tobj);
+       if (rc != 0)
+               GOTO(put, rc);
+
+       /* step 3: migrate name entries to the orphan object */
+       if (S_ISDIR(lu_object_attr(&mdd_sobj->mod_obj.mo_lu))) {
+               rc = mdd_migrate_entries(env, mdd_sobj, mdd_tobj);
+               if (rc != 0)
+                       GOTO(put, rc);
+               if (unlikely(OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_NET_REP,
+                                                 OBD_FAIL_MDS_REINT_NET_REP)))
+                       GOTO(put, rc = 0);
+       }
+
+       /* step 4: update name entry to the new object */
+       rc = mdd_migrate_update_name(env, mdd_pobj, mdd_sobj, mdd_tobj, lname,
+                                    ma);
+       if (rc != 0)
+               GOTO(put, rc);
+put:
+       if (mdd_sobj)
+               mdd_object_put(env, mdd_sobj);
+
+       RETURN(rc);
+}
+
 const struct md_dir_operations mdd_dir_ops = {
-        .mdo_is_subdir     = mdd_is_subdir,
-        .mdo_lookup        = mdd_lookup,
-        .mdo_create        = mdd_create,
-        .mdo_rename        = mdd_rename,
-        .mdo_link          = mdd_link,
-        .mdo_unlink        = mdd_unlink,
-        .mdo_create_data   = mdd_create_data,
+       .mdo_is_subdir     = mdd_is_subdir,
+       .mdo_lookup        = mdd_lookup,
+       .mdo_create        = mdd_create,
+       .mdo_rename        = mdd_rename,
+       .mdo_link          = mdd_link,
+       .mdo_unlink        = mdd_unlink,
+       .mdo_create_data   = mdd_create_data,
+       .mdo_migrate       = mdd_migrate,
 };
index c21facf..9c95279 100644 (file)
@@ -108,11 +108,11 @@ struct mdd_device {
 };
 
 enum mod_flags {
-        /* The dir object has been unlinked */
-        DEAD_OBJ   = 1 << 0,
-        APPEND_OBJ = 1 << 1,
-        IMMUTE_OBJ = 1 << 2,
-        ORPHAN_OBJ = 1 << 3,
+       /* The dir object has been unlinked */
+       DEAD_OBJ   = 1 << 0,
+       APPEND_OBJ = 1 << 1,
+       IMMUTE_OBJ = 1 << 2,
+       ORPHAN_OBJ = 1 << 3,
 };
 
 struct mdd_object {
@@ -164,6 +164,7 @@ struct mdd_thread_info {
        struct dt_object_format   mti_dof;
        struct obd_quotactl       mti_oqctl;
        struct linkea_data        mti_link_data;
+       struct md_op_spec         mti_spec;
 };
 
 extern const char orph_index_name[];
@@ -216,6 +217,7 @@ int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
 int mdd_data_get(const struct lu_env *env, struct mdd_object *obj, void **data);
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
                struct lu_attr *la, struct lustre_capa *capa);
+void mdd_flags_xlate(struct mdd_object *obj, __u32 flags);
 int mdd_attr_get(const struct lu_env *env, struct md_object *obj,
                 struct md_attr *ma);
 int mdd_attr_set(const struct lu_env *env, struct md_object *obj,
@@ -237,7 +239,8 @@ int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj,
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct lu_attr *attr,
                               struct thandle *handle,
-                              const struct md_op_spec *spec);
+                              const struct md_op_spec *spec,
+                              struct dt_allocation_hint *hint);
 int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj,
                        struct md_attr *ma);
 
@@ -356,7 +359,8 @@ struct lu_buf *mdd_link_buf_alloc(const struct lu_env *env, ssize_t len);
 int mdd_link_buf_grow(const struct lu_env *env, ssize_t len);
 extern const struct md_dir_operations    mdd_dir_ops;
 extern const struct md_object_operations mdd_obj_ops;
-
+int mdd_readlink(const struct lu_env *env, struct md_object *obj,
+                struct lu_buf *buf);
 int accmode(const struct lu_env *env, const struct lu_attr *la, int flags);
 extern struct lu_context_key mdd_thread_key;
 extern const struct lu_device_operations mdd_lu_ops;
@@ -386,7 +390,10 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *c,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
-                                      const struct md_op_spec *spec);
+                                      const struct md_op_spec *spec,
+                                      struct dt_allocation_hint *hint);
+int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf);
 
 /* mdd_trans.c */
 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
@@ -394,7 +401,8 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                          struct mdd_object *child, const struct lu_attr *attr,
-                         const struct md_op_spec *spec);
+                         const struct md_op_spec *spec,
+                         struct dt_allocation_hint *hint);
 
 static inline void mdd_object_get(struct mdd_object *o)
 {
index 5edc340..462b2b5 100644 (file)
@@ -88,7 +88,7 @@ int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
         return mdo_attr_get(env, obj, la, capa);
 }
 
-static void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
+void mdd_flags_xlate(struct mdd_object *obj, __u32 flags)
 {
         obj->mod_flags &= ~(APPEND_OBJ|IMMUTE_OBJ);
 
@@ -286,8 +286,8 @@ static int mdd_xattr_get(const struct lu_env *env,
  * Permission check is done when open,
  * no need check again.
  */
-static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
-                        struct lu_buf *buf)
+int mdd_readlink(const struct lu_env *env, struct md_object *obj,
+                struct lu_buf *buf)
 {
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct dt_object  *next;
@@ -302,7 +302,10 @@ static int mdd_readlink(const struct lu_env *env, struct md_object *obj,
         }
 
         next = mdd_object_child(mdd_obj);
-        mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
+       LASSERT(next != NULL);
+       LASSERT(next->do_body_ops != NULL);
+       LASSERT(next->do_body_ops->dbo_read != NULL);
+       mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = next->do_body_ops->dbo_read(env, next, buf, &pos,
                                          mdd_object_capa(env, mdd_obj));
         mdd_read_unlock(env, mdd_obj);
@@ -332,10 +335,10 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
                                       struct mdd_object *c,
                                       struct lu_attr *attr,
                                       struct thandle *handle,
-                                      const struct md_op_spec *spec)
+                                      const struct md_op_spec *spec,
+                                      struct dt_allocation_hint *hint)
 {
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         const struct dt_index_features *feat = spec->sp_feat;
         int rc;
         ENTRY;
@@ -364,10 +367,10 @@ int mdd_declare_object_create_internal(const struct lu_env *env,
 
 int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
                               struct mdd_object *c, struct lu_attr *attr,
-                               struct thandle *handle,
-                               const struct md_op_spec *spec)
+                              struct thandle *handle,
+                              const struct md_op_spec *spec,
+                              struct dt_allocation_hint *hint)
 {
-        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
         struct dt_object_format *dof = &mdd_env_info(env)->mti_dof;
         int rc;
         ENTRY;
@@ -1175,12 +1178,11 @@ stop:
  * read lov EA of an object
  * return the lov EA in an allocated lu_buf
  */
-static int mdd_get_lov_ea(const struct lu_env *env,
-                         struct mdd_object *obj,
-                         struct lu_buf *lmm_buf)
+int mdd_get_lov_ea(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_buf *lmm_buf)
 {
        struct lu_buf   *buf = &mdd_env_info(env)->mti_big_buf;
-       int              rc, sz;
+       int              rc, bufsize;
        ENTRY;
 
 repeat:
@@ -1198,27 +1200,27 @@ repeat:
        }
 
        if (rc < 0)
-               GOTO(out, rc);
+               RETURN(rc);
 
        if (rc == 0)
-               GOTO(out, rc = -ENODATA);
+               RETURN(-ENODATA);
 
-       sz = rc;
+       bufsize = rc;
        if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
                /* mti_big_buf was not allocated, so we have to
                 * allocate it based on the ea size */
                buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
-                                            sz);
+                                            bufsize);
                if (buf->lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
                goto repeat;
        }
 
-       lu_buf_alloc(lmm_buf, sz);
+       lu_buf_alloc(lmm_buf, bufsize);
        if (lmm_buf->lb_buf == NULL)
                GOTO(out, rc = -ENOMEM);
 
-       memcpy(lmm_buf->lb_buf, buf->lb_buf, sz);
+       memcpy(lmm_buf->lb_buf, buf->lb_buf, bufsize);
        rc = 0;
        EXIT;
 
@@ -1564,9 +1566,9 @@ stop:
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                          struct mdd_object *child, const struct lu_attr *attr,
-                         const struct md_op_spec *spec)
+                         const struct md_op_spec *spec,
+                         struct dt_allocation_hint *hint)
 {
-       struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
        struct dt_object *np = parent ?  mdd_object_child(parent) : NULL;
        struct dt_object *nc = mdd_object_child(child);
 
@@ -1583,7 +1585,7 @@ void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                hint->dah_eadata_len = 0;
        }
 
-       CDEBUG(D_INFO, DFID" eadata %p, len %d\n", PFID(mdd_object_fid(child)),
+       CDEBUG(D_INFO, DFID" eadata %p len %d\n", PFID(mdd_object_fid(child)),
               hint->dah_eadata, hint->dah_eadata_len);
        /* @hint will be initialized by underlying device. */
        nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
index 7c755ce..ee316d7 100644 (file)
@@ -91,7 +91,8 @@ static int write_capa_keys(const struct lu_env *env,
                RETURN(PTR_ERR(th));
 
        rc = dt_declare_record_write(env, mdt->mdt_ck_obj,
-                                    sizeof(*tmp) * 3, 0, th);
+                                    mdt_buf_const(env, NULL,
+                                    sizeof(*tmp) * 3), 0, th);
        if (rc)
                goto stop;
 
index f771629..fd66ec5 100644 (file)
@@ -67,7 +67,6 @@
 #include <lustre_acl.h>
 #include <lustre_param.h>
 #include <lustre_quota.h>
-#include <lustre_linkea.h>
 #include <lustre_lfsck.h>
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
@@ -521,8 +520,8 @@ static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
        RETURN(rc);
 }
 
-static int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
-                         struct md_attr *ma, const char *name)
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name)
 {
        struct md_object *next = mdt_object_child(o);
        struct lu_buf    *buf = &info->mti_buf;
@@ -1796,7 +1795,8 @@ static int mdt_reint_internal(struct mdt_thread_info *info,
                 GOTO(out_ucred, rc = err_serious(rc));
 
         if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
-                rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
+               DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt.");
+               rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
                 GOTO(out_ucred, rc);
         }
         rc = mdt_reint_rec(info, lhc);
@@ -1849,7 +1849,8 @@ int mdt_reint(struct tgt_session_info *tsi)
                [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
                [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
                [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
-               [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK
+               [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
+               [REINT_MIGRATE]  = &RQF_MDS_REINT_RENAME
        };
 
        ENTRY;
@@ -2719,6 +2720,9 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
 
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
+
+       info->mti_spec.u.sp_ea.eadata = NULL;
+       info->mti_spec.u.sp_ea.eadatalen = 0;
 }
 
 void mdt_thread_info_fini(struct mdt_thread_info *info)
@@ -3935,7 +3939,6 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
        site->ls_top_dev = &mdt->mdt_lu_dev;
        mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
 
-
        /* now connect to bottom OSD */
        snprintf(name, MAX_OBD_NAME, "%s-osd", dev);
        rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
@@ -3944,7 +3947,6 @@ static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
        mdt->mdt_bottom =
                lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
 
-
        rc = lu_env_refill((struct lu_env *)env);
        if (rc != 0)
                CERROR("Failure to refill session: '%d'\n", rc);
@@ -5211,8 +5213,8 @@ struct path_lookup_info {
        int                     pli_fidcount;   /**< number of \a pli_fids */
 };
 
-static int mdt_links_read(struct mdt_thread_info *info,
-                         struct mdt_object *mdt_obj, struct linkea_data *ldata)
+int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
+                  struct linkea_data *ldata)
 {
        int rc;
 
index c3d4f7f..80e0c77 100644 (file)
@@ -64,6 +64,7 @@
 #include <lustre_idmap.h>
 #include <lustre_eacl.h>
 #include <lustre_quota.h>
+#include <lustre_linkea.h>
 
 /* check if request's xid is equal to last one or not*/
 static inline int req_xid_is_last(struct ptlrpc_request *req)
@@ -745,8 +746,8 @@ enum {
 int mdt_get_info(struct tgt_session_info *tsi);
 int mdt_attr_get_complex(struct mdt_thread_info *info,
                         struct mdt_object *o, struct md_attr *ma);
-int mdt_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
-                 struct md_attr *ma, const char *name);
+int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
+                  struct md_attr *ma, const char *name);
 int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o,
                      int created);
 int mdt_object_is_som_enabled(struct mdt_object *mo);
@@ -795,6 +796,9 @@ int mdt_hsm_attr_set(struct mdt_thread_info *info, struct mdt_object *obj,
 
 int mdt_remote_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);
+int mdt_links_read(struct mdt_thread_info *info,
+                  struct mdt_object *mdt_obj,
+                  struct linkea_data *ldata);
 /* mdt_idmap.c */
 int mdt_init_idmap(struct tgt_session_info *tsi);
 void mdt_cleanup_idmap(struct mdt_export_data *);
index e2cd8b1..e7e3406 100644 (file)
@@ -1256,8 +1256,10 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
 
         info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
 
-        rc = mdt_dlmreq_unpack(info);
-        RETURN(rc);
+
+       rc = mdt_dlmreq_unpack(info);
+
+       RETURN(rc);
 }
 
 /*
@@ -1450,6 +1452,7 @@ static reint_unpacker mdt_reint_unpackers[REINT_MAX] = {
        [REINT_OPEN]     = mdt_open_unpack,
        [REINT_SETXATTR] = mdt_setxattr_unpack,
        [REINT_RMENTRY]  = mdt_rmentry_unpack,
+       [REINT_MIGRATE]  = mdt_rename_unpack,
 };
 
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op)
index 52354ee..6ab847a 100644 (file)
@@ -123,8 +123,7 @@ static int mdt_create_data(struct mdt_thread_info *info,
        ma->ma_valid = 0;
        mutex_lock(&o->mot_lov_mutex);
        if (!(o->mot_flags & MOF_LOV_CREATED)) {
-               if (p != NULL && (fid_is_obf(mdt_object_fid(p)) ||
-                                 fid_is_dot_lustre(mdt_object_fid(p))))
+               if (p != NULL && !fid_is_md_operative(mdt_object_fid(p)))
                        GOTO(unlock, rc = -EPERM);
 
                rc = mdo_create_data(info->mti_env,
@@ -1757,7 +1756,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
         if (result == -ENOENT) {
                /* Create under OBF and .lustre is not permitted */
-               if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+               if (!fid_is_md_operative(rr->rr_fid1))
                        GOTO(out_child, result = -EPERM);
 
                /* save versions in reply */
index 0afa5e8..9de12fa 100644 (file)
@@ -269,14 +269,11 @@ static int mdt_md_create(struct mdt_thread_info *info)
                  "in "DFID,
                  PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+       if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);
 
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
-
        parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));
@@ -793,7 +790,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                 RETURN(err_serious(-ENOENT));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
+       if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);
 
         /*
@@ -856,7 +853,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 }
        }
 
-       if (fid_is_obf(child_fid) || fid_is_dot_lustre(child_fid))
+       if (!fid_is_md_operative(child_fid))
                GOTO(unlock_parent, rc = -EPERM);
 
        /* We will lock the child regardless it is local or remote. No harm. */
@@ -1029,8 +1026,8 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
                 RETURN(-EPERM);
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+       if (!fid_is_md_operative(rr->rr_fid1) ||
+           !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);
 
         /* step 1: find & lock the target parent dir */
@@ -1201,29 +1198,333 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
         int rc = 0;
         ENTRY;
 
-        do {
-                LASSERT(fid_is_sane(&dst_fid));
-                dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
-                if (!IS_ERR(dst)) {
-                        rc = mdo_is_subdir(info->mti_env,
-                                           mdt_object_child(dst), fid,
-                                           &dst_fid);
-                        mdt_object_put(info->mti_env, dst);
-                        if (rc != -EREMOTE && rc < 0) {
-                                CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
-                        } else {
-                                /* check the found fid */
-                                if (lu_fid_eq(&dst_fid, fid))
-                                        rc = -EINVAL;
-                        }
-                } else {
-                        rc = PTR_ERR(dst);
-                }
-        } while (rc == -EREMOTE);
+       /* If the source and target are in the same directory, they can not
+        * be parent/child relationship, so subdir check is not needed */
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
+               return 0;
+
+       do {
+               LASSERT(fid_is_sane(&dst_fid));
+               dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
+               if (!IS_ERR(dst)) {
+                       rc = mdo_is_subdir(info->mti_env,
+                                          mdt_object_child(dst), fid,
+                                          &dst_fid);
+                       mdt_object_put(info->mti_env, dst);
+                       if (rc != -EREMOTE && rc < 0) {
+                               CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
+                       } else {
+                               /* check the found fid */
+                               if (lu_fid_eq(&dst_fid, fid))
+                                       rc = -EINVAL;
+                       }
+               } else {
+                       rc = PTR_ERR(dst);
+               }
+       } while (rc == -EREMOTE);
 
         RETURN(rc);
 }
 
+/* Update object linkEA */
+struct mdt_lock_list {
+       struct mdt_object       *mll_obj;
+       struct mdt_lock_handle  mll_lh;
+       struct list_head        mll_list;
+};
+
+static void mdt_unlock_list(struct mdt_thread_info *info,
+                           struct list_head *list, int rc)
+{
+       struct mdt_lock_list *mll;
+       struct mdt_lock_list *mll2;
+
+       list_for_each_entry_safe(mll, mll2, list, mll_list) {
+               mdt_object_unlock_put(info, mll->mll_obj, &mll->mll_lh, rc);
+               list_del(&mll->mll_list);
+               OBD_FREE_PTR(mll);
+       }
+}
+
+static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info,
+                                     struct mdt_object *obj,
+                                     struct mdt_object *pobj,
+                                     struct list_head *lock_list)
+{
+       struct lu_buf           *buf = &info->mti_big_buf;
+       struct linkea_data      ldata = { 0 };
+       int                     count;
+       int                     rc;
+       ENTRY;
+
+       if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
+               RETURN(0);
+
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       rc = mdt_links_read(info, obj, &ldata);
+       if (rc != 0) {
+               if (rc == -ENOENT || rc == -ENODATA)
+                       rc = 0;
+               RETURN(rc);
+       }
+
+       LASSERT(ldata.ld_leh != NULL);
+       ldata.ld_lee = (struct link_ea_entry *)(ldata.ld_leh + 1);
+       for (count = 0; count < ldata.ld_leh->leh_reccount; count++) {
+               struct mdt_device *mdt = info->mti_mdt;
+               struct mdt_object *mdt_pobj;
+               struct mdt_lock_list *mll;
+               struct lu_name name;
+               struct lu_fid  fid;
+
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen,
+                                   &name, &fid);
+               ldata.ld_lee = (struct link_ea_entry *)((char *)ldata.ld_lee +
+                                                        ldata.ld_reclen);
+               mdt_pobj = mdt_object_find(info->mti_env, mdt, &fid);
+               if (IS_ERR(mdt_pobj)) {
+                       CWARN("%s: cannot find obj "DFID": rc = %ld\n",
+                             mdt_obd_name(mdt), PFID(&fid), PTR_ERR(mdt_pobj));
+                       continue;
+               }
+
+               if (!mdt_object_exists(mdt_pobj)) {
+                       CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
+                             mdt_obd_name(mdt), PFID(&fid));
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       continue;
+               }
+
+               if (mdt_pobj == pobj) {
+                       CDEBUG(D_INFO, "%s: skipping parent obj "DFID"\n",
+                              mdt_obd_name(mdt), PFID(&fid));
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       continue;
+               }
+
+               OBD_ALLOC_PTR(mll);
+               if (mll == NULL) {
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       GOTO(out, rc = -ENOMEM);
+               }
+
+               if (mdt_object_remote(mdt_pobj)) {
+                       mdt_lock_reg_init(&mll->mll_lh, LCK_EX);
+                       rc = mdt_remote_object_lock(info, mdt_pobj,
+                                                   &mll->mll_lh.mlh_rreg_lh,
+                                                   mll->mll_lh.mlh_rreg_mode,
+                                                   MDS_INODELOCK_UPDATE);
+               } else {
+                       mdt_lock_pdo_init(&mll->mll_lh, LCK_PW, &name);
+                       rc = mdt_object_lock(info, mdt_pobj, &mll->mll_lh,
+                                            MDS_INODELOCK_UPDATE,
+                                            MDT_LOCAL_LOCK);
+               }
+               if (rc != 0) {
+                       CERROR("%s: cannot lock "DFID": rc =%d\n",
+                              mdt_obd_name(mdt), PFID(&fid), rc);
+                       mdt_object_put(info->mti_env, mdt_pobj);
+                       OBD_FREE_PTR(mll);
+                       GOTO(out, rc);
+               }
+
+               CFS_INIT_LIST_HEAD(&mll->mll_list);
+               mll->mll_obj = mdt_pobj;
+               list_add_tail(&mll->mll_list, lock_list);
+       }
+out:
+       if (rc != 0)
+               mdt_unlock_list(info, lock_list, rc);
+       RETURN(rc);
+}
+
+/* migrate files from one MDT to another MDT */
+static int mdt_reint_migrate_internal(struct mdt_thread_info *info,
+                                     struct mdt_lock_handle *lhc)
+{
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_attr          *ma = &info->mti_attr;
+       struct mdt_object       *msrcdir;
+       struct mdt_object       *mold;
+       struct mdt_object       *mnew = NULL;
+       struct mdt_lock_handle  *lh_dirp;
+       struct mdt_lock_handle  *lh_childp;
+       struct mdt_lock_handle  *lh_tgtp = NULL;
+       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
+       struct list_head        lock_list;
+       int                     rc;
+       ENTRY;
+
+       CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
+              PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+       /* 1: lock the source dir. */
+       msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(msrcdir)) {
+               CERROR("%s: cannot find source dir "DFID" : rc = %d\n",
+                       mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
+                       (int)PTR_ERR(msrcdir));
+               RETURN(PTR_ERR(msrcdir));
+       }
+
+       lh_dirp = &info->mti_lh[MDT_LH_PARENT];
+       if (mdt_object_remote(msrcdir)) {
+               mdt_lock_reg_init(lh_dirp, LCK_EX);
+               rc = mdt_remote_object_lock(info, msrcdir,
+                                           &lh_dirp->mlh_rreg_lh,
+                                           lh_dirp->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != ELDLM_OK)
+                       GOTO(out_put_parent, rc);
+       } else {
+               mdt_lock_pdo_init(lh_dirp, LCK_PW, &rr->rr_name);
+               rc = mdt_object_lock(info, msrcdir, lh_dirp,
+                                    MDS_INODELOCK_UPDATE,
+                                    MDT_LOCAL_LOCK);
+               if (rc)
+                       GOTO(out_put_parent, rc);
+
+               rc = mdt_version_get_check_save(info, msrcdir, 0);
+               if (rc)
+                       GOTO(out_unlock_parent, rc);
+       }
+
+       /* 2: sanity check and find the object to be migrated. */
+       fid_zero(old_fid);
+       rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
+       if (rc != 0)
+               GOTO(out_unlock_parent, rc);
+
+       if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
+               GOTO(out_unlock_parent, rc = -EINVAL);
+
+       if (!fid_is_md_operative(old_fid))
+               GOTO(out_unlock_parent, rc = -EPERM);
+
+       mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
+       if (IS_ERR(mold))
+               GOTO(out_unlock_parent, rc = PTR_ERR(mold));
+
+       if (mdt_object_remote(mold)) {
+               CERROR("%s: source "DFID" is on the remote MDT\n",
+                      mdt_obd_name(info->mti_mdt), PFID(old_fid));
+               GOTO(out_put_child, rc = -EREMOTE);
+       }
+
+       if (S_ISREG(lu_object_attr(&mold->mot_obj)) &&
+           !mdt_object_remote(msrcdir)) {
+               CERROR("%s: parent "DFID" is still on the same"
+                      " MDT, which should be migrated first:"
+                      " rc = %d\n", mdt_obd_name(info->mti_mdt),
+                      PFID(mdt_object_fid(msrcdir)), -EPERM);
+               GOTO(out_put_child, rc = -EPERM);
+       }
+
+       /* 3: iterate the linkea of the object and lock all of the objects */
+       CFS_INIT_LIST_HEAD(&lock_list);
+       rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);
+       if (rc != 0)
+               GOTO(out_put_child, rc);
+
+       /* 4: lock of the object migrated object */
+       lh_childp = &info->mti_lh[MDT_LH_OLD];
+       mdt_lock_reg_init(lh_childp, LCK_EX);
+       rc = mdt_object_lock(info, mold, lh_childp,
+                            MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+                            MDS_INODELOCK_LAYOUT, MDT_CROSS_LOCK);
+       if (rc != 0)
+               GOTO(out_unlock_list, rc);
+
+       ma->ma_need = MA_LMV;
+       ma->ma_valid = 0;
+       ma->ma_lmv = (union lmv_mds_md *)info->mti_xattr_buf;
+       ma->ma_lmv_size = sizeof(info->mti_xattr_buf);
+       rc = mdt_stripe_get(info, mold, ma, XATTR_NAME_LMV);
+       if (rc != 0)
+               GOTO(out_unlock_list, rc);
+
+       if ((ma->ma_valid & MA_LMV)) {
+               struct lmv_mds_md_v1 *lmm1;
+
+               lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv);
+               lmm1 = &ma->ma_lmv->lmv_md_v1;
+               if (lmm1->lmv_magic != LMV_MAGIC_MIGRATE) {
+                       CERROR("%s: can not migrate striped dir "DFID
+                              ": rc = %d\n", mdt_obd_name(info->mti_mdt),
+                              PFID(mdt_object_fid(mold)), -EPERM);
+                       GOTO(out_unlock_child, rc = -EPERM);
+               }
+
+               if (!fid_is_sane(&lmm1->lmv_stripe_fids[1]))
+                       GOTO(out_unlock_child, rc = -EINVAL);
+
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+                                      &lmm1->lmv_stripe_fids[1]);
+               if (IS_ERR(mnew))
+                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+
+               if (!mdt_object_remote(mnew)) {
+                       CERROR("%s: "DFID" being migrated is on this MDT:"
+                              " rc  = %d\n", mdt_obd_name(info->mti_mdt),
+                              PFID(rr->rr_fid2), -EPERM);
+                       GOTO(out_put_new, rc = -EPERM);
+               }
+
+               lh_tgtp = &info->mti_lh[MDT_LH_CHILD];
+               mdt_lock_reg_init(lh_tgtp, LCK_EX);
+               rc = mdt_remote_object_lock(info, mnew,
+                                           &lh_tgtp->mlh_rreg_lh,
+                                           lh_tgtp->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != 0) {
+                       lh_tgtp = NULL;
+                       GOTO(out_put_new, rc);
+               }
+       } else {
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt,
+                                      rr->rr_fid2);
+               if (IS_ERR(mnew))
+                       GOTO(out_unlock_child, rc = PTR_ERR(mnew));
+               if (!mdt_object_remote(mnew)) {
+                       CERROR("%s: Migration "DFID" is on this MDT !\n",
+                              mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid2));
+                       GOTO(out_put_new, rc = -EXDEV);
+               }
+       }
+
+       /* 5: migrate it */
+       mdt_reint_init_ma(info, ma);
+
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                      OBD_FAIL_MDS_REINT_RENAME_WRITE);
+
+       rc = mdo_migrate(info->mti_env, mdt_object_child(msrcdir),
+                        old_fid, &rr->rr_name, mdt_object_child(mnew), ma);
+       if (rc != 0)
+               GOTO(out_unlock_new, rc);
+out_unlock_new:
+       if (lh_tgtp != NULL)
+               mdt_object_unlock(info, mnew, lh_tgtp, rc);
+out_put_new:
+       if (mnew)
+               mdt_object_put(info->mti_env, mnew);
+out_unlock_child:
+       mdt_object_unlock(info, mold, lh_childp, rc);
+out_unlock_list:
+       mdt_unlock_list(info, &lock_list, rc);
+out_put_child:
+       mdt_object_put(info->mti_env, mold);
+out_unlock_parent:
+       mdt_object_unlock(info, msrcdir, lh_dirp, rc);
+out_put_parent:
+       mdt_object_put(info->mti_env, msrcdir);
+
+       RETURN(rc);
+}
+
 /*
  * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent;
  * 2 - src child; 3 - tgt child.
@@ -1240,80 +1541,64 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
  *    And tgt_c will be still in the same MDT as the original
  *    src_c.
  */
-static int mdt_reint_rename(struct mdt_thread_info *info,
-                            struct mdt_lock_handle *lhc)
+static int mdt_reint_rename_internal(struct mdt_thread_info *info,
+                                    struct mdt_lock_handle *lhc)
 {
-        struct mdt_reint_record *rr = &info->mti_rr;
-        struct md_attr          *ma = &info->mti_attr;
-        struct ptlrpc_request   *req = mdt_info_req(info);
-        struct mdt_object       *msrcdir;
-        struct mdt_object       *mtgtdir;
-        struct mdt_object       *mold;
-        struct mdt_object       *mnew = NULL;
-        struct mdt_lock_handle  *lh_srcdirp;
-        struct mdt_lock_handle  *lh_tgtdirp;
-        struct mdt_lock_handle  *lh_oldp;
-        struct mdt_lock_handle  *lh_newp;
-        struct lu_fid           *old_fid = &info->mti_tmp_fid1;
-        struct lu_fid           *new_fid = &info->mti_tmp_fid2;
-        struct lustre_handle     rename_lh = { 0 };
-        int                      rc;
-        ENTRY;
-
-        if (info->mti_dlm_req)
-                ldlm_request_cancel(req, info->mti_dlm_req, 0);
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct md_attr          *ma = &info->mti_attr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct mdt_object       *msrcdir;
+       struct mdt_object       *mtgtdir;
+       struct mdt_object       *mold;
+       struct mdt_object       *mnew = NULL;
+       struct mdt_lock_handle  *lh_srcdirp;
+       struct mdt_lock_handle  *lh_tgtdirp;
+       struct mdt_lock_handle  *lh_oldp = NULL;
+       struct mdt_lock_handle  *lh_newp = NULL;
+       struct lu_fid           *old_fid = &info->mti_tmp_fid1;
+       struct lu_fid           *new_fid = &info->mti_tmp_fid2;
+       int                      rc;
+       ENTRY;
 
        DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
                  PFID(rr->rr_fid1), PNAME(&rr->rr_name),
                  PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
 
-       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
-           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
-               RETURN(-EPERM);
-
-       rc = mdt_rename_lock(info, &rename_lh);
-       if (rc) {
-               CERROR("Can't lock FS for rename, rc %d\n", rc);
-               RETURN(rc);
-       }
-
-        lh_newp = &info->mti_lh[MDT_LH_NEW];
-
-        /* step 1: lock the source dir. */
-        lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
+       /* step 1: lock the source dir. */
+       lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
-        msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
-                                       MDS_INODELOCK_UPDATE);
-        if (IS_ERR(msrcdir))
-                GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
+       msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
+                                      MDS_INODELOCK_UPDATE);
+       if (IS_ERR(msrcdir))
+               RETURN(PTR_ERR(msrcdir));
 
-        rc = mdt_version_get_check_save(info, msrcdir, 0);
-        if (rc)
-                GOTO(out_unlock_source, rc);
+       rc = mdt_version_get_check_save(info, msrcdir, 0);
+       if (rc)
+               GOTO(out_unlock_source, rc);
 
-        /* step 2: find & lock the target dir. */
-        lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
+       /* step 2: find & lock the target dir. */
+       lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
        mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
-        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
-                mdt_object_get(info->mti_env, msrcdir);
-                mtgtdir = msrcdir;
-                if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
-                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
-                                                 MDS_INODELOCK_UPDATE);
-                         if (rc)
-                                 GOTO(out_unlock_source, rc);
-                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
-                }
-        } else {
-                mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
-                                          rr->rr_fid2);
-                if (IS_ERR(mtgtdir))
-                        GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
+       if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
+               mdt_object_get(info->mti_env, msrcdir);
+               mtgtdir = msrcdir;
+               if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) {
+                       rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
+                                        MDS_INODELOCK_UPDATE);
+                       if (rc != 0)
+                               GOTO(out_unlock_source, rc);
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
+               }
+       } else {
+               mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
+                                         rr->rr_fid2);
+               if (IS_ERR(mtgtdir))
+                       GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
 
-                /* check early, the real version will be saved after locking */
-                rc = mdt_version_get_check(info, mtgtdir, 1);
-                if (rc)
-                        GOTO(out_put_target, rc);
+               /* check early, the real version will be saved after locking */
+               rc = mdt_version_get_check(info, mtgtdir, 1);
+               if (rc)
+                       GOTO(out_put_target, rc);
 
                if (unlikely(mdt_object_remote(mtgtdir))) {
                        CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID
@@ -1345,96 +1630,102 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
        if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                GOTO(out_unlock_target, rc = -EINVAL);
 
-       if (fid_is_obf(old_fid) || fid_is_dot_lustre(old_fid))
+       if (!fid_is_md_operative(old_fid))
                GOTO(out_unlock_target, rc = -EPERM);
 
        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
        if (IS_ERR(mold))
                GOTO(out_unlock_target, rc = PTR_ERR(mold));
 
-        lh_oldp = &info->mti_lh[MDT_LH_OLD];
-        mdt_lock_reg_init(lh_oldp, LCK_EX);
-       rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
-                            MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
-        if (rc != 0) {
-                mdt_object_put(info->mti_env, mold);
-                GOTO(out_unlock_target, rc);
-        }
-
        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
-        /* save version after locking */
-        mdt_version_get_save(info, mold, 2);
-        mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
-
-        /* step 4: find & lock the new object. */
-        /* new target object may not exist now */
-        /* lookup with version checking */
-        fid_zero(new_fid);
+       /* save version after locking */
+       mdt_version_get_save(info, mold, 2);
+       mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
+
+       /* step 4: find & lock the new object. */
+       /* new target object may not exist now */
+       /* lookup with version checking */
+       fid_zero(new_fid);
        rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
                                      3);
-        if (rc == 0) {
-                /* the new_fid should have been filled at this moment */
-                if (lu_fid_eq(old_fid, new_fid))
-                       GOTO(out_unlock_old, rc);
+       if (rc == 0) {
+               /* the new_fid should have been filled at this moment */
+               if (lu_fid_eq(old_fid, new_fid))
+                       GOTO(out_put_old, rc);
 
-                if (lu_fid_eq(new_fid, rr->rr_fid1) ||
-                    lu_fid_eq(new_fid, rr->rr_fid2))
-                        GOTO(out_unlock_old, rc = -EINVAL);
+               if (lu_fid_eq(new_fid, rr->rr_fid1) ||
+                   lu_fid_eq(new_fid, rr->rr_fid2))
+                       GOTO(out_put_old, rc = -EINVAL);
 
-               if (fid_is_obf(new_fid) || fid_is_dot_lustre(new_fid))
-                       GOTO(out_unlock_old, rc = -EPERM);
+               if (!fid_is_md_operative(new_fid))
+                       GOTO(out_put_old, rc = -EPERM);
 
                if (mdt_object_remote(mold)) {
                        CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
                               PFID(old_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_old, rc = -EXDEV);
                }
 
-                mdt_lock_reg_init(lh_newp, LCK_EX);
-                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
-                if (IS_ERR(mnew))
-                        GOTO(out_unlock_old, rc = PTR_ERR(mnew));
+               mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
+               if (IS_ERR(mnew))
+                       GOTO(out_put_old, rc = PTR_ERR(mnew));
 
                if (mdt_object_remote(mnew)) {
-                       mdt_object_put(info->mti_env, mnew);
                        CDEBUG(D_INFO, "src child "DFID" is on another MDT\n",
                               PFID(new_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_new, rc = -EXDEV);
                }
 
+               lh_oldp = &info->mti_lh[MDT_LH_OLD];
+               mdt_lock_reg_init(lh_oldp, LCK_EX);
+               rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+               if (rc != 0)
+                       GOTO(out_put_new, rc);
+
                /* We used to acquire MDS_INODELOCK_FULL here but we
                 * can't do this now because a running HSM restore on
                 * the rename onto victim will hold the layout
                 * lock. See LU-4002. */
+
+               lh_newp = &info->mti_lh[MDT_LH_NEW];
+               mdt_lock_reg_init(lh_newp, LCK_EX);
                rc = mdt_object_lock(info, mnew, lh_newp,
                                     MDS_INODELOCK_LOOKUP |
                                     MDS_INODELOCK_UPDATE,
                                     MDT_CROSS_LOCK);
-                if (rc != 0) {
-                        mdt_object_put(info->mti_env, mnew);
-                        GOTO(out_unlock_old, rc);
-                }
-                /* get and save version after locking */
-                mdt_version_get_save(info, mnew, 3);
-                mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
-        } else if (rc != -EREMOTE && rc != -ENOENT) {
-                GOTO(out_unlock_old, rc);
-        } else {
+               if (rc != 0)
+                       GOTO(out_unlock_old, rc);
+
+               /* get and save version after locking */
+               mdt_version_get_save(info, mnew, 3);
+               mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
+       } else if (rc != -EREMOTE && rc != -ENOENT) {
+               GOTO(out_put_old, rc);
+       } else {
                /* If mnew does not exist and mold are remote directory,
                 * it only allows rename if they are under same directory */
                if (mtgtdir != msrcdir && mdt_object_remote(mold)) {
                        CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n",
                               PFID(old_fid));
-                       GOTO(out_unlock_old, rc = -EXDEV);
+                       GOTO(out_put_old, rc = -EXDEV);
                }
+
+               lh_oldp = &info->mti_lh[MDT_LH_OLD];
+               mdt_lock_reg_init(lh_oldp, LCK_EX);
+               rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP |
+                                    MDS_INODELOCK_XATTR, MDT_CROSS_LOCK);
+               if (rc != 0)
+                       GOTO(out_put_old, rc);
+
                mdt_enoent_version_save(info, 3);
-        }
+       }
 
-        /* step 5: rename it */
-        mdt_reint_init_ma(info, ma);
+       /* step 5: rename it */
+       mdt_reint_init_ma(info, ma);
 
-        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
-                       OBD_FAIL_MDS_REINT_RENAME_WRITE);
+       mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
+                      OBD_FAIL_MDS_REINT_RENAME_WRITE);
 
        /* Check if @dst is subdir of @src. */
        rc = mdt_rename_sanity(info, old_fid);
@@ -1459,25 +1750,74 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                        mdt_handle_last_unlink(info, mnew, ma);
 
                mdt_rename_counter_tally(info, info->mti_mdt, req,
-                                         msrcdir, mtgtdir);
-        }
+                                        msrcdir, mtgtdir);
+       }
 
-        EXIT;
+       EXIT;
 out_unlock_new:
-        if (mnew)
-                mdt_object_unlock_put(info, mnew, lh_newp, rc);
+       if (mnew != NULL)
+               mdt_object_unlock(info, mnew, lh_newp, rc);
 out_unlock_old:
-        mdt_object_unlock_put(info, mold, lh_oldp, rc);
+       mdt_object_unlock(info, mold, lh_oldp, rc);
+out_put_new:
+       if (mnew != NULL)
+               mdt_object_put(info->mti_env, mnew);
+out_put_old:
+       mdt_object_put(info->mti_env, mold);
 out_unlock_target:
-        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
+       mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
 out_put_target:
-        mdt_object_put(info->mti_env, mtgtdir);
+       mdt_object_put(info->mti_env, mtgtdir);
 out_unlock_source:
-        mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
-out_rename_lock:
+       mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
+       return rc;
+}
+
+static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info,
+                                      struct mdt_lock_handle *lhc,
+                                      bool  rename)
+{
+       struct mdt_reint_record *rr = &info->mti_rr;
+       struct ptlrpc_request   *req = mdt_info_req(info);
+       struct lustre_handle    rename_lh = { 0 };
+       int                     rc;
+       ENTRY;
+
+       if (info->mti_dlm_req)
+               ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
+       if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) ||
+           fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2))
+               RETURN(-EPERM);
+
+       rc = mdt_rename_lock(info, &rename_lh);
+       if (rc != 0) {
+               CERROR("%s: can't lock FS for rename: rc  = %d\n",
+                      mdt_obd_name(info->mti_mdt), rc);
+               RETURN(rc);
+       }
+
+       if (rename)
+               rc = mdt_reint_rename_internal(info, lhc);
+       else
+               rc = mdt_reint_migrate_internal(info, lhc);
+
        if (lustre_handle_is_used(&rename_lh))
                mdt_rename_unlock(&rename_lh);
-       return rc;
+
+       RETURN(rc);
+}
+
+static int mdt_reint_rename(struct mdt_thread_info *info,
+                           struct mdt_lock_handle *lhc)
+{
+       return mdt_reint_rename_or_migrate(info, lhc, true);
+}
+
+static int mdt_reint_migrate(struct mdt_thread_info *info,
+                           struct mdt_lock_handle *lhc)
+{
+       return mdt_reint_rename_or_migrate(info, lhc, false);
 }
 
 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
@@ -1491,7 +1831,8 @@ static mdt_reinter reinters[REINT_MAX] = {
        [REINT_RENAME]   = mdt_reint_rename,
        [REINT_OPEN]     = mdt_reint_open,
        [REINT_SETXATTR] = mdt_reint_setxattr,
-       [REINT_RMENTRY]  = mdt_reint_unlink
+       [REINT_RMENTRY]  = mdt_reint_unlink,
+       [REINT_MIGRATE]   = mdt_reint_migrate,
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,
index a2bae59..24c9d22 100644 (file)
@@ -253,7 +253,7 @@ static int nidtbl_update_version(const struct lu_env *env,
                GOTO(out_put, rc = PTR_ERR(th));
 
        th->th_sync = 1; /* update table synchronously */
-       rc = dt_declare_record_write(env, fsdb, buf.lb_len, off, th);
+       rc = dt_declare_record_write(env, fsdb, &buf, off, th);
        if (rc)
                GOTO(out, rc);
 
index eed6fd2..18e3d16 100644 (file)
@@ -72,9 +72,8 @@ EXPORT_SYMBOL(linkea_init);
  * Numbers are always big-endian
  * \retval record length
  */
-static int linkea_entry_pack(struct link_ea_entry *lee,
-                            const struct lu_name *lname,
-                            const struct lu_fid *pfid)
+int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
+                     const struct lu_fid *pfid)
 {
        struct lu_fid   tmpfid;
        int             reclen;
@@ -90,6 +89,7 @@ static int linkea_entry_pack(struct link_ea_entry *lee,
        lee->lee_reclen[1] = reclen & 0xff;
        return reclen;
 }
+EXPORT_SYMBOL(linkea_entry_pack);
 
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid)
index a47d709..ec0c0d6 100644 (file)
@@ -287,8 +287,10 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
        o = loghandle->lgh_obj;
        LASSERT(o);
 
+       lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+       lgi->lgi_buf.lb_buf = NULL;
        /* each time we update header */
-       rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0,
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
                                     th);
        if (rc || idx == 0) /* if error or just header */
                RETURN(rc);
@@ -307,8 +309,10 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
                lgi->lgi_off = 0;
        }
 
+       lgi->lgi_buf.lb_len = 32 * 1024;
+       lgi->lgi_buf.lb_buf = NULL;
        /* XXX: implement declared window or multi-chunks approach */
-       rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th);
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
 
        RETURN(rc);
 }
@@ -909,7 +913,9 @@ static int llog_osd_declare_create(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
-       rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th);
+       lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
+       lgi->lgi_buf.lb_buf = NULL;
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, th);
        if (rc)
                RETURN(rc);
 
@@ -1302,7 +1308,9 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th);
+       lgi->lgi_buf.lb_len = size;
+       lgi->lgi_buf.lb_buf = idarray;
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
        if (rc)
                GOTO(out, rc);
 
@@ -1310,8 +1318,6 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (rc)
                GOTO(out_trans, rc);
 
-       lgi->lgi_buf.lb_buf = idarray;
-       lgi->lgi_buf.lb_len = size;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
        if (rc)
                CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc);
index 75e36a8..f0856c2 100644 (file)
@@ -229,8 +229,10 @@ int local_object_declare_create(const struct lu_env *env,
        /* update fid generation file */
        if (los != NULL) {
                LASSERT(dt_object_exists(los->los_obj));
+               dti->dti_lb.lb_buf = NULL;
+               dti->dti_lb.lb_len = sizeof(struct los_ondisk);
                rc = dt_declare_record_write(env, los->los_obj,
-                                            sizeof(struct los_ondisk), 0, th);
+                                            &dti->dti_lb, 0, th);
                if (rc)
                        RETURN(rc);
        }
@@ -824,7 +826,13 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                if (rc)
                        GOTO(out_trans, rc);
 
-               rc = dt_declare_record_write(env, o, sizeof(lastid), 0, th);
+               lastid = cpu_to_le64(first_oid);
+
+               dti->dti_off = 0;
+               dti->dti_lb.lb_buf = &lastid;
+               dti->dti_lb.lb_len = sizeof(lastid);
+               rc = dt_declare_record_write(env, o, &dti->dti_lb, dti->dti_off,
+                                            th);
                if (rc)
                        GOTO(out_trans, rc);
 
@@ -841,11 +849,6 @@ int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
                if (rc)
                        GOTO(out_lock, rc);
 
-               lastid = cpu_to_le64(first_oid);
-
-               dti->dti_off = 0;
-               dti->dti_lb.lb_buf = &lastid;
-               dti->dti_lb.lb_len = sizeof(lastid);
                rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
                if (rc)
                        GOTO(out_lock, rc);
index 80a5ae4..fa32ed8 100644 (file)
@@ -57,7 +57,7 @@ int ofd_record_write(const struct lu_env *env, struct ofd_device *ofd,
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, dt, buf->lb_len, *off, th);
+       rc = dt_declare_record_write(env, dt, buf, *off, th);
        if (rc == 0) {
                rc = dt_trans_start_local(env, ofd->ofd_osd, th);
                if (rc == 0)
index e3491db..79aad45 100644 (file)
@@ -1113,7 +1113,7 @@ static int ofd_health_check(const struct lu_env *nul, struct obd_device *obd)
                GOTO(out, rc = PTR_ERR(th));
 
        rc = dt_declare_record_write(&env, ofd->ofd_health_check_file,
-                                    info->fti_buf.lb_len, info->fti_off, th);
+                                    &info->fti_buf, info->fti_off, th);
        if (rc == 0) {
                th->th_sync = 1; /* sync IO is needed */
                rc = dt_trans_start_local(&env, ofd->ofd_osd, th);
index 3262722..3917969 100644 (file)
@@ -232,7 +232,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
 
        th->th_sync |= sync;
 
-       rc = dt_declare_record_write(env, oseq->os_lastid_obj, sizeof(tmp),
+       rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
                                     info->fti_off, th);
        if (rc)
                GOTO(trans_stop, rc);
index a8022f0..c6e090d 100644 (file)
@@ -133,10 +133,6 @@ static int osd_object_invariant(const struct lu_object *l)
 /*
  * Concurrency: doesn't matter
  */
-static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
-{
-        return osd_oti_get(env)->oti_r_locks > 0;
-}
 
 /*
  * Concurrency: doesn't matter
@@ -3006,7 +3002,6 @@ static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
 
        LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
-        LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
                 return -EACCES;
@@ -4179,15 +4174,17 @@ struct osd_object *osd_object_find(const struct lu_env *env,
         * in the cache, otherwise lu_object_alloc() crashes
         * -bzzz
         */
-       luch = lu_object_find_at(env, ludev, fid, NULL);
-        if (!IS_ERR(luch)) {
-                if (lu_object_exists(luch)) {
-                        lo = lu_object_locate(luch->lo_header, ludev->ld_type);
-                        if (lo != NULL)
-                                child = osd_obj(lo);
-                        else
-                                LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                                "lu_object can't be located"
+       luch = lu_object_find_at(env, ludev->ld_site->ls_top_dev == NULL ?
+                                ludev : ludev->ld_site->ls_top_dev,
+                                fid, NULL);
+       if (!IS_ERR(luch)) {
+               if (lu_object_exists(luch)) {
+                       lo = lu_object_locate(luch->lo_header, ludev->ld_type);
+                       if (lo != NULL)
+                               child = osd_obj(lo);
+                       else
+                               LU_OBJECT_DEBUG(D_ERROR, env, luch,
+                                               "lu_object can't be located"
                                                DFID"\n", PFID(fid));
 
                         if (child == NULL) {
@@ -4232,7 +4229,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        int                     rc;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+       LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
index 374e5bd..8600bfa 100644 (file)
@@ -1315,8 +1315,8 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
 }
 
 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                                 const loff_t size, loff_t pos,
-                                 struct thandle *handle)
+                                const struct lu_buf *buf, loff_t pos,
+                                struct thandle *handle)
 {
         struct osd_thandle *oh;
         int                 credits;
index d0c0fa2..c53d6f8 100644 (file)
@@ -897,7 +897,7 @@ static int truncate_quota_index(const struct lu_env *env, struct dt_object *dt,
        inode = osd_dt_obj(dt)->oo_inode;
        LASSERT(inode);
 
-       rc = dt_declare_record_write(env, dt, inode->i_sb->s_blocksize * 2, 0, th);
+       rc = dt_declare_record_write(env, dt, NULL, 0, th);
        if (rc)
                GOTO(out, rc);
 
index 6745ea5..1590e3d 100644 (file)
@@ -108,7 +108,7 @@ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
 }
 
 static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
-                               const loff_t size, loff_t pos,
+                               const struct lu_buf *buf, loff_t pos,
                                struct thandle *th)
 {
        struct osd_object  *obj  = osd_dt_obj(dt);
@@ -137,7 +137,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
                dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
        }
 
-       dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
+       dmu_tx_hold_write(oh->ot_tx, oid, pos, buf->lb_len);
 
        /* dt_declare_write() is usually called for system objects, such
         * as llog or last_rcvd files. We needn't enforce quota on those
index d27146d..d10f875 100644 (file)
@@ -140,7 +140,7 @@ static int osp_write_local_file(const struct lu_env *env,
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, dt_obj, buf->lb_len, offset, th);
+       rc = dt_declare_record_write(env, dt_obj, buf, offset, th);
        if (rc)
                GOTO(out, rc);
        rc = dt_trans_start_local(env, osp->opd_storage, th);
index 78c9bbc..d405176 100644 (file)
@@ -251,6 +251,7 @@ struct osp_object {
 extern struct lu_object_operations osp_lu_obj_ops;
 extern const struct dt_device_operations osp_dt_ops;
 extern struct dt_object_operations osp_md_obj_ops;
+extern struct dt_body_operations osp_md_body_ops;
 
 struct osp_thread_info {
        struct lu_buf            osi_lb;
index 6a265dd..a4299e7 100644 (file)
@@ -222,7 +222,6 @@ static void osp_md_ah_init(const struct lu_env *env,
 {
        LASSERT(ah);
 
-       memset(ah, 0, sizeof(*ah));
        ah->dah_parent = parent;
        ah->dah_mode = child_mode;
 }
@@ -673,3 +672,47 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_object_lock       = osp_md_object_lock,
        .do_object_unlock     = osp_md_object_unlock,
 };
+
+static ssize_t osp_md_declare_write(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   const struct lu_buf *buf,
+                                   loff_t pos, struct thandle *th)
+{
+       struct dt_update_request  *update;
+       struct lu_fid             *fid;
+       int                       sizes[2] = {buf->lb_len, sizeof(pos)};
+       const char                *bufs[2] = {(char *)buf->lb_buf,
+                                             (char *)&pos};
+       ssize_t                   rc;
+
+       update = out_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed: rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      (int)PTR_ERR(update));
+               return PTR_ERR(update);
+       }
+
+       pos = cpu_to_le64(pos);
+       bufs[1] = (char *)&pos;
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       rc = out_insert_update(env, update, OUT_WRITE, fid,
+                              ARRAY_SIZE(sizes), sizes, bufs);
+
+       return rc;
+
+}
+
+static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_buf *buf, loff_t *pos,
+                           struct thandle *handle,
+                           struct lustre_capa *capa, int ignore_quota)
+{
+       return buf->lb_len;
+}
+
+/* These body operation will be used to write symlinks during migration etc */
+struct dt_body_operations osp_md_body_ops = {
+       .dbo_declare_write      = osp_md_declare_write,
+       .dbo_write              = osp_md_write,
+};
index 9aed2c2..78e2250 100644 (file)
@@ -906,9 +906,10 @@ static int osp_declare_object_create(const struct lu_env *env,
        if (unlikely(!fid_is_zero(fid))) {
                /* replay case: caller knows fid */
                osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
+               osi->osi_lb.lb_len = sizeof(osi->osi_id);
+               osi->osi_lb.lb_buf = NULL;
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
-                                            sizeof(osi->osi_id), osi->osi_off,
-                                            th);
+                                            &osi->osi_lb, osi->osi_off, th);
                RETURN(rc);
        }
 
@@ -930,9 +931,10 @@ static int osp_declare_object_create(const struct lu_env *env,
 
                /* common for all OSPs file hystorically */
                osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
+               osi->osi_lb.lb_len = sizeof(osi->osi_id);
+               osi->osi_lb.lb_buf = NULL;
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
-                                            sizeof(osi->osi_id), osi->osi_off,
-                                            th);
+                                            &osi->osi_lb, osi->osi_off, th);
        } else {
                /* not needed in the cache anymore */
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -1521,6 +1523,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
                struct lu_attr *la = &osp_env_info(env)->osi_attr;
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
+               po->opo_obj.do_body_ops = &osp_md_body_ops;
                rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
                                                     la, NULL);
                if (rc == 0)
index 922af56..6eb22c7 100644 (file)
@@ -288,12 +288,12 @@ int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
 
        th->th_sync |= sync;
        rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
-                                    lb_oid->lb_len, oid_off, th);
+                                    lb_oid, oid_off, th);
        if (rc != 0)
                GOTO(out, rc);
 
        rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
-                                    lb_oseq->lb_len, oseq_off, th);
+                                    lb_oseq, oseq_off, th);
        if (rc != 0)
                GOTO(out, rc);
 
index 896f2d0..5aa0fb6 100644 (file)
@@ -349,6 +349,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
        int rc = 0;
 
        LASSERT(tu != NULL);
+        LASSERT(tu != LP_POISON);
        /* Check whether there are updates related with this OSP */
        dt_update = out_find_update(tu, dt);
        if (dt_update == NULL) {
index 1a1b767..42d24d5 100644 (file)
@@ -197,7 +197,9 @@ void lustre_assert_wire_constants(void)
                 (long long)REINT_SETXATTR);
        LASSERTF(REINT_RMENTRY == 8, "found %lld\n",
                 (long long)REINT_RMENTRY);
-       LASSERTF(REINT_MAX == 9, "found %lld\n",
+       LASSERTF(REINT_MIGRATE == 9, "found %lld\n",
+                (long long)REINT_MIGRATE);
+       LASSERTF(REINT_MAX == 10, "found %lld\n",
                 (long long)REINT_MAX);
        LASSERTF(DISP_IT_EXECD == 0x00000001UL, "found 0x%.8xUL\n",
                (unsigned)DISP_IT_EXECD);
index 11be1cd..74e8f47 100644 (file)
@@ -58,96 +58,6 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
        return &ta->ta_args[i];
 }
 
-static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
-                       struct thandle_exec_args *ta, struct obd_export *exp)
-{
-       memset(ta, 0, sizeof(*ta));
-       ta->ta_handle = dt_trans_create(env, dt);
-       if (IS_ERR(ta->ta_handle)) {
-               int rc;
-
-               CERROR("%s: start handle error: rc = %ld\n",
-                      dt_obd_name(dt), PTR_ERR(ta->ta_handle));
-               rc = PTR_ERR(ta->ta_handle);
-               ta->ta_handle = NULL;
-               return rc;
-       }
-       ta->ta_dev = dt;
-       if (exp->exp_need_sync)
-               ta->ta_handle->th_sync = 1;
-
-       return 0;
-}
-
-static int out_trans_start(const struct lu_env *env,
-                          struct thandle_exec_args *ta)
-{
-       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
-}
-
-static int out_trans_stop(const struct lu_env *env,
-                         struct thandle_exec_args *ta, int err)
-{
-       int i;
-       int rc;
-
-       ta->ta_handle->th_result = err;
-       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
-       for (i = 0; i < ta->ta_argno; i++) {
-               if (ta->ta_args[i].object != NULL) {
-                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
-                       ta->ta_args[i].object = NULL;
-               }
-       }
-
-       return rc;
-}
-
-int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
-{
-       struct tgt_session_info *tsi = tgt_ses_info(env);
-       int i = 0, rc;
-
-       LASSERT(ta->ta_dev);
-       LASSERT(ta->ta_handle);
-
-       if (ta->ta_err != 0 || ta->ta_argno == 0)
-               GOTO(stop, rc = ta->ta_err);
-
-       rc = out_trans_start(env, ta);
-       if (unlikely(rc))
-               GOTO(stop, rc);
-
-       for (i = 0; i < ta->ta_argno; i++) {
-               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
-                                           &ta->ta_args[i]);
-               if (unlikely(rc)) {
-                       CDEBUG(D_INFO, "error during execution of #%u from"
-                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
-                              ta->ta_args[i].line, rc);
-                       while (--i >= 0) {
-                               LASSERTF(ta->ta_args[i].undo_fn != NULL,
-                                   "can't undo changes, hope for failover!\n");
-                               ta->ta_args[i].undo_fn(env, ta->ta_handle,
-                                                      &ta->ta_args[i]);
-                       }
-                       break;
-               }
-       }
-
-       /* Only fail for real update */
-       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
-stop:
-       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
-              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
-       out_trans_stop(env, ta, rc);
-       ta->ta_handle = NULL;
-       ta->ta_argno = 0;
-       ta->ta_err = 0;
-
-       RETURN(rc);
-}
-
 static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
                            struct dt_object *obj,
                            struct object_update_reply *reply,
@@ -436,8 +346,15 @@ static int out_attr_get(struct tgt_session_info *tsi)
 
        ENTRY;
 
-       if (!lu_object_exists(&obj->do_lu))
+       if (!lu_object_exists(&obj->do_lu)) {
+               /* Usually, this will be called when the master MDT try
+                * to init a remote object(see osp_object_init), so if
+                * the object does not exist on slave, we need set BANSHEE flag,
+                * so the object can be removed from the cache immediately */
+               set_bit(LU_OBJECT_HEARD_BANSHEE,
+                       &obj->do_lu.lo_header->loh_flags);
                RETURN(-ENOENT);
+       }
 
        dt_read_lock(env, obj, MOR_TGT_CHILD);
        rc = dt_attr_get(env, obj, la, NULL);
@@ -617,6 +534,9 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
               dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
               arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
+       if (!lu_object_exists(&dt_obj->do_lu))
+               GOTO(out, rc = -ENOENT);
+
        dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
        rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
                          arg->u.xattr_set.name, arg->u.xattr_set.flags,
@@ -627,7 +547,7 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
         **/
        if (unlikely(rc && !strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK)))
                rc = 0;
-
+out:
        CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
               dt_obd_name(th->th_dev), arg->reply, arg->index, rc);
 
@@ -940,17 +860,16 @@ static int __out_tx_index_insert(const struct lu_env *env,
 
        LASSERT(ta->ta_handle != NULL);
 
-       if (lu_object_exists(&dt_obj->do_lu)) {
-               if (dt_try_as_dir(env, dt_obj) == 0) {
-                       ta->ta_err = -ENOTDIR;
-                       return ta->ta_err;
-               }
-               ta->ta_err = dt_declare_insert(env, dt_obj,
-                                              (struct dt_rec *)fid,
-                                              (struct dt_key *)name,
-                                              ta->ta_handle);
+       if (dt_try_as_dir(env, dt_obj) == 0) {
+               ta->ta_err = -ENOTDIR;
+               return ta->ta_err;
        }
 
+       ta->ta_err = dt_declare_insert(env, dt_obj,
+                                      (struct dt_rec *)fid,
+                                      (struct dt_key *)name,
+                                      ta->ta_handle);
+
        if (ta->ta_err != 0)
                return ta->ta_err;
 
@@ -1163,6 +1082,91 @@ static int out_destroy(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
+static int out_tx_write_exec(const struct lu_env *env, struct thandle *th,
+                            struct tx_arg *arg)
+{
+       struct dt_object *dt_obj = arg->object;
+       int rc;
+
+       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+       rc = dt_record_write(env, dt_obj, &arg->u.write.buf,
+                            &arg->u.write.pos, th);
+       dt_write_unlock(env, dt_obj);
+
+       if (rc == 0)
+               rc = arg->u.write.buf.lb_len;
+
+       object_update_result_insert(arg->reply, NULL, 0, arg->index, rc);
+
+       return rc > 0 ? 0 : rc;
+}
+
+static int __out_tx_write(const struct lu_env *env,
+                         struct dt_object *dt_obj,
+                         const struct lu_buf *buf,
+                         loff_t pos, struct thandle_exec_args *ta,
+                         struct object_update_reply *reply,
+                         int index, char *file, int line)
+{
+       struct tx_arg   *arg;
+
+       LASSERT(ta->ta_handle != NULL);
+       ta->ta_err = dt_declare_record_write(env, dt_obj, buf, pos,
+                                            ta->ta_handle);
+       if (ta->ta_err != 0)
+               return ta->ta_err;
+
+       arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line);
+       LASSERT(arg);
+       lu_object_get(&dt_obj->do_lu);
+       arg->object = dt_obj;
+       arg->u.write.buf = *buf;
+       arg->u.write.pos = pos;
+       arg->reply = reply;
+       arg->index = index;
+       return 0;
+}
+
+static int out_write(struct tgt_session_info *tsi)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(tsi->tsi_env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
+       struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
+       struct lu_buf           *lbuf = &tti->tti_buf;
+       char                    *buf;
+       char                    *tmp;
+       int                     buf_len = 0;
+       loff_t                  pos;
+       int                      rc;
+       ENTRY;
+
+       buf = object_update_param_get(update, 0, &buf_len);
+       if (buf == NULL || buf_len == 0) {
+               CERROR("%s: empty buf for xattr set: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+       lbuf->lb_buf = buf;
+       lbuf->lb_len = buf_len;
+
+       tmp = (char *)object_update_param_get(update, 1, NULL);
+       if (tmp == NULL) {
+               CERROR("%s: empty flag for xattr set: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), -EPROTO);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+               __swab64s((__u64 *)tmp);
+       pos = *(loff_t *)tmp;
+
+       rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos,
+                         &tti->tti_tea,
+                         tti->tti_u.update.tti_update_reply,
+                         tti->tti_u.update.tti_update_reply_index);
+       RETURN(rc);
+}
+
 #define DEF_OUT_HNDL(opc, name, flags, fn)     \
 [opc - OUT_CREATE] = {                                 \
        .th_name    = name,                             \
@@ -1198,6 +1202,7 @@ static struct tgt_handler out_update_ops[] = {
                     MUTABOR | HABEO_REFERO, out_index_insert),
        DEF_OUT_HNDL(OUT_INDEX_DELETE, "out_index_delete",
                     MUTABOR | HABEO_REFERO, out_index_delete),
+       DEF_OUT_HNDL(OUT_WRITE, "out_write", MUTABOR | HABEO_REFERO, out_write),
 };
 
 struct tgt_handler *out_handler_find(__u32 opc)
@@ -1215,6 +1220,111 @@ struct tgt_handler *out_handler_find(__u32 opc)
        return h;
 }
 
+static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
+                       struct thandle_exec_args *ta, struct obd_export *exp)
+{
+       memset(ta, 0, sizeof(*ta));
+       ta->ta_handle = dt_trans_create(env, dt);
+       if (IS_ERR(ta->ta_handle)) {
+               int rc;
+
+               rc = PTR_ERR(ta->ta_handle);
+               ta->ta_handle = NULL;
+               CERROR("%s: start handle error: rc = %d\n",
+                      dt_obd_name(dt), rc);
+               return rc;
+       }
+       ta->ta_dev = dt;
+       if (exp->exp_need_sync)
+               ta->ta_handle->th_sync = 1;
+
+       return 0;
+}
+
+static int out_trans_start(const struct lu_env *env,
+                          struct thandle_exec_args *ta)
+{
+       return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
+}
+
+static int out_trans_stop(const struct lu_env *env,
+                         struct thandle_exec_args *ta, int err)
+{
+       int i;
+       int rc;
+
+       ta->ta_handle->th_result = err;
+       rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
+       for (i = 0; i < ta->ta_argno; i++) {
+               if (ta->ta_args[i].object != NULL) {
+                       struct dt_object *obj = ta->ta_args[i].object;
+
+                       /* If the object is being created during this
+                        * transaction, we need to remove them from the
+                        * cache immediately, because a few layers are
+                        * missing in OUT handler, i.e. the object might
+                        * not be initialized in all layers */
+                       if (ta->ta_args[i].exec_fn == out_tx_create_exec)
+                               set_bit(LU_OBJECT_HEARD_BANSHEE,
+                                       &obj->do_lu.lo_header->loh_flags);
+                       lu_object_put(env, &ta->ta_args[i].object->do_lu);
+                       ta->ta_args[i].object = NULL;
+               }
+       }
+
+       return rc;
+}
+
+int out_tx_end(const struct lu_env *env, struct thandle_exec_args *ta)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       int i = 0, rc;
+
+       LASSERT(ta->ta_dev);
+       LASSERT(ta->ta_handle);
+
+       if (ta->ta_err != 0 || ta->ta_argno == 0)
+               GOTO(stop, rc = ta->ta_err);
+
+       rc = out_trans_start(env, ta);
+       if (unlikely(rc))
+               GOTO(stop, rc);
+
+       for (i = 0; i < ta->ta_argno; i++) {
+               rc = ta->ta_args[i].exec_fn(env, ta->ta_handle,
+                                           &ta->ta_args[i]);
+               if (unlikely(rc != 0)) {
+                       CDEBUG(D_INFO, "error during execution of #%u from"
+                              " %s:%d: rc = %d\n", i, ta->ta_args[i].file,
+                              ta->ta_args[i].line, rc);
+                       while (--i >= 0) {
+                               if (ta->ta_args[i].undo_fn != NULL)
+                                       ta->ta_args[i].undo_fn(env,
+                                                              ta->ta_handle,
+                                                             &ta->ta_args[i]);
+                               else
+                                       CERROR("%s: undo for %s:%d: rc = %d\n",
+                                              dt_obd_name(ta->ta_dev),
+                                              ta->ta_args[i].file,
+                                              ta->ta_args[i].line, -ENOTSUPP);
+                       }
+                       break;
+               }
+       }
+
+       /* Only fail for real update */
+       tsi->tsi_reply_fail_id = OBD_FAIL_OUT_UPDATE_NET_REP;
+stop:
+       CDEBUG(D_INFO, "%s: executed %u/%u: rc = %d\n",
+              dt_obd_name(ta->ta_dev), i, ta->ta_argno, rc);
+       out_trans_stop(env, ta, rc);
+       ta->ta_handle = NULL;
+       ta->ta_argno = 0;
+       ta->ta_err = 0;
+
+       RETURN(rc);
+}
+
 /**
  * Object updates between Targets. Because all the updates has been
  * dis-assemblied into object updates at sender side, so OUT will
@@ -1314,7 +1424,7 @@ int out_handle(struct tgt_session_info *tsi)
                        /* Stop the current update transaction,
                         * create a new one */
                        rc = out_tx_end(env, ta);
-                       if (rc != 0)
+                       if (rc < 0)
                                RETURN(rc);
 
                        rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
index 070ec04..14ce886 100644 (file)
@@ -202,6 +202,9 @@ int out_handle(struct tgt_session_info *tsi);
 #define out_tx_destroy(info, obj, th, reply, idx) \
        __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
 
+#define out_tx_write(info, obj, buf, pos, th, reply, idx) \
+       __out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__)
+
 extern struct page *tgt_page_to_corrupt;
 
 struct tgt_thread_big_cache {
index 3264c1f..53826ca 100644 (file)
@@ -162,8 +162,9 @@ int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
+       tti_buf_lcd(tti);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lsd_client_data),
+                                    &tti->tti_buf,
                                     ted->ted_lr_off, th);
        if (rc)
                GOTO(out, rc);
@@ -269,9 +270,9 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
 
        th->th_sync = sync;
 
+       tti_buf_lsd(tti);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lr_server_data),
-                                    tti->tti_off, th);
+                                    &tti->tti_buf, tti->tti_off, th);
        if (rc)
                GOTO(out, rc);
 
@@ -1170,6 +1171,7 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
 {
        struct lu_target        *tgt = cookie;
        struct tgt_session_info *tsi;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
 
        /* if there is no session, then this transaction is not result of
@@ -1183,15 +1185,17 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        if (tsi->tsi_exp == NULL)
                return 0;
 
+       tti_buf_lcd(tti);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lsd_client_data),
+                                    &tti->tti_buf,
                                     tsi->tsi_exp->exp_target_data.ted_lr_off,
                                     th);
        if (rc)
                return rc;
 
+       tti_buf_lsd(tti);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lr_server_data), 0, th);
+                                    &tti->tti_buf, 0, th);
        if (rc)
                return rc;
 
index c121790..27f6e62 100644 (file)
@@ -43,7 +43,7 @@ nobase_noinst_SCRIPTS += acl/make-tree acl/run cfg/ncli.sh
 nobase_noinst_SCRIPTS += racer/dir_create.sh racer/file_create.sh racer/file_list.sh
 nobase_noinst_SCRIPTS += racer/file_rm.sh racer/racer.sh racer/file_concat.sh racer/file_exec.sh
 nobase_noinst_SCRIPTS += racer/file_link.sh racer/file_rename.sh racer/file_symlink.sh
-nobase_noinst_SCRIPTS += racer/dir_remote.sh
+nobase_noinst_SCRIPTS += racer/dir_remote.sh racer/dir_migrate.sh
 nobase_noinst_SCRIPTS += rmtacl/make-tree rmtacl/run
 nobase_noinst_SCRIPTS += posix/posix.cfg
 nobase_noinst_DATA = acl/cp.test acl/getfacl-noacl.test acl/inheritance.test
diff --git a/lustre/tests/racer/dir_migrate.sh b/lustre/tests/racer/dir_migrate.sh
new file mode 100755 (executable)
index 0000000..190da48
--- /dev/null
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+DIR=$1
+MAX=$2
+
+MDTCOUNT=${MDSCOUNT:-$(lfs df $DIR 2> /dev/null | grep -c MDT)}
+while /bin/true ; do
+       migrate_dir=$((RANDOM % MAX))
+       file=$((RANDOM % MAX))
+       mdt_idx=$((RANDOM % MDTCOUNT))
+       mkdir -p $DIR/$migrate_dir 2> /dev/null
+       lfs mv -M$mdt_idx $DIR/$migrate_dir 2> /dev/null
+       touch $DIR/$migrate_dir/$file 2> /dev/null
+       $LFS getdirstripe $DIR/$migrate_dir > /dev/null 2>&1
+done
index b2d28fb..77f9e58 100755 (executable)
@@ -8,8 +8,8 @@ while /bin/true ; do
        remote_dir=$((RANDOM % MAX))
        file=$((RANDOM % MAX))
        mdt_idx=$((RANDOM % MDTCOUNT))
-       mkdir -p $DIR
-       lfs mkdir -i$mdt_idx -c$MDTCOUNT $DIR/$remote_dir 2> /dev/null
-       echo "abcd" > $DIR/$remote_dir/$file 2> /dev/null
-       $LFS getdirstripe $DIR/$remote_dir 2> /dev/null
+       mkdir -p $DIR 2> /dev/null
+       $LFS mkdir -i$mdt_idx -c$MDTCOUNT $DIR/$remote_dir 2> /dev/null
+       touch $DIR/$remote_dir/$file 2> /dev/null
+       $LFS getdirstripe $DIR/$remote_dir > /dev/null 2>&1
 done
index 5fad750..6ba8b7c 100755 (executable)
@@ -16,7 +16,7 @@ RACER_PROGS="file_create dir_create file_rm file_rename file_link file_symlink \
 file_list file_concat file_exec"
 
 if [ $MDSCOUNT -gt 1 ]; then
-       RACER_PROGS="${RACER_PROGS} dir_remote"
+       RACER_PROGS="${RACER_PROGS} dir_remote dir_migrate"
 fi
 
 racer_cleanup()
index 4bc7301..33c7459 100755 (executable)
@@ -1773,6 +1773,30 @@ test_110f () {
 }
 run_test 110f "remove remote directory: drop slave rep"
 
+test_110g () {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+       local remote_dir=$DIR/$tdir/remote_dir
+       local MDTIDX=1
+
+       mkdir -p $remote_dir
+
+       createmany -o $remote_dir/f 5000
+
+       #define OBD_FAIL_MIGRATE_NET_REP        0x1702
+       do_facet mds$MDTIDX lctl set_param fail_loc=0x1702
+       $LFS mv -M $MDTIDX $remote_dir || error "migrate failed"
+       do_facet mds$MDTIDX lctl set_param fail_loc=0x0
+
+       for file in $(find $remote_dir); do
+               mdt_index=$($LFS getstripe -M $file)
+               [ $mdt_index == $MDTIDX ] ||
+                       error "$file is not on MDT${MDTIDX}"
+       done
+
+       rm -rf $DIR/$tdir || error "rmdir failed"
+}
+run_test 110g "drop reply during migration"
+
 # LU-2844 mdt prepare fail should not cause umount oops
 test_111 ()
 {
index 5f5e8b2..5efd974 100644 (file)
@@ -58,7 +58,7 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/${NAME}.sh}
 init_logging
 
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="24o 27m 64b 68 71 77f 78 115 124b"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="24o 27m 64b 68 71 77f 78 115 124b 230d"
 
 [ $(facet_fstype $SINGLEMDS) = "zfs" ] &&
 # bug number for skipped test:        LU-1593 LU-2610 LU-2833 LU-1957 LU-2805
@@ -650,14 +650,35 @@ test_17n() {
                        error "create files under remote dir failed $i"
        done
 
-       check_fs_consistency_17n || error "e2fsck report error"
+       check_fs_consistency_17n ||
+               error "e2fsck report error after create files under remote dir"
 
        for ((i=0;i<10;i++)); do
                rm -rf $DIR/$tdir/remote_dir_${i} ||
                        error "destroy remote dir error $i"
        done
 
-       check_fs_consistency_17n || error "e2fsck report error"
+       check_fs_consistency_17n ||
+               error "e2fsck report error after unlink files under remote dir"
+
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.4.50) ] &&
+               skip "lustre < 2.4.50 does not support migrate mv " && return
+
+       for ((i=0; i<10; i++)); do
+               mkdir -p $DIR/$tdir/remote_dir_${i}
+               createmany -o $DIR/$tdir/remote_dir_${i}/f 10 ||
+                       error "create files under remote dir failed $i"
+               $LFS mv -M 1 $DIR/$tdir/remote_dir_${i} ||
+                       error "migrate remote dir error $i"
+       done
+       check_fs_consistency_17n || error "e2fsck report error after migration"
+
+       for ((i=0;i<10;i++)); do
+               rm -rf $DIR/$tdir/remote_dir_${i} ||
+                       error "destroy remote dir error $i"
+       done
+
+       check_fs_consistency_17n || error "e2fsck report error after unlink"
 }
 run_test 17n "run e2fsck against master/slave MDT which contains remote dir"
 
@@ -11805,6 +11826,202 @@ test_230a() {
 }
 run_test 230a "Create remote directory and files under the remote directory"
 
+test_230b() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local MDTIDX=1
+       local mdt_index
+       local i
+       local file
+       local pid
+       local stripe_count
+       local migrate_dir=$DIR/$tdir/migrate_dir
+       local other_dir=$DIR/$tdir/other_dir
+
+       mkdir -p $migrate_dir
+       mkdir -p $other_dir
+       for ((i=0; i<10; i++)); do
+               mkdir -p $migrate_dir/dir_${i}
+               createmany -o $migrate_dir/dir_${i}/f 10 ||
+                       error "create files under remote dir failed $i"
+       done
+
+       cp /etc/passwd $migrate_dir/$tfile
+       cp /etc/passwd $other_dir/$tfile
+       mkdir -p $migrate_dir/dir_default_stripe2
+       $LFS setstripe -c 2 $migrate_dir/dir_default_stripe2
+       $LFS setstripe -c 2 $migrate_dir/${tfile}_stripe2
+
+       mkdir -p $other_dir
+       ln $migrate_dir/$tfile $other_dir/luna
+       ln $migrate_dir/$tfile $migrate_dir/sofia
+       ln $other_dir/$tfile $migrate_dir/david
+       ln -s $migrate_dir/$tfile $other_dir/zachary
+       ln -s $migrate_dir/$tfile $migrate_dir/${tfile}_ln
+       ln -s $other_dir/$tfile $migrate_dir/${tfile}_ln_other
+
+       $LFS mv -v -M $MDTIDX $migrate_dir ||
+                       error "migrate remote dir error"
+
+       echo "migratate to MDT1, then checking.."
+       for ((i=0; i<10; i++)); do
+               for file in $(find $migrate_dir/dir_${i}); do
+                       mdt_index=$($LFS getstripe -M $file)
+                       [ $mdt_index == $MDTIDX ] ||
+                               error "$file is not on MDT${MDTIDX}"
+               done
+       done
+
+       # the multiple link file should still in MDT0
+       mdt_index=$($LFS getstripe -M $migrate_dir/$tfile)
+       [ $mdt_index == 0 ] ||
+               error "$file is not on MDT${MDTIDX}"
+
+       diff /etc/passwd $migrate_dir/$tfile ||
+               error "$tfile different after migration"
+
+       diff /etc/passwd $other_dir/luna ||
+               error "luna different after migration"
+
+       diff /etc/passwd $migrate_dir/sofia ||
+               error "sofia different after migration"
+
+       diff /etc/passwd $migrate_dir/david ||
+               error "david different after migration"
+
+       diff /etc/passwd $other_dir/zachary ||
+               error "zachary different after migration"
+
+       diff /etc/passwd $migrate_dir/${tfile}_ln ||
+               error "${tfile}_ln different after migration"
+
+       diff /etc/passwd $migrate_dir/${tfile}_ln_other ||
+               error "${tfile}_ln_other different after migration"
+
+       stripe_count=$($LFS getstripe -c $migrate_dir/dir_default_stripe2)
+       [ $stripe_count = 2 ] ||
+                       error "dir strpe_count $d != 2 after migration."
+
+       stripe_count=$($LFS getstripe -c $migrate_dir/${tfile}_stripe2)
+       [ $stripe_count = 2 ] ||
+                       error "file strpe_count $d != 2 after migration."
+
+       #migrate back to MDT0
+       MDTIDX=0
+       $LFS mv -v -M $MDTIDX $migrate_dir ||
+                       error "migrate remote dir error"
+
+       echo "migrate back to MDT0, checking.."
+       for file in $(find $migrate_dir); do
+               mdt_index=$($LFS getstripe -M $file)
+               [ $mdt_index == $MDTIDX ] ||
+                       error "$file is not on MDT${MDTIDX}"
+       done
+
+       diff /etc/passwd ${migrate_dir}/$tfile ||
+               error "$tfile different after migration"
+
+       diff /etc/passwd ${other_dir}/luna ||
+               error "luna different after migration"
+
+       diff /etc/passwd ${migrate_dir}/sofia ||
+               error "sofia different after migration"
+
+       diff /etc/passwd ${other_dir}/zachary ||
+               error "zachary different after migration"
+
+       diff /etc/passwd $migrate_dir/${tfile}_ln ||
+               error "${tfile}_ln different after migration"
+
+       diff /etc/passwd $migrate_dir/${tfile}_ln_other ||
+               error "${tfile}_ln_other different after migration"
+
+       stripe_count=$($LFS getstripe -c ${migrate_dir}/dir_default_stripe2)
+       [ $stripe_count = 2 ] ||
+               error "dir strpe_count $d != 2 after migration."
+
+       stripe_count=$($LFS getstripe -c ${migrate_dir}/${tfile}_stripe2)
+       [ $stripe_count = 2 ] ||
+               error "file strpe_count $d != 2 after migration."
+
+       rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230b "migrate directory"
+
+test_230c() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local MDTIDX=1
+       local mdt_index
+       local file
+
+       #If migrating directory fails in the middle, all entries of
+       #the directory is still accessiable.
+       mkdir -p $DIR/$tdir
+       stat $DIR/$tdir
+       createmany -o $DIR/$tdir/f 10 ||
+               error "create files under ${tdir} failed"
+
+       #failed after migrating 5 entries
+       #OBD_FAIL_MIGRATE_ENTRIES       0x1801
+       do_facet mds1 lctl set_param fail_loc=0x20001801
+       do_facet mds1 lctl  set_param fail_val=5
+       local t=`ls $DIR/$tdir | wc -l`
+       $LFS mv -M $MDTIDX $DIR/$tdir &&
+               error "migrate should failed after 5 entries"
+       local u=`ls $DIR/$tdir | wc -l`
+       [ "$u" == "$t" ] || error "$u != $t during migration"
+
+       for file in $(find $DIR/$tdir); do
+               stat $file || error "stat $file failed"
+       done
+
+       do_facet mds1 lctl set_param fail_loc=0
+       do_facet mds1 lctl set_param fail_val=0
+
+       $LFS mv -M $MDTIDX $DIR/$tdir ||
+               error "migrate open files should failed with open files"
+
+       echo "Finish migration, then checking.."
+       for file in $(find $DIR/$tdir); do
+               mdt_index=$($LFS getstripe -M $file)
+               [ $mdt_index == $MDTIDX ] ||
+                       error "$file is not on MDT${MDTIDX}"
+       done
+
+       rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230c "check directory accessiblity if migration is failed"
+
+test_230d() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local MDTIDX=1
+       local mdt_index
+       local i
+       local j
+
+       mkdir -p $DIR/$tdir
+
+       for ((i=0; i<100; i++)); do
+               mkdir -p $DIR/$tdir/dir_${i}
+               createmany -o $DIR/$tdir/dir_${i}/f 100 ||
+                       error "create files under remote dir failed $i"
+       done
+
+       $LFS mv -M $MDTIDX -v $DIR/$tdir || error "migrate remote dir error"
+
+       echo "Finish migration, then checking.."
+       for file in $(find $DIR/$tdir); do
+               mdt_index=$($LFS getstripe -M $file)
+               [ $mdt_index == $MDTIDX ] ||
+                       error "$file is not on MDT${MDTIDX}"
+       done
+
+       rm -rf $DIR/$tdir || error "rm dir failed after migration"
+}
+run_test 230d "check migrate big directory"
+
 test_231a()
 {
        # For simplicity this test assumes that max_pages_per_rpc
index 09524f8..e37c56a 100644 (file)
@@ -2623,6 +2623,48 @@ test_76() { #LU-946
 }
 run_test 76 "Verify open file for 2048 files"
 
+test_80() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local MDTIDX=1
+       local mdt_index
+       local i
+       local file
+       local pid
+
+       mkdir -p $DIR1/$tdir/dir
+       createmany -o $DIR1/$tdir/dir/f 10 ||
+               error "create files under remote dir failed $i"
+
+       cp /etc/passwd $DIR1/$tdir/$tfile
+
+       #migrate open file should fails
+       multiop_bg_pause $DIR2/$tdir/$tfile O_c || error "open $file failed"
+       pid=$!
+       # give multiop a chance to open
+       sleep 1
+
+       $LFS mv -M $MDTIDX $DIR1/$tdir &&
+               error "migrate open files should failed with open files"
+
+       kill -USR1 $pid
+
+       $LFS mv -M $MDTIDX $DIR1/$tdir ||
+                       error "migrate remote dir error"
+
+       echo "Finish migration, then checking.."
+       for file in $(find $DIR1/$tdir); do
+               mdt_index=$($LFS getstripe -M $file)
+               [ $mdt_index == $MDTIDX ] ||
+                       error "$file is not on MDT${MDTIDX}"
+       done
+
+       diff /etc/passwd $DIR1/$tdir/$tfile ||
+               error "file different after migration"
+
+       rm -rf $DIR1/$tdir || error "rm dir failed after migration"
+}
+run_test 80 "migrate directory when some children is being opened"
+
 log "cleanup: ======================================================"
 
 [ "$(mount | grep $MOUNT2)" ] && umount $MOUNT2
index a51ba82..5af9305 100644 (file)
@@ -120,6 +120,7 @@ static int lfs_hsm_release(int argc, char **argv);
 static int lfs_hsm_remove(int argc, char **argv);
 static int lfs_hsm_cancel(int argc, char **argv);
 static int lfs_swap_layouts(int argc, char **argv);
+static int lfs_mv(int argc, char **argv);
 
 #define SETSTRIPE_USAGE(_cmd, _tgt) \
        "usage: "_cmd" [--stripe-count|-c <stripe_count>]\n"\
@@ -324,13 +325,17 @@ command_t cmdlist[] = {
         "usage: hsm_cancel [--filelist FILELIST] [--data DATA] <file> ..."},
        {"swap_layouts", lfs_swap_layouts, 0, "Swap layouts between 2 files.\n"
         "usage: swap_layouts <path1> <path2>"},
-       {"migrate", lfs_setstripe, 0, "migrate file from one layout to "
+       {"migrate", lfs_setstripe, 0, "migrate file from one OST layout to "
         "another (may be not safe with concurent writes).\n"
         SETSTRIPE_USAGE("migrate  ", "<filename>")},
-        {"help", Parser_help, 0, "help"},
-        {"exit", Parser_quit, 0, "quit"},
-        {"quit", Parser_quit, 0, "quit"},
-        { 0, 0, 0, NULL }
+       {"mv", lfs_mv, 0,
+        "To move directories between MDTs.\n"
+        "usage: mv <directory|filename> [--mdt-index|-M] <mdt_index> "
+        "[--verbose|-v]\n"},
+       {"help", Parser_help, 0, "help"},
+       {"exit", Parser_quit, 0, "quit"},
+       {"quit", Parser_quit, 0, "quit"},
+       { 0, 0, 0, NULL }
 };
 
 #define MIGRATION_BLOCKS 1
@@ -1596,6 +1601,58 @@ static int lfs_rmentry(int argc, char **argv)
        return result;
 }
 
+static int lfs_mv(int argc, char **argv)
+{
+       struct  find_param param = { .maxdepth = -1, .mdtindex = -1};
+       char   *end;
+       int     c;
+       int     rc = 0;
+       struct option long_opts[] = {
+               {"--mdt-index", required_argument, 0, 'M'},
+               {"verbose",     no_argument,       0, 'v'},
+               {0, 0, 0, 0}
+       };
+
+       while ((c = getopt_long(argc, argv, "M:v", long_opts, NULL)) != -1) {
+               switch (c) {
+               case 'M': {
+                       param.mdtindex = strtoul(optarg, &end, 0);
+                       if (*end != '\0') {
+                               fprintf(stderr, "%s: invalid MDT index'%s'\n",
+                                       argv[0], optarg);
+                               return CMD_HELP;
+                       }
+                       break;
+               }
+               case 'v': {
+                       param.verbose = VERBOSE_DETAIL;
+                       break;
+               }
+               default:
+                       fprintf(stderr, "error: %s: unrecognized option '%s'\n",
+                               argv[0], argv[optind - 1]);
+                       return CMD_HELP;
+               }
+       }
+
+       if (param.mdtindex == -1) {
+               fprintf(stderr, "%s MDT index must be indicated\n", argv[0]);
+               return CMD_HELP;
+       }
+
+       if (optind >= argc) {
+               fprintf(stderr, "%s missing operand path\n", argv[0]);
+               return CMD_HELP;
+       }
+
+       param.migrate = 1;
+       rc = llapi_mv(argv[optind], &param);
+       if (rc != 0)
+               fprintf(stderr, "cannot migrate '%s' to MDT%04x: %s\n",
+                       argv[optind], param.mdtindex, strerror(-rc));
+       return rc;
+}
+
 static int lfs_osts(int argc, char **argv)
 {
         return lfs_tgts(argc, argv);
index 64d9595..ad1a993 100644 (file)
@@ -1423,7 +1423,7 @@ err:
         return rc;
 }
 
-typedef int (semantic_func_t)(char *path, DIR *parent, DIR *d,
+typedef int (semantic_func_t)(char *path, DIR *parent, DIR **d,
                              void *data, struct dirent64 *de);
 
 #define OBD_NOT_FOUND           (-1)
@@ -1456,7 +1456,8 @@ static int common_param_init(struct find_param *param, char *path)
        param->got_uuids = 0;
        param->obdindexes = NULL;
        param->obdindex = OBD_NOT_FOUND;
-       param->mdtindex = OBD_NOT_FOUND;
+       if (!param->migrate)
+               param->mdtindex = OBD_NOT_FOUND;
        return 0;
 }
 
@@ -1472,7 +1473,7 @@ static void find_param_fini(struct find_param *param)
                free(param->fp_lmv_md);
 }
 
-static int cb_common_fini(char *path, DIR *parent, DIR *d, void *data,
+static int cb_common_fini(char *path, DIR *parent, DIR **dirp, void *data,
                          struct dirent64 *de)
 {
         struct find_param *param = (struct find_param *)data;
@@ -1593,8 +1594,8 @@ static int llapi_semantic_traverse(char *path, int size, DIR *parent,
                }
         }
 
-        if (sem_init && (ret = sem_init(path, parent ?: p, d, data, de)))
-                goto err;
+       if (sem_init && (ret = sem_init(path, parent ?: p, &d, data, de)))
+               goto err;
 
        if (!d || (param->get_lmv && !param->recursive)) {
                ret = 0;
@@ -1661,8 +1662,8 @@ static int llapi_semantic_traverse(char *path, int size, DIR *parent,
 out:
         path[len] = 0;
 
-        if (sem_fini)
-                sem_fini(path, parent, d, data, de);
+       if (sem_fini)
+               sem_fini(path, parent, &d, data, de);
 err:
         if (d)
                 closedir(d);
@@ -2784,10 +2785,11 @@ static int print_failed_tgt(struct find_param *param, char *path, int type)
         return ret;
 }
 
-static int cb_find_init(char *path, DIR *parent, DIR *dir,
+static int cb_find_init(char *path, DIR *parent, DIR **dirp,
                        void *data, struct dirent64 *de)
 {
         struct find_param *param = (struct find_param *)data;
+       DIR *dir = dirp == NULL ? NULL : *dirp;
         int decision = 1; /* 1 is accepted; -1 is rejected. */
         lstat_t *st = &param->lmd->lmd_st;
         int lustre_fs = 1;
@@ -3101,6 +3103,82 @@ decided:
         return 0;
 }
 
+static int cb_mv_init(char *path, DIR *parent, DIR **dirp,
+                     void *param_data, struct dirent64 *de)
+{
+       struct find_param       *param = (struct find_param *)param_data;
+       DIR                     *dir = parent;
+       char                    raw[OBD_MAX_IOCTL_BUFFER] = {'\0'};
+       char                    *rawbuf = raw;
+       struct obd_ioctl_data   data = { 0 };
+       int                     fd;
+       int                     ret;
+       char                    *filename;
+
+       LASSERT(parent != NULL || dirp != NULL);
+       if (dirp != NULL)
+               closedir(*dirp);
+
+       if (parent == NULL) {
+               dir = opendir_parent(path);
+               if (dir == NULL) {
+                       ret = -errno;
+                       fprintf(stderr, "can not open %s ret %d\n",
+                               path, ret);
+                       return ret;
+               }
+       }
+
+       fd = dirfd(dir);
+
+       filename = basename(path);
+       data.ioc_inlbuf1 = (char *)filename;
+       data.ioc_inllen1 = strlen(filename) + 1;
+       data.ioc_inlbuf2 = (char *)&param->mdtindex;
+       data.ioc_inllen2 = sizeof(param->mdtindex);
+       ret = obd_ioctl_pack(&data, &rawbuf, sizeof(raw));
+       if (ret != 0) {
+               llapi_error(LLAPI_MSG_ERROR, ret,
+                           "llapi_obd_statfs: error packing ioctl data");
+               goto out;
+       }
+
+       ret = ioctl(fd, LL_IOC_MIGRATE, rawbuf);
+       if (ret != 0) {
+               ret = -errno;
+               fprintf(stderr, "%s migrate failed %d\n", path, ret);
+               goto out;
+       } else if (param->verbose & VERBOSE_DETAIL) {
+               fprintf(stdout, "migrate %s to MDT%d\n", path, param->mdtindex);
+       }
+
+out:
+       if (d