Whamcloud - gitweb
LU-3529 lod: create striped directory 96/7196/45
authorwang di <di.wang@intel.com>
Wed, 31 Jul 2013 07:00:40 +0000 (00:00 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 22 Feb 2014 06:30:42 +0000 (06:30 +0000)
1. Add "lfs setdirstripe -i -c" to create a striped
directory.

2. The client sends the create request to the master MDT, which
will allocate FIDs for all of the slaves and create them.

3. The client needs to revalidate the slaves during intent getattr
and open requests.

4. lmv_stripe_md will include attributes (size, nlink, etc.)
from all of the stripes, which will be protected by the UPDATE lock.
The client needs to merge these attributes when updating the inode.

5. Send the create request to the MDT where the file is located,
which can help create the master stripe of the striped directory.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I7ac560e39dcb415e310dc5e6ade531d76227ffae
Reviewed-on: http://review.whamcloud.com/7196
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
55 files changed:
lustre/include/cl_object.h
lustre/include/dt_object.h
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre/lustre_user.h
lustre/include/lustre/lustreapi.h
lustre/include/lustre_lib.h
lustre/include/lustre_lmv.h
lustre/include/lustre_update.h
lustre/include/md_object.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/llite_nfs.c
lustre/llite/namei.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_internal.h
lustre/lmv/lmv_obd.c
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/mdc/mdc_locks.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdd/mdd_trans.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/obdclass/lprocfs_status.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_handler.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_precreate.c
lustre/osp/osp_sync.c
lustre/osp/osp_trans.c
lustre/ptlrpc/pack_generic.c
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/tests/recovery-small.sh
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/tests/sanity.sh
lustre/tests/sanityn.sh
lustre/utils/lfs.c
lustre/utils/liblustreapi.c

index 7dfb8d9..bcedc91 100644 (file)
@@ -193,6 +193,9 @@ struct cl_attr {
          * Group identifier for quota purposes.
          */
         gid_t  cat_gid;
+
+       /* nlink of the directory */
+       __u64  cat_nlink;
 };
 
 /**
index 6a09b34..2b21697 100644 (file)
@@ -49,7 +49,7 @@
  * @{
  */
 
-
+#include <obd_support.h>
 /*
  * super-class definitions.
  */
@@ -130,11 +130,11 @@ struct dt_device_operations {
          */
         int   (*dt_trans_start)(const struct lu_env *env,
                                 struct dt_device *dev, struct thandle *th);
-        /**
-         * Finish previously started transaction.
-         */
-        int   (*dt_trans_stop)(const struct lu_env *env,
-                               struct thandle *th);
+       /**
+        * Finish previously started transaction.
+        */
+       int   (*dt_trans_stop)(const struct lu_env *env, struct dt_device *dev,
+                              struct thandle *th);
         /**
          * Add commit callback to the transaction.
          */
@@ -229,8 +229,10 @@ extern const struct dt_index_features dt_quota_slv_features;
  * It can contain any allocation hint in the future.
  */
 struct dt_allocation_hint {
-        struct dt_object           *dah_parent;
-        __u32                       dah_mode;
+       struct dt_object        *dah_parent;
+       const void              *dah_eadata;
+       int                     dah_eadata_len;
+       __u32                   dah_mode;
 };
 
 /**
@@ -742,6 +744,19 @@ static inline struct dt_object *lu2dt_obj(struct lu_object *o)
        return container_of0(o, struct dt_object, do_lu);
 }
 
+struct thandle_update {
+       /* In DNE, one transaction can be disassembled into
+        * updates on several different MDTs, and these updates
+        * will be attached to tu_remote_update_list per target.
+        * Only single thread will access the list, no need lock
+        */
+       struct list_head        tu_remote_update_list;
+
+       /* sent after or before local transaction */
+       unsigned int            tu_sent_after_local_trans:1,
+                               tu_only_remote_trans:1;
+};
+
 /**
  * This is the general purpose transaction handle.
  * 1. Transaction Life Cycle
@@ -760,6 +775,10 @@ struct thandle {
        /** the dt device on which the transactions are executed */
        struct dt_device *th_dev;
 
+       atomic_t        th_refc;
+       /* the size of transaction */
+       int             th_alloc_size;
+
        /** context for this transaction, tag is LCT_TX_HANDLE */
        struct lu_context th_ctx;
 
@@ -776,15 +795,22 @@ struct thandle {
        /* local transation, no need to inform other layers */
        unsigned int            th_local:1;
 
-       /* In DNE, one transaction can be disassemblied into
-        * updates on several different MDTs, and these updates
-        * will be attached to th_remote_update_list per target.
-        * Only single thread will access the list, no need lock
-        */
-       cfs_list_t              th_remote_update_list;
-       struct update_request   *th_current_request;
+       struct thandle_update   *th_update;
 };
 
+static inline void thandle_get(struct thandle *thandle)
+{
+       atomic_inc(&thandle->th_refc);
+}
+
+static inline void thandle_put(struct thandle *thandle)
+{
+       if (atomic_dec_and_test(&thandle->th_refc)) {
+               if (thandle->th_update != NULL)
+                       OBD_FREE_PTR(thandle->th_update);
+               OBD_FREE(thandle, thandle->th_alloc_size);
+       }
+}
 /**
  * Transaction call-backs.
  *
@@ -978,8 +1004,8 @@ static inline int dt_trans_start_local(const struct lu_env *env,
 static inline int dt_trans_stop(const struct lu_env *env,
                                 struct dt_device *d, struct thandle *th)
 {
-        LASSERT(d->dd_ops->dt_trans_stop);
-        return d->dd_ops->dt_trans_stop(env, th);
+       LASSERT(d->dd_ops->dt_trans_stop);
+       return d->dd_ops->dt_trans_stop(env, d, th);
 }
 
 static inline int dt_trans_cb_add(struct thandle *th,
index a8d6984..f2ab070 100644 (file)
@@ -339,7 +339,7 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
 
 /* target/out_lib.c */
 struct update_request *
-out_find_update(struct thandle *th, struct dt_device *dt_dev);
+out_find_update(struct thandle_update *tu, struct dt_device *dt_dev);
 void out_destroy_update_req(struct update_request *update);
 struct update_request *out_create_update_req(struct dt_device *dt);
 struct update_request *out_find_create_update_loc(struct thandle *th,
index 4bb3e7c..0bee2f3 100644 (file)
@@ -1688,6 +1688,7 @@ static inline void lmm_oi_cpu_to_le(struct ost_id *dst_oi,
 #define XATTR_NAME_LOV          "trusted.lov"
 #define XATTR_NAME_LMA          "trusted.lma"
 #define XATTR_NAME_LMV          "trusted.lmv"
+#define XATTR_NAME_DEFALT_LMV  "trusted.dmv"
 #define XATTR_NAME_LINK         "trusted.link"
 #define XATTR_NAME_FID          "trusted.fid"
 #define XATTR_NAME_VERSION      "trusted.version"
@@ -2663,7 +2664,7 @@ struct lmv_desc {
         __u32 ld_tgt_count;                /* how many MDS's */
         __u32 ld_active_tgt_count;         /* how many active */
         __u32 ld_default_stripe_count;     /* how many objects are used */
-        __u32 ld_pattern;                  /* default MEA_MAGIC_* */
+       __u32 ld_pattern;                  /* default hash pattern */
         __u64 ld_default_hash_size;
         __u64 ld_padding_1;                /* also fix lustre_swab_lmv_desc */
         __u32 ld_padding_2;                /* also fix lustre_swab_lmv_desc */
@@ -2679,6 +2680,43 @@ extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 #define LMV_MAGIC_V1   0x0CD10CD0    /* normal stripe lmv magic */
 #define LMV_USER_MAGIC 0x0CD20CD0    /* default lmv magic*/
 #define LMV_MAGIC      LMV_MAGIC_V1
+
+enum lmv_hash_type {
+       LMV_HASH_TYPE_ALL_CHARS = 1,
+       LMV_HASH_TYPE_FNV_1A_64 = 2,
+};
+
+#define LMV_HASH_NAME_ALL_CHARS        "all_char"
+#define LMV_HASH_NAME_FNV_1A_64        "fnv_1a_64"
+
+/**
+ * The FNV-1a hash algorithm is as follows:
+ *     hash = FNV_offset_basis
+ *     for each octet_of_data to be hashed
+ *             hash = hash XOR octet_of_data
+ *             hash = hash × FNV_prime
+ *     return hash
+ * http://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
+ *
+ * http://www.isthe.com/chongo/tech/comp/fnv/index.html#FNV-reference-source
+ * FNV_prime is 2^40 + 2^8 + 0xb3 = 0x100000001b3ULL
+ **/
+#define LUSTRE_FNV_1A_64_PRIME 0x100000001b3ULL
+#define LUSTRE_FNV_1A_64_OFFSET_BIAS 0xcbf29ce484222325ULL
+static inline __u64 lustre_hash_fnv_1a_64(const void *buf, size_t size)
+{
+       __u64 hash = LUSTRE_FNV_1A_64_OFFSET_BIAS;
+       const unsigned char *p = buf;
+       size_t i;
+
+       for (i = 0; i < size; i++) {
+               hash ^= p[i];
+               hash *= LUSTRE_FNV_1A_64_PRIME;
+       }
+
+       return hash;
+}
+
 struct lmv_mds_md_v1 {
        __u32 lmv_magic;
        __u32 lmv_stripe_count;         /* stripe count */
index e9b205b..3a02c4e 100644 (file)
@@ -401,19 +401,15 @@ struct lov_user_mds_data_v3 {
 } __attribute__((packed));
 #endif
 
-/* keep this to be the same size as lov_user_ost_data_v1 */
 struct lmv_user_mds_data {
        struct lu_fid   lum_fid;
        __u32           lum_padding;
        __u32           lum_mds;
 };
 
-/* lum_type */
-enum {
-       LMV_STRIPE_TYPE = 0,
-       LMV_DEFAULT_TYPE = 1,
-};
-
+/* Derived the same way as LOV_MAX_STRIPE_COUNT (see above):
+ * (max buffer size - lmv+rpc header) / sizeof(struct lmv_user_mds_data) */
+#define LMV_MAX_STRIPE_COUNT 2000  /* ((12 * 4096 - 256) / 24) */
 #define lmv_user_md lmv_user_md_v1
 struct lmv_user_md_v1 {
        __u32   lum_magic;       /* must be the first field */
@@ -426,7 +422,7 @@ struct lmv_user_md_v1 {
        __u32   lum_padding3;
        char    lum_pool_name[LOV_MAXPOOLNAME];
        struct  lmv_user_mds_data  lum_objects[0];
-};
+} __attribute__((packed));
 
 static inline int lmv_user_md_size(int stripes, int lmm_magic)
 {
index f4e92c2..9426014 100644 (file)
@@ -216,7 +216,7 @@ extern int llapi_find(char *path, struct find_param *param);
 extern int llapi_file_fget_mdtidx(int fd, int *mdtidx);
 extern int llapi_dir_create_pool(const char *name, int flags, int stripe_offset,
                                 int stripe_count, int stripe_pattern,
-                                char *poolname);
+                                const char *poolname);
 int llapi_direntry_remove(char *dname);
 extern int llapi_obd_statfs(char *path, __u32 type, __u32 index,
                      struct obd_statfs *stat_buf,
index 5a86378..2ea9e15 100644 (file)
@@ -574,6 +574,8 @@ static inline void obd_ioctl_freedata(char *buf, int len)
 #define LOVEA_DELETE_VALUES(size, count, offset) (size == 0 && count == 0 && \
                                                  offset == (typeof(offset))(-1))
 
+#define LMVEA_DELETE_VALUES(count, offset) ((count) == 0 && \
+                                           (offset) == (typeof(offset))(-1))
 /* #define POISON_BULK 0 */
 
 /*
index a85c6d7..ce0bfa6 100644 (file)
@@ -60,4 +60,60 @@ int lmv_alloc_md(union lmv_mds_md **lmmp, int stripe_count);
 void lmv_free_md(union lmv_mds_md *lmm);
 int lmv_alloc_memmd(struct lmv_stripe_md **lsmp, int stripe_count);
 void lmv_free_memmd(struct lmv_stripe_md *lsm);
+
+static inline void lmv1_cpu_to_le(struct lmv_mds_md_v1 *lmv_dst,
+                                 const struct lmv_mds_md_v1 *lmv_src)
+{
+       int i;
+
+       lmv_dst->lmv_magic = cpu_to_le32(lmv_src->lmv_magic);
+       lmv_dst->lmv_stripe_count = cpu_to_le32(lmv_src->lmv_stripe_count);
+       lmv_dst->lmv_master_mdt_index =
+                       cpu_to_le32(lmv_src->lmv_master_mdt_index);
+       lmv_dst->lmv_hash_type = cpu_to_le32(lmv_src->lmv_hash_type);
+       lmv_dst->lmv_layout_version = cpu_to_le32(lmv_src->lmv_layout_version);
+       for (i = 0; i < lmv_src->lmv_stripe_count; i++)
+               fid_cpu_to_le(&lmv_dst->lmv_stripe_fids[i],
+                             &lmv_src->lmv_stripe_fids[i]);
+}
+
+static inline void lmv1_le_to_cpu(struct lmv_mds_md_v1 *lmv_dst,
+                                 const struct lmv_mds_md_v1 *lmv_src)
+{
+       int i;
+
+       lmv_dst->lmv_magic = le32_to_cpu(lmv_src->lmv_magic);
+       lmv_dst->lmv_stripe_count = le32_to_cpu(lmv_src->lmv_stripe_count);
+       lmv_dst->lmv_master_mdt_index =
+                               le32_to_cpu(lmv_src->lmv_master_mdt_index);
+       lmv_dst->lmv_hash_type = le32_to_cpu(lmv_src->lmv_hash_type);
+       lmv_dst->lmv_layout_version = le32_to_cpu(lmv_src->lmv_layout_version);
+       for (i = 0; i < lmv_src->lmv_stripe_count; i++)
+               fid_le_to_cpu(&lmv_dst->lmv_stripe_fids[i],
+                             &lmv_src->lmv_stripe_fids[i]);
+}
+
+static inline void lmv_cpu_to_le(union lmv_mds_md *lmv_dst,
+                                const union lmv_mds_md *lmv_src)
+{
+       switch (lmv_src->lmv_magic) {
+       case LMV_MAGIC_V1:
+               lmv1_cpu_to_le(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
+               break;
+       default:
+               break;
+       }
+}
+
+static inline void lmv_le_to_cpu(union lmv_mds_md *lmv_dst,
+                                const union lmv_mds_md *lmv_src)
+{
+       switch (le32_to_cpu(lmv_src->lmv_magic)) {
+       case LMV_MAGIC_V1:
+               lmv1_le_to_cpu(&lmv_dst->lmv_md_v1, &lmv_src->lmv_md_v1);
+               break;
+       default:
+               break;
+       }
+}
 #endif
index e23c66e..f507bbc 100644 (file)
@@ -63,11 +63,13 @@ static inline void *update_param_buf(struct update *update, int index,
        if (index >= UPDATE_BUF_COUNT)
                return NULL;
 
-       ptr = (char *)update + cfs_size_round(offsetof(struct update,
-                                                      u_bufs[0]));
-       for (i = 0; i < index; i++) {
-               LASSERT(update->u_lens[i] > 0);
-               ptr += cfs_size_round(update->u_lens[i]);
+       if (unlikely(update->u_lens[index] == 0)) {
+               ptr = NULL;
+       } else {
+               ptr = (char *)update +
+                     cfs_size_round(offsetof(struct update, u_bufs[0]));
+               for (i = 0; i < index; i++)
+                       ptr += cfs_size_round(update->u_lens[i]);
        }
 
        if (size != NULL)
index 5f3f8f6..5839517 100644 (file)
@@ -136,7 +136,7 @@ struct md_attr {
         struct lu_fid           ma_pfid;
         struct md_hsm           ma_hsm;
         struct lov_mds_md      *ma_lmm;
-        struct lmv_stripe_md   *ma_lmv;
+       union lmv_mds_md       *ma_lmv;
         void                   *ma_acl;
         struct llog_cookie     *ma_cookie;
         struct lustre_capa     *ma_capa;
index c18052b..ac80412 100644 (file)
@@ -1201,14 +1201,6 @@ enum {
 };
 
 /* lmv structures */
-#define MEA_MAGIC_LAST_CHAR      0xb2221ca1
-#define MEA_MAGIC_ALL_CHARS      0xb222a11c
-#define MEA_MAGIC_HASH_SEGMENT   0xb222a11b
-
-#define MAX_HASH_SIZE_32         0x7fffffffUL
-#define MAX_HASH_SIZE            0x7fffffffffffffffULL
-#define MAX_HASH_HIGHEST_BIT     0x1000000000000000ULL
-
 struct lustre_md {
        struct mdt_body         *body;
        struct lov_stripe_md    *lsm;
@@ -1231,6 +1223,7 @@ struct md_open_data {
 };
 
 struct lookup_intent;
+struct cl_attr;
 
 struct md_ops {
        /* Every operation from MD_STATS_FIRST_OP up to and including
@@ -1333,6 +1326,13 @@ struct md_ops {
 
        int (*m_free_lustre_md)(struct obd_export *, struct lustre_md *);
 
+       int (*m_merge_attr)(struct obd_export *,
+                           const struct lmv_stripe_md *lsm,
+                           struct cl_attr *attr);
+
+       int (*m_update_lsm_md)(struct obd_export *, struct lmv_stripe_md *lsm,
+                              struct mdt_body *, ldlm_blocking_callback);
+
        int (*m_set_open_replay_data)(struct obd_export *,
                                      struct obd_client_handle *,
                                      struct lookup_intent *);
index da400c2..7a004ea 100644 (file)
@@ -2062,6 +2062,27 @@ static inline int md_free_lustre_md(struct obd_export *exp,
         RETURN(MDP(exp->exp_obd, free_lustre_md)(exp, md));
 }
 
+static inline int md_update_lsm_md(struct obd_export *exp,
+                                  struct lmv_stripe_md *lsm,
+                                  struct mdt_body *body,
+                                  ldlm_blocking_callback cb)
+{
+       ENTRY;
+       EXP_CHECK_MD_OP(exp, update_lsm_md);
+       EXP_MD_COUNTER_INCREMENT(exp, update_lsm_md);
+       RETURN(MDP(exp->exp_obd, update_lsm_md)(exp, lsm, body, cb));
+}
+
+static inline int md_merge_attr(struct obd_export *exp,
+                               const struct lmv_stripe_md *lsm,
+                               struct cl_attr *attr)
+{
+       ENTRY;
+       EXP_CHECK_MD_OP(exp, merge_attr);
+       EXP_MD_COUNTER_INCREMENT(exp, merge_attr);
+       RETURN(MDP(exp->exp_obd, merge_attr)(exp, lsm, attr));
+}
+
 static inline int md_setxattr(struct obd_export *exp,
                               const struct lu_fid *fid, struct obd_capa *oc,
                               obd_valid valid, const char *name,
@@ -2293,10 +2314,6 @@ int class_check_uuid(struct obd_uuid *uuid, __u64 nid);
 void class_init_uuidlist(void);
 void class_exit_uuidlist(void);
 
-/* mea.c */
-int mea_name2idx(struct lmv_stripe_md *mea, const char *name, int namelen);
-int raw_name2idx(int hashtype, int count, const char *name, int namelen);
-
 /* prng.c */
 #define ll_generate_random_uuid(uuid_out) cfs_get_random_bytes(uuid_out, sizeof(class_uuid_t))
 
index dc5d623..b97edda 100644 (file)
@@ -361,17 +361,37 @@ int ll_send_mgc_param(struct obd_export *mgc, char *string)
         return rc;
 }
 
-int ll_dir_setdirstripe(struct inode *dir, struct lmv_user_md *lump,
-                       char *filename)
+static int ll_dir_setdirstripe(struct inode *dir, struct lmv_user_md *lump,
+                              const char *filename)
 {
        struct ptlrpc_request *request = NULL;
        struct md_op_data *op_data;
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        int mode;
        int err;
-
        ENTRY;
 
+       if (unlikely(lump->lum_magic != LMV_USER_MAGIC))
+               RETURN(-EINVAL);
+
+       if (lump->lum_stripe_offset == (__u32)-1) {
+               int mdtidx;
+
+               mdtidx = ll_get_mdt_idx(dir);
+               if (mdtidx < 0)
+                       RETURN(mdtidx);
+
+               lump->lum_stripe_offset = mdtidx;
+       }
+
+       CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p) name %s"
+              "stripe_offset %d, stripe_count: %u\n",
+              PFID(ll_inode2fid(dir)), dir, filename,
+              (int)lump->lum_stripe_offset, lump->lum_stripe_count);
+
+       if (lump->lum_magic != cpu_to_le32(LMV_USER_MAGIC))
+               lustre_swab_lmv_user_md(lump);
+
        mode = (0755 & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
        op_data = ll_prep_md_op_data(NULL, dir, NULL, filename,
                                     strlen(filename), mode, LUSTRE_OPC_MKDIR,
@@ -440,9 +460,6 @@ int ll_dir_setstripe(struct inode *inode, struct lov_user_md *lump,
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-       if (lump != NULL && lump->lmm_magic == cpu_to_le32(LMV_USER_MAGIC))
-               op_data->op_cli_flags |= CLI_SET_MEA;
-
         /* swabbing is done in lov_setstripe() on server side */
         rc = md_setattr(sbi->ll_md_exp, op_data, lump, lum_size,
                         NULL, 0, &req, NULL);
@@ -1118,7 +1135,6 @@ lmv_out_free:
                        GOTO(free_lmv, rc = -ENOMEM);
 
                memcpy(tmp, &lum, sizeof(lum));
-               tmp->lum_type = LMV_STRIPE_TYPE;
                tmp->lum_stripe_count = 1;
                mdtindex = ll_get_mdt_idx(inode);
                if (mdtindex < 0)
index a9f1314..109d261 100644 (file)
@@ -3188,6 +3188,27 @@ out:
         return rc;
 }
 
+static int ll_merge_md_attr(struct inode *inode)
+{
+       struct cl_attr attr = { 0 };
+       int rc;
+
+       LASSERT(ll_i2info(inode)->lli_lsm_md != NULL);
+       rc = md_merge_attr(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+                          &attr);
+       if (rc != 0)
+               RETURN(rc);
+
+       ll_i2info(inode)->lli_stripe_dir_size = attr.cat_size;
+       ll_i2info(inode)->lli_stripe_dir_nlink = attr.cat_nlink;
+
+       ll_i2info(inode)->lli_lvb.lvb_atime = attr.cat_atime;
+       ll_i2info(inode)->lli_lvb.lvb_mtime = attr.cat_mtime;
+       ll_i2info(inode)->lli_lvb.lvb_ctime = attr.cat_ctime;
+
+       RETURN(0);
+}
+
 int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                           __u64 ibits)
 {
@@ -3201,6 +3222,13 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
 
        /* if object isn't regular file, don't validate size */
        if (!S_ISREG(inode->i_mode)) {
+               if (S_ISDIR(inode->i_mode) &&
+                   ll_i2info(inode)->lli_lsm_md != NULL) {
+                       rc = ll_merge_md_attr(inode);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+
                LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
                LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
                LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
@@ -3233,23 +3261,29 @@ int ll_getattr_it(struct vfsmount *mnt, struct dentry *de,
         if (res)
                 return res;
 
-        stat->dev = inode->i_sb->s_dev;
-        if (ll_need_32bit_api(sbi))
-                stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
-        else
-                stat->ino = inode->i_ino;
-        stat->mode = inode->i_mode;
-        stat->nlink = inode->i_nlink;
-        stat->uid = inode->i_uid;
-        stat->gid = inode->i_gid;
+       stat->dev = inode->i_sb->s_dev;
+       if (ll_need_32bit_api(sbi))
+               stat->ino = cl_fid_build_ino(&lli->lli_fid, 1);
+       else
+               stat->ino = inode->i_ino;
+       stat->mode = inode->i_mode;
+       stat->uid = inode->i_uid;
+       stat->gid = inode->i_gid;
        stat->rdev = inode->i_rdev;
-        stat->atime = inode->i_atime;
-        stat->mtime = inode->i_mtime;
-        stat->ctime = inode->i_ctime;
+       stat->atime = inode->i_atime;
+       stat->mtime = inode->i_mtime;
+       stat->ctime = inode->i_ctime;
        stat->blksize = 1 << inode->i_blkbits;
+       stat->blocks = inode->i_blocks;
 
-        stat->size = i_size_read(inode);
-        stat->blocks = inode->i_blocks;
+       if (S_ISDIR(inode->i_mode) &&
+               ll_i2info(inode)->lli_lsm_md != NULL) {
+               stat->nlink = lli->lli_stripe_dir_nlink;
+               stat->size = lli->lli_stripe_dir_size;
+       } else {
+               stat->nlink = inode->i_nlink;
+               stat->size = i_size_read(inode);
+       }
 
         return 0;
 }
index 92b278e..c4e5f39 100644 (file)
@@ -44,6 +44,7 @@
 /* for struct cl_lock_descr and struct cl_io */
 #include <cl_object.h>
 #include <lclient.h>
+#include <lustre_lmv.h>
 #include <lustre_mdc.h>
 #include <linux/lustre_intent.h>
 #include <linux/compat.h>
@@ -199,7 +200,11 @@ struct ll_inode_info {
                         * -- I am the owner of dir statahead. */
                        pid_t                           d_opendir_pid;
                        /* directory stripe information */
-                       struct lmv_stripe_md            *d_lmv_md;
+                       struct lmv_stripe_md            *d_lsm_md;
+                       /* striped directory size */
+                       loff_t                          d_stripe_size;
+                       /* striped directory nlink */
+                       __u64                           d_stripe_nlink;
                } d;
 
 #define lli_readdir_mutex       u.d.d_readdir_mutex
@@ -208,7 +213,9 @@ struct ll_inode_info {
 #define lli_def_acl             u.d.d_def_acl
 #define lli_sa_lock             u.d.d_sa_lock
 #define lli_opendir_pid         u.d.d_opendir_pid
-#define lli_lmv_md             u.d.d_lmv_md
+#define lli_lsm_md             u.d.d_lsm_md
+#define lli_stripe_dir_size    u.d.d_stripe_size
+#define lli_stripe_dir_nlink   u.d.d_stripe_nlink
 
                /* for non-directory */
                struct {
@@ -736,6 +743,7 @@ int ll_objects_destroy(struct ptlrpc_request *request,
                        struct inode *dir);
 struct inode *ll_iget(struct super_block *sb, ino_t hash,
                       struct lustre_md *lic);
+int ll_test_inode_by_fid(struct inode *inode, void *opaque);
 int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                        void *data, int flag);
 #ifndef HAVE_IOP_ATOMIC_OPEN
index df421d6..49eaea8 100644 (file)
@@ -1214,6 +1214,185 @@ struct inode *ll_inode_from_lock(struct ldlm_lock *lock)
         return inode;
 }
 
+static void ll_dir_clear_lsm_md(struct inode *inode)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+
+       LASSERT(S_ISDIR(inode->i_mode));
+
+       if (lli->lli_lsm_md != NULL) {
+               lmv_free_memmd(lli->lli_lsm_md);
+               lli->lli_lsm_md = NULL;
+       }
+}
+
+static struct inode *ll_iget_anon_dir(struct super_block *sb,
+                                     const struct lu_fid *fid,
+                                     struct lustre_md *md)
+{
+       struct ll_sb_info       *sbi = ll_s2sbi(sb);
+       struct mdt_body         *body = md->body;
+       struct inode            *inode;
+       ino_t                   ino;
+       ENTRY;
+
+       ino = cl_fid_build_ino(fid, sbi->ll_flags & LL_SBI_32BIT_API);
+       inode = iget_locked(sb, ino);
+       if (inode == NULL) {
+               CERROR("%s: failed get simple inode "DFID": rc = -ENOENT\n",
+                      ll_get_fsname(sb, NULL, 0), PFID(fid));
+               RETURN(ERR_PTR(-ENOENT));
+       }
+
+       if (inode->i_state & I_NEW) {
+               struct ll_inode_info *lli = ll_i2info(inode);
+               struct lmv_stripe_md *lsm = md->lmv;
+
+               inode->i_mode = (inode->i_mode & ~S_IFMT) |
+                               (body->mode & S_IFMT);
+               LASSERTF(S_ISDIR(inode->i_mode), "Not slave inode "DFID"\n",
+                        PFID(fid));
+
+               LTIME_S(inode->i_mtime) = 0;
+               LTIME_S(inode->i_atime) = 0;
+               LTIME_S(inode->i_ctime) = 0;
+               inode->i_rdev = 0;
+
+               /* initializing backing dev info. */
+               inode->i_mapping->backing_dev_info =
+                                               &s2lsi(inode->i_sb)->lsi_bdi;
+               inode->i_op = &ll_dir_inode_operations;
+               inode->i_fop = &ll_dir_operations;
+               lli->lli_fid = *fid;
+               ll_lli_init(lli);
+
+               LASSERT(lsm != NULL);
+               /* master stripe FID */
+               lli->lli_pfid = lsm->lsm_md_oinfo[0].lmo_fid;
+               CDEBUG(D_INODE, "lli %p master "DFID" slave "DFID"\n",
+                      lli, PFID(fid), PFID(&lli->lli_pfid));
+               unlock_new_inode(inode);
+       }
+
+       RETURN(inode);
+}
+
+static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
+{
+       struct lu_fid *fid;
+       struct lmv_stripe_md *lsm = md->lmv;
+       int i;
+
+       LASSERT(lsm != NULL);
+       /* XXX sigh, this lsm_root initialization should be in
+        * LMV layer, but it needs ll_iget right now, so we
+        * put this here right now. */
+       for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+               fid = &lsm->lsm_md_oinfo[i].lmo_fid;
+               LASSERT(lsm->lsm_md_oinfo[i].lmo_root == NULL);
+               if (i == 0) {
+                       lsm->lsm_md_oinfo[i].lmo_root = inode;
+               } else {
+                       /* Unfortunately ll_iget will call ll_update_inode,
+                        * where the initialization of slave inode is slightly
+                        * different, so it reset lsm_md to NULL to avoid
+                        * initializing lsm for slave inode. */
+                       lsm->lsm_md_oinfo[i].lmo_root =
+                                       ll_iget_anon_dir(inode->i_sb, fid, md);
+                       if (IS_ERR(lsm->lsm_md_oinfo[i].lmo_root)) {
+                               int rc = PTR_ERR(lsm->lsm_md_oinfo[i].lmo_root);
+
+                               lsm->lsm_md_oinfo[i].lmo_root = NULL;
+                               return rc;
+                       }
+               }
+       }
+
+       /* Here is where the lsm is being initialized(fill lmo_info) after
+        * client retrieve MD stripe information from MDT. */
+       return md_update_lsm_md(ll_i2mdexp(inode), lsm, md->body,
+                               ll_md_blocking_ast);
+}
+
+static inline int lli_lsm_md_eq(const struct lmv_stripe_md *lsm_md1,
+                               const struct lmv_stripe_md *lsm_md2)
+{
+       return lsm_md1->lsm_md_magic == lsm_md2->lsm_md_magic &&
+              lsm_md1->lsm_md_stripe_count == lsm_md2->lsm_md_stripe_count &&
+              lsm_md1->lsm_md_master_mdt_index ==
+                                       lsm_md2->lsm_md_master_mdt_index &&
+              lsm_md1->lsm_md_hash_type == lsm_md2->lsm_md_hash_type &&
+              lsm_md1->lsm_md_layout_version ==
+                                       lsm_md2->lsm_md_layout_version &&
+              strcmp(lsm_md1->lsm_md_pool_name,
+                     lsm_md2->lsm_md_pool_name) == 0;
+}
+
+static void ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct lmv_stripe_md *lsm = md->lmv;
+       int idx;
+
+       LASSERT(lsm != NULL);
+       LASSERT(S_ISDIR(inode->i_mode));
+       if (lli->lli_lsm_md == NULL) {
+               int rc;
+
+               rc = ll_init_lsm_md(inode, md);
+               if (rc != 0) {
+                       CERROR("%s: init "DFID" failed: rc = %d\n",
+                              ll_get_fsname(inode->i_sb, NULL, 0),
+                              PFID(&lli->lli_fid), rc);
+                       return;
+               }
+               lli->lli_lsm_md = lsm;
+               /* set lsm_md to NULL, so the following free lustre_md
+                * will not free this lsm */
+               md->lmv = NULL;
+               return;
+       }
+
+       /* Compare the old and new stripe information */
+       if (!lli_lsm_md_eq(lli->lli_lsm_md, lsm)) {
+               CERROR("inode %p %lu mismatch\n"
+                      "    new(%p)     vs     lli_lsm_md(%p):\n"
+                      "    magic:      %x                   %x\n"
+                      "    count:      %x                   %x\n"
+                      "    master:     %x                   %x\n"
+                      "    hash_type:  %x                   %x\n"
+                      "    layout:     %x                   %x\n"
+                      "    pool:       %s                   %s\n",
+                      inode, inode->i_ino, lsm, lli->lli_lsm_md,
+                      lsm->lsm_md_magic, lli->lli_lsm_md->lsm_md_magic,
+                      lsm->lsm_md_stripe_count,
+                      lli->lli_lsm_md->lsm_md_stripe_count,
+                      lsm->lsm_md_master_mdt_index,
+                      lli->lli_lsm_md->lsm_md_master_mdt_index,
+                      lsm->lsm_md_hash_type, lli->lli_lsm_md->lsm_md_hash_type,
+                      lsm->lsm_md_layout_version,
+                      lli->lli_lsm_md->lsm_md_layout_version,
+                      lsm->lsm_md_pool_name,
+                      lli->lli_lsm_md->lsm_md_pool_name);
+               return;
+       }
+
+       for (idx = 0; idx < lli->lli_lsm_md->lsm_md_stripe_count; idx++) {
+               if (!lu_fid_eq(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid,
+                              &lsm->lsm_md_oinfo[idx].lmo_fid)) {
+                       CERROR("%s: FID in lsm mismatch idx %d, old: "DFID
+                              "new:"DFID"\n",
+                              ll_get_fsname(inode->i_sb, NULL, 0), idx,
+                            PFID(&lli->lli_lsm_md->lsm_md_oinfo[idx].lmo_fid),
+                              PFID(&lsm->lsm_md_oinfo[idx].lmo_fid));
+                       return;
+               }
+       }
+
+       md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
+                        md->body, ll_md_blocking_ast);
+}
+
 void ll_clear_inode(struct inode *inode)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -1271,15 +1450,17 @@ void ll_clear_inode(struct inode *inode)
 #endif
         lli->lli_inode_magic = LLI_INODE_DEAD;
 
-        ll_clear_inode_capas(inode);
-        if (!S_ISDIR(inode->i_mode))
-                LASSERT(cfs_list_empty(&lli->lli_agl_list));
+       ll_clear_inode_capas(inode);
+       if (S_ISDIR(inode->i_mode))
+               ll_dir_clear_lsm_md(inode);
+       else
+               LASSERT(list_empty(&lli->lli_agl_list));
 
-        /*
-         * XXX This has to be done before lsm is freed below, because
-         * cl_object still uses inode lsm.
-         */
-        cl_inode_fini(inode);
+       /*
+        * XXX This has to be done before lsm is freed below, because
+        * cl_object still uses inode lsm.
+        */
+       cl_inode_fini(inode);
        lli->lli_has_smd = false;
 
        EXIT;
@@ -1739,7 +1920,10 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
                        lli->lli_maxbytes = MAX_LFS_FILESIZE;
        }
 
-        if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
+       if (S_ISDIR(inode->i_mode) && md->lmv != NULL)
+               ll_update_lsm_md(inode, md);
+
+       if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
                 if (body->valid & OBD_MD_FLRMTPERM)
                         ll_update_remote_perm(inode, md->remote_perm);
         }
@@ -1754,7 +1938,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
 #endif
        inode->i_ino = cl_fid_build_ino(&body->fid1,
                                        sbi->ll_flags & LL_SBI_32BIT_API);
-        inode->i_generation = cl_fid_build_gen(&body->fid1);
+       inode->i_generation = cl_fid_build_gen(&body->fid1);
 
         if (body->valid & OBD_MD_FLATIME) {
                 if (body->atime > LTIME_S(inode->i_atime))
@@ -1796,17 +1980,18 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md)
        if (body->valid & OBD_MD_FLRDEV)
                inode->i_rdev = old_decode_dev(body->rdev);
 
-        if (body->valid & OBD_MD_FLID) {
-                /* FID shouldn't be changed! */
-                if (fid_is_sane(&lli->lli_fid)) {
-                        LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
-                                 "Trying to change FID "DFID
+       if (body->valid & OBD_MD_FLID) {
+               /* FID shouldn't be changed! */
+               if (fid_is_sane(&lli->lli_fid)) {
+                       LASSERTF(lu_fid_eq(&lli->lli_fid, &body->fid1),
+                                "Trying to change FID "DFID
                                 " to the "DFID", inode "DFID"(%p)\n",
                                 PFID(&lli->lli_fid), PFID(&body->fid1),
                                 PFID(ll_inode2fid(inode)), inode);
-                } else
-                        lli->lli_fid = body->fid1;
-        }
+               } else {
+                       lli->lli_fid = body->fid1;
+               }
+       }
 
         LASSERT(fid_seq(&lli->lli_fid) != 0);
 
@@ -2152,9 +2337,9 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
                  struct super_block *sb, struct lookup_intent *it)
 {
        struct ll_sb_info *sbi = NULL;
-       struct lustre_md md;
-        int rc;
-        ENTRY;
+       struct lustre_md md = { 0 };
+       int rc;
+       ENTRY;
 
         LASSERT(*inode || sb);
         sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
@@ -2319,13 +2504,13 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
        op_data->op_fid1 = *ll_inode2fid(i1);
        op_data->op_capa1 = ll_mdscapa_get(i1);
        if (S_ISDIR(i1->i_mode))
-               op_data->op_mea1 = ll_i2info(i1)->lli_lmv_md;
+               op_data->op_mea1 = ll_i2info(i1)->lli_lsm_md;
 
        if (i2) {
                op_data->op_fid2 = *ll_inode2fid(i2);
                op_data->op_capa2 = ll_mdscapa_get(i2);
                if (S_ISDIR(i2->i_mode))
-                       op_data->op_mea2 = ll_i2info(i2)->lli_lmv_md;
+                       op_data->op_mea2 = ll_i2info(i2)->lli_lsm_md;
        } else {
                fid_zero(&op_data->op_fid2);
                op_data->op_capa2 = NULL;
index 68616e9..f96d17e 100644 (file)
@@ -58,12 +58,6 @@ __u32 get_uuid2int(const char *name, int len)
         return (key0 << 1);
 }
 
-static int ll_nfs_test_inode(struct inode *inode, void *opaque)
-{
-        return lu_fid_eq(&ll_i2info(inode)->lli_fid,
-                         (struct lu_fid *)opaque);
-}
-
 struct inode *search_inode_for_lustre(struct super_block *sb,
                                      const struct lu_fid *fid)
 {
@@ -79,9 +73,9 @@ struct inode *search_inode_for_lustre(struct super_block *sb,
 
         CDEBUG(D_INFO, "searching inode for:(%lu,"DFID")\n", hash, PFID(fid));
 
-        inode = ilookup5(sb, hash, ll_nfs_test_inode, (void *)fid);
-        if (inode)
-                RETURN(inode);
+       inode = ilookup5(sb, hash, ll_test_inode_by_fid, (void *)fid);
+       if (inode)
+               RETURN(inode);
 
         rc = ll_get_max_mdsize(sbi, &eadatalen);
         if (rc)
index b3780e2..da02c9f 100644 (file)
@@ -195,6 +195,11 @@ static void ll_invalidate_negative_children(struct inode *dir)
        ll_unlock_dcache(dir);
 }
 
+/*
+ * Inode match callback handed to ilookup5(): returns non-zero when the FID
+ * stored in the inode's ll_inode_info equals the FID passed via @opaque.
+ */
+int ll_test_inode_by_fid(struct inode *inode, void *opaque)
+{
+       const struct lu_fid     *wanted = opaque;
+       struct ll_inode_info    *info = ll_i2info(inode);
+
+       return lu_fid_eq(&info->lli_fid, wanted);
+}
+
 int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                       void *data, int flag)
 {
@@ -291,10 +296,40 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                }
 
                if ((bits & MDS_INODELOCK_UPDATE) && S_ISDIR(inode->i_mode)) {
-                       CDEBUG(D_INODE, "invalidating inode "DFID"\n",
-                              PFID(ll_inode2fid(inode)));
+                       struct ll_inode_info *lli = ll_i2info(inode);
+
+                       CDEBUG(D_INODE, "invalidating inode "DFID" lli = %p, "
+                              "pfid  = "DFID"\n", PFID(ll_inode2fid(inode)),
+                              lli, PFID(&lli->lli_pfid));
                        truncate_inode_pages(inode->i_mapping, 0);
-                       ll_invalidate_negative_children(inode);
+
+                       if (unlikely(!fid_is_zero(&lli->lli_pfid))) {
+                               struct inode *master_inode = NULL;
+                               unsigned long hash;
+
+                               /* This is a slave inode. All child dentries
+                                * are connected to the master inode, so the
+                                * negative children have to be invalidated
+                                * on the master inode */
+                               CDEBUG(D_INODE, "Invalidate s"DFID" m"DFID"\n",
+                                      PFID(ll_inode2fid(inode)),
+                                      PFID(&lli->lli_pfid));
+
+                               hash = cl_fid_build_ino(&lli->lli_pfid,
+                                       ll_need_32bit_api(ll_i2sbi(inode)));
+
+                               master_inode = ilookup5(inode->i_sb, hash,
+                                                       ll_test_inode_by_fid,
+                                                       (void *)&lli->lli_pfid);
+                               if (master_inode != NULL &&
+                                       !IS_ERR(master_inode)) {
+                                       ll_invalidate_negative_children(
+                                                               master_inode);
+                                       iput(master_inode);
+                               }
+                       } else {
+                               ll_invalidate_negative_children(inode);
+                       }
                }
 
                if ((bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM)) &&
index d59200f..0eae5a8 100644 (file)
@@ -157,6 +157,174 @@ out:
        return rc;
 }
 
+#ifdef __KERNEL__
+/**
+ * Revalidate every stripe of a striped directory and collect the newest
+ * timestamps found across the stripes.
+ *
+ * Stripe 0 is refreshed from @mbody when one is supplied, otherwise its
+ * cached inode attributes are used as they are.  Each remaining stripe is
+ * revalidated through an IT_GETATTR intent lock on the MDT chosen by
+ * lmv_locate_mds(); when a reply comes back, the stripe inode (lmo_root)
+ * is updated from the reply body.  On success the largest atime/ctime/mtime
+ * over all stripes are written back into @mbody (if it is not NULL).
+ *
+ * Returns 0 on success or when the directory has at most one stripe,
+ * -ENOMEM if op_data cannot be allocated, -EIO if a stripe reports
+ * nlink < 2, or the negative error from lmv_locate_mds()/md_intent_lock().
+ **/
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+                         struct lmv_stripe_md *lsm,
+                         ldlm_blocking_callback cb_blocking,
+                         int extra_lock_flags)
+{
+       struct obd_device      *obd = exp->exp_obd;
+       struct lmv_obd         *lmv = &obd->u.lmv;
+       struct mdt_body         *body;
+       struct md_op_data      *op_data;
+       /* size and nlink are summed up for the final debug message only */
+       unsigned long           size = 0;
+       unsigned long           nlink = 0;
+       obd_time                atime = 0;
+       obd_time                ctime = 0;
+       obd_time                mtime = 0;
+       int                     i;
+       int                     rc = 0;
+
+       ENTRY;
+
+       /* A directory with at most one stripe has no slaves to revalidate */
+       if (lsm->lsm_md_stripe_count <= 1)
+               RETURN(0);
+
+       OBD_ALLOC_PTR(op_data);
+       if (op_data == NULL)
+               RETURN(-ENOMEM);
+
+       /**
+        * Loop over the stripe information, check validity and update them
+        * from MDS if needed.
+        */
+       for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+               struct lu_fid           fid;
+               struct lookup_intent    it = { .it_op = IT_GETATTR };
+               struct ptlrpc_request   *req = NULL;
+               struct lustre_handle    *lockh = NULL;
+               struct lmv_tgt_desc     *tgt = NULL;
+               struct inode            *inode;
+
+               fid = lsm->lsm_md_oinfo[i].lmo_fid;
+               inode = lsm->lsm_md_oinfo[i].lmo_root;
+               if (i == 0) {
+                       /* Stripe 0 never takes an intent lock here: it is
+                        * either updated from the caller's body or merged
+                        * from its cached inode attributes */
+                       if (mbody != NULL) {
+                               body = mbody;
+                               goto update;
+                       } else {
+                               goto release_lock;
+                       }
+               }
+
+               /*
+                * Prepare op_data for revalidating. Note that @fid2 should be
+                * defined otherwise it will go to server and take new lock
+                * which is not needed here.
+                */
+               memset(op_data, 0, sizeof(*op_data));
+               op_data->op_fid1 = fid;
+               op_data->op_fid2 = fid;
+
+               tgt = lmv_locate_mds(lmv, op_data, &fid);
+               if (IS_ERR(tgt))
+                       GOTO(cleanup, rc = PTR_ERR(tgt));
+
+               CDEBUG(D_INODE, "Revalidate slave "DFID" -> mds #%d\n",
+                      PFID(&fid), tgt->ltd_idx);
+
+               rc = md_intent_lock(tgt->ltd_exp, op_data, NULL, 0, &it, 0,
+                                   &req, cb_blocking, extra_lock_flags);
+               /* NOTE(review): this failure path releases neither @req nor
+                * a lock recorded in @it; confirm that md_intent_lock()
+                * cannot leave either of them set when it fails */
+               if (rc < 0)
+                       GOTO(cleanup, rc);
+
+               lockh = (struct lustre_handle *)&it.d.lustre.it_lock_handle;
+               if (rc > 0 && req == NULL) {
+                       /* slave inode is still valid */
+                       CDEBUG(D_INODE, "slave "DFID" is still valid.\n",
+                              PFID(&fid));
+                       rc = 0;
+               } else {
+                       /* refresh slave from server */
+                       body = req_capsule_server_get(&req->rq_pill,
+                                                     &RMF_MDT_BODY);
+                       LASSERT(body != NULL);
+update:
+                       if (unlikely(body->nlink < 2)) {
+                               CERROR("%s: nlink %d < 2 corrupt stripe %d "DFID
+                                      ":" DFID"\n", obd->obd_name, body->nlink,
+                                      i, PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
+                                      PFID(&lsm->lsm_md_oinfo[0].lmo_fid));
+
+                               if (req != NULL)
+                                       ptlrpc_req_finished(req);
+
+                               /* Drop the intent lock reference taken for
+                                * this stripe before bailing out, otherwise
+                                * it is leaked (lockh is NULL for stripe 0,
+                                * which takes no lock) */
+                               if (it.d.lustre.it_lock_mode != 0 &&
+                                   lockh != NULL) {
+                                       ldlm_lock_decref(lockh,
+                                               it.d.lustre.it_lock_mode);
+                                       it.d.lustre.it_lock_mode = 0;
+                               }
+
+                               GOTO(cleanup, rc = -EIO);
+                       }
+
+                       if (i != 0)
+                               md_set_lock_data(tgt->ltd_exp, &lockh->cookie,
+                                                inode, NULL);
+
+                       i_size_write(inode, body->size);
+                       set_nlink(inode, body->nlink);
+                       LTIME_S(inode->i_atime) = body->atime;
+                       LTIME_S(inode->i_ctime) = body->ctime;
+                       LTIME_S(inode->i_mtime) = body->mtime;
+
+                       if (req != NULL)
+                               ptlrpc_req_finished(req);
+               }
+release_lock:
+               size += i_size_read(inode);
+
+               /* a slave stripe does not contribute 2 of its links */
+               if (i != 0)
+                       nlink += inode->i_nlink - 2;
+               else
+                       nlink += inode->i_nlink;
+
+               atime = LTIME_S(inode->i_atime) > atime ?
+                               LTIME_S(inode->i_atime) : atime;
+               ctime = LTIME_S(inode->i_ctime) > ctime ?
+                               LTIME_S(inode->i_ctime) : ctime;
+               mtime = LTIME_S(inode->i_mtime) > mtime ?
+                               LTIME_S(inode->i_mtime) : mtime;
+
+               if (it.d.lustre.it_lock_mode != 0 && lockh != NULL) {
+                       ldlm_lock_decref(lockh, it.d.lustre.it_lock_mode);
+                       it.d.lustre.it_lock_mode = 0;
+               }
+
+               CDEBUG(D_INODE, "i %d "DFID" size %llu, nlink %u, atime "
+                      "%lu, mtime %lu, ctime %lu.\n", i, PFID(&fid),
+                      i_size_read(inode), inode->i_nlink,
+                      LTIME_S(inode->i_atime), LTIME_S(inode->i_mtime),
+                      LTIME_S(inode->i_ctime));
+       }
+
+       /*
+        * update attr of master request.
+        */
+       CDEBUG(D_INODE, "Return refreshed attrs: size = %lu nlink %lu atime "
+              LPU64 "ctime "LPU64" mtime "LPU64" for " DFID"\n", size, nlink,
+              atime, ctime, mtime, PFID(&lsm->lsm_md_oinfo[0].lmo_fid));
+
+       /* only the timestamps are handed back; size/nlink are left alone */
+       if (mbody != NULL) {
+               mbody->atime = atime;
+               mbody->ctime = ctime;
+               mbody->mtime = mtime;
+       }
+cleanup:
+       OBD_FREE_PTR(op_data);
+       RETURN(rc);
+}
+
+#else
+
+/* Build without __KERNEL__: no stripe is revalidated, always returns 0. */
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+                         struct lmv_stripe_md *lsm,
+                         ldlm_blocking_callback cb_blocking,
+                         int extra_lock_flags)
+{
+       return 0;
+}
+
+#endif
+
 /*
  * IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
  * may be split dir.
@@ -176,13 +344,26 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
 
        /* Note: client might open with some random flags(sanity 33b), so we can
         * not make sure op_fid2 is being initialized with BY_FID flag */
-       if (it->it_flags & MDS_OPEN_BY_FID && fid_is_sane(&op_data->op_fid2))
-               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
-       else
+       if (it->it_flags & MDS_OPEN_BY_FID && fid_is_sane(&op_data->op_fid2)) {
+               if (op_data->op_mea1 != NULL) {
+                       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+                       const struct lmv_oinfo  *oinfo;
+
+                       oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+                                                       op_data->op_namelen);
+                       op_data->op_fid1 = oinfo->lmo_fid;
+               }
+
+               tgt = lmv_find_target(lmv, &op_data->op_fid2);
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+
+               op_data->op_mds = tgt->ltd_idx;
+       } else {
                tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
-
-       if (IS_ERR(tgt))
-               RETURN(PTR_ERR(tgt));
+               if (IS_ERR(tgt))
+                       RETURN(PTR_ERR(tgt));
+       }
 
        /* If it is ready to open the file by FID, do not need
         * allocate FID at all, otherwise it will confuse MDT */
@@ -218,31 +399,18 @@ int lmv_intent_open(struct obd_export *exp, struct md_op_data *op_data,
        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
        if (body == NULL)
                RETURN(-EPROTO);
-       /*
-        * Not cross-ref case, just get out of here.
-        */
-       if (likely(!(body->valid & OBD_MD_MDS)))
-               RETURN(0);
 
-       /*
-        * Okay, MDS has returned success. Probably name has been resolved in
-        * remote inode.
-        */
-       rc = lmv_intent_remote(exp, lmm, lmmsize, it, &op_data->op_fid1, flags,
-                              reqp, cb_blocking, extra_lock_flags);
-       if (rc != 0) {
-               LASSERT(rc < 0);
-               /*
-                * This is possible, that some userspace application will try to
-                * open file as directory and we will have -ENOTDIR here. As
-                * this is normal situation, we should not print error here,
-                * only debug info.
-                */
-               CDEBUG(D_INODE, "Can't handle remote %s: dir "DFID"("DFID"):"
-                      "%*s: %d\n", LL_IT2STR(it), PFID(&op_data->op_fid2),
-                      PFID(&op_data->op_fid1), op_data->op_namelen,
-                      op_data->op_name, rc);
-               RETURN(rc);
+       /* Cross-ref case: the name resolved to a remote object. */
+       if (unlikely((body->valid & OBD_MD_MDS))) {
+               rc = lmv_intent_remote(exp, lmm, lmmsize, it, &op_data->op_fid1,
+                                      flags, reqp, cb_blocking,
+                                      extra_lock_flags);
+               if (rc != 0)
+                       RETURN(rc);
+
+               body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+               if (body == NULL)
+                       RETURN(-EPROTO);
        }
 
        RETURN(rc);
@@ -282,9 +450,21 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
        rc = md_intent_lock(tgt->ltd_exp, op_data, lmm, lmmsize, it,
                             flags, reqp, cb_blocking, extra_lock_flags);
 
-       if (rc < 0 || *reqp == NULL)
+       if (rc < 0)
                RETURN(rc);
 
+       if (*reqp == NULL) {
+               /* If RPC happens, lsm information will be revalidated
+                * during update_inode process (see ll_update_lsm_md) */
+               if (op_data->op_mea2 != NULL) {
+                       rc = lmv_revalidate_slaves(exp, NULL, op_data->op_mea2,
+                                               cb_blocking, extra_lock_flags);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+               RETURN(rc);
+       }
+
        /*
         * MDS has returned success. Probably name has been resolved in
         * remote inode. Let's check this.
@@ -292,12 +472,17 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
        body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
        if (body == NULL)
                RETURN(-EPROTO);
-       /* Not cross-ref case, just get out of here. */
-       if (likely(!(body->valid & OBD_MD_MDS)))
-               RETURN(0);
 
-       rc = lmv_intent_remote(exp, lmm, lmmsize, it, NULL, flags, reqp,
-                              cb_blocking, extra_lock_flags);
+       /* Cross-ref case: the name resolved to a remote object. */
+       if (unlikely((body->valid & OBD_MD_MDS))) {
+               rc = lmv_intent_remote(exp, lmm, lmmsize, it, NULL, flags,
+                                      reqp, cb_blocking, extra_lock_flags);
+               if (rc != 0)
+                       RETURN(rc);
+               body = req_capsule_server_get(&(*reqp)->rq_pill, &RMF_MDT_BODY);
+               if (body == NULL)
+                       RETURN(-EPROTO);
+       }
 
        RETURN(rc);
 }
index de07e16..78e4009 100644 (file)
@@ -78,6 +78,14 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
 int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
                   struct md_op_data *op_data);
 
+int lmv_unpack_md(struct obd_export *exp, struct lmv_stripe_md **lsmp,
+                 const union lmv_mds_md *lmm, int stripe_count);
+
+int lmv_revalidate_slaves(struct obd_export *exp, struct mdt_body *mbody,
+                         struct lmv_stripe_md *lsm,
+                         ldlm_blocking_callback cb_blocking,
+                         int extra_lock_flags);
+
 static inline struct lmv_tgt_desc *
 lmv_get_target(struct lmv_obd *lmv, mdsno_t mds)
 {
@@ -110,41 +118,31 @@ lmv_find_target(struct lmv_obd *lmv, const struct lu_fid *fid)
         return lmv_get_target(lmv, mds);
 }
 
-static inline unsigned int
-mea_last_char_hash(unsigned int count, const char *name, int namelen)
+static inline int lmv_stripe_md_size(int stripe_count)
 {
-       unsigned int c;
-
-       c = name[namelen - 1];
-       if (c == 0)
-               CWARN("invalid name %.*s\n", namelen, name);
-
-       c = c % count;
+       struct lmv_stripe_md *lsm;
 
-       return c;
+       return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
 }
 
-static inline unsigned int
-mea_all_chars_hash(unsigned int count, const char *name, int namelen)
-{
-       unsigned int c = 0;
-
-       while (--namelen >= 0)
-               c += name[namelen];
-
-       c = c % count;
-
-       return c;
-}
+int lmv_name_to_stripe_index(enum lmv_hash_type hashtype,
+                            unsigned int max_mdt_index,
+                            const char *name, int namelen);
 
-static inline int lmv_stripe_md_size(int stripe_count)
+static inline const struct lmv_oinfo *
+lsm_name_to_stripe_info(const struct lmv_stripe_md *lsm, const char *name,
+                       int namelen)
 {
-       struct lmv_stripe_md *lsm;
+       int stripe_index;
 
-       return sizeof(*lsm) + stripe_count * sizeof(lsm->lsm_md_oinfo[0]);
+       stripe_index = lmv_name_to_stripe_index(lsm->lsm_md_hash_type,
+                                               lsm->lsm_md_stripe_count,
+                                               name, namelen);
+       LASSERT(stripe_index < lsm->lsm_md_stripe_count);
+
+       return &lsm->lsm_md_oinfo[stripe_index];
 }
 
-int raw_name2idx(int hashtype, int count, const char *name, int namelen);
 
 struct lmv_tgt_desc
 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
index e0941c9..4e1fb39 100644 (file)
 #include <lustre_fid.h>
 #include "lmv_internal.h"
 
-int raw_name2idx(int hashtype, int count, const char *name, int namelen)
+/* This hash is only for testing purpose */
+static inline unsigned int
+lmv_hash_all_chars(unsigned int count, const char *name, int namelen)
 {
-       unsigned int    c = 0;
-       int             idx;
+       unsigned int c = 0;
+       const unsigned char *p = (const unsigned char *)name;
 
-       LASSERT(namelen > 0);
+       while (--namelen >= 0)
+               c += p[namelen];
 
-       if (filename_is_volatile(name, namelen, &idx)) {
-               if (idx >= 0 && idx < count)
-                       return idx;
-               goto choose_hash;
-       }
+       c = c % count;
+
+       return c;
+}
+
+/*
+ * Reduce the value of lustre_hash_fnv_1a_64() over @name to a stripe index
+ * in the range [0, @count).  @count must not be zero.
+ */
+static inline unsigned int
+lmv_hash_fnv1a(unsigned int count, const char *name, int namelen)
+{
+       const __u64 fnv = lustre_hash_fnv_1a_64(name, namelen);
+
+       return fnv % count;
+}
 
-       if (count <= 1)
+int lmv_name_to_stripe_index(enum lmv_hash_type hashtype,
+                            unsigned int max_mdt_index,
+                            const char *name, int namelen)
+{
+       int     idx;
+
+       LASSERT(namelen > 0);
+       if (max_mdt_index <= 1)
                return 0;
 
-choose_hash:
        switch (hashtype) {
-       case MEA_MAGIC_LAST_CHAR:
-               c = mea_last_char_hash(count, name, namelen);
+       case LMV_HASH_TYPE_ALL_CHARS:
+               idx = lmv_hash_all_chars(max_mdt_index, name, namelen);
                break;
-       case MEA_MAGIC_ALL_CHARS:
-               c = mea_all_chars_hash(count, name, namelen);
-               break;
-       case MEA_MAGIC_HASH_SEGMENT:
-               CERROR("Unsupported hash type MEA_MAGIC_HASH_SEGMENT\n");
+       case LMV_HASH_TYPE_FNV_1A_64:
+               idx = lmv_hash_fnv1a(max_mdt_index, name, namelen);
                break;
        default:
                CERROR("Unknown hash type 0x%x\n", hashtype);
+               return -EINVAL;
        }
 
-       LASSERT(c < count);
-       return c;
+       LASSERT(idx < max_mdt_index);
+       return idx;
 }
 
 static void lmv_activate_target(struct lmv_obd *lmv,
@@ -1322,27 +1340,17 @@ static int lmv_placement_policy(struct obd_device *obd,
         * If stripe_offset is provided during setdirstripe
         * (setdirstripe -i xx), xx MDS will be choosen.
         */
-       if (op_data->op_cli_flags & CLI_SET_MEA) {
+       if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data != NULL) {
                struct lmv_user_md *lum;
 
-               lum = (struct lmv_user_md *)op_data->op_data;
-               if (lum->lum_type == LMV_STRIPE_TYPE &&
-                   lum->lum_stripe_offset != -1) {
-                       if (lum->lum_stripe_offset >= lmv->desc.ld_tgt_count) {
-                               CERROR("%s: Stripe_offset %d > MDT count %d:"
-                                      " rc = %d\n", obd->obd_name,
-                                      lum->lum_stripe_offset,
-                                      lmv->desc.ld_tgt_count, -ERANGE);
-                               RETURN(-ERANGE);
-                       }
-                       *mds = lum->lum_stripe_offset;
-                       RETURN(0);
-               }
+               lum = op_data->op_data;
+               *mds = lum->lum_stripe_offset;
+       } else {
+               /* Allocate new fid on target according to operation type and
+                * parent home mds. */
+               *mds = op_data->op_mds;
        }
 
-       /* Allocate new fid on target according to operation type and parent
-        * home mds. */
-       *mds = op_data->op_mds;
        RETURN(0);
 }
 
@@ -1751,17 +1759,37 @@ static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
         RETURN(rc);
 }
 
+/**
+ * Choose the MDT by name or FID in @op_data.
+ * For a non-striped directory (or when no name is given), the MDT is
+ * located by @fid.
+ * For a striped directory, the MDT is located by name, and @fid is
+ * reset to the FID of the chosen stripe.
+ **/
 struct lmv_tgt_desc
 *lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
                struct lu_fid *fid)
 {
-       struct lmv_tgt_desc *tgt;
+       struct lmv_stripe_md    *lsm = op_data->op_mea1;
+       struct lmv_tgt_desc     *tgt;
+       const struct lmv_oinfo  *oinfo;
 
-       tgt = lmv_find_target(lmv, fid);
-       if (IS_ERR(tgt))
+       if (lsm == NULL || lsm->lsm_md_stripe_count <= 1 ||
+           op_data->op_namelen == 0) {
+               tgt = lmv_find_target(lmv, fid);
+               if (IS_ERR(tgt))
+                       return tgt;
+
+               op_data->op_mds = tgt->ltd_idx;
                return tgt;
+       }
 
-       op_data->op_mds = tgt->ltd_idx;
+       oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+                                       op_data->op_namelen);
+       *fid = oinfo->lmo_fid;
+       op_data->op_mds = oinfo->lmo_mds;
+       tgt = lmv_get_target(lmv, op_data->op_mds);
+
+       CDEBUG(D_INFO, "locate on mds %u\n", op_data->op_mds);
 
        return tgt;
 }
@@ -1788,18 +1816,28 @@ int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
+       CDEBUG(D_INODE, "CREATE name '%.*s' on "DFID" -> mds #%x\n",
+              op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+              op_data->op_mds);
+
        rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
        if (rc)
                RETURN(rc);
 
-       CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
-              op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
-              op_data->op_mds);
+       /* Send the create request to the MDT where the object
+        * will be located */
+       tgt = lmv_find_target(lmv, &op_data->op_fid2);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+       op_data->op_mds = tgt->ltd_idx;
+
+       CDEBUG(D_INODE, "CREATE obj "DFID" -> mds #%x\n",
+              PFID(&op_data->op_fid2), op_data->op_mds);
 
        op_data->op_flags |= MF_MDC_CANCEL_FID1;
        rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
                       cap_effective, rdev, request);
-
        if (rc == 0) {
                if (*request == NULL)
                        RETURN(rc);
@@ -2047,6 +2085,15 @@ static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
        op_data->op_fsuid = current_fsuid();
        op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
+       if (op_data->op_mea2 != NULL) {
+               struct lmv_stripe_md    *lsm = op_data->op_mea2;
+               const struct lmv_oinfo  *oinfo;
+
+               oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
+                                               op_data->op_namelen);
+               op_data->op_fid2 = oinfo->lmo_fid;
+       }
+
        tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
@@ -2069,33 +2116,54 @@ static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
                       const char *old, int oldlen, const char *new, int newlen,
                       struct ptlrpc_request **request)
 {
-        struct obd_device       *obd = exp->exp_obd;
-        struct lmv_obd          *lmv = &obd->u.lmv;
-        struct lmv_tgt_desc     *src_tgt;
-       struct lmv_tgt_desc     *tgt_tgt;
+       struct obd_device       *obd = exp->exp_obd;
+       struct lmv_obd          *lmv = &obd->u.lmv;
+       struct lmv_tgt_desc     *src_tgt;
        int                     rc;
        ENTRY;
 
-        LASSERT(oldlen != 0);
+       LASSERT(oldlen != 0);
 
-        CDEBUG(D_INODE, "RENAME %*s in "DFID" to %*s in "DFID"\n",
-               oldlen, old, PFID(&op_data->op_fid1),
-               newlen, new, PFID(&op_data->op_fid2));
+       CDEBUG(D_INODE, "RENAME %.*s in "DFID":%d to %.*s in "DFID":%d\n",
+              oldlen, old, PFID(&op_data->op_fid1),
+              op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
+              newlen, new, PFID(&op_data->op_fid2),
+              op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
 
-        rc = lmv_check_connect(obd);
-        if (rc)
-                RETURN(rc);
+       rc = lmv_check_connect(obd);
+       if (rc)
+               RETURN(rc);
 
        op_data->op_fsuid = current_fsuid();
        op_data->op_fsgid = current_fsgid();
        op_data->op_cap = cfs_curproc_cap_pack();
-       src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
-       if (IS_ERR(src_tgt))
-               RETURN(PTR_ERR(src_tgt));
 
-       tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
-       if (IS_ERR(tgt_tgt))
-               RETURN(PTR_ERR(tgt_tgt));
+       if (op_data->op_mea1 != NULL) {
+               struct lmv_stripe_md    *lsm = op_data->op_mea1;
+               const struct lmv_oinfo  *oinfo;
+
+               oinfo = lsm_name_to_stripe_info(lsm, old, oldlen);
+               op_data->op_fid1 = oinfo->lmo_fid;
+               op_data->op_mds = oinfo->lmo_mds;
+               src_tgt = lmv_get_target(lmv, op_data->op_mds);
+               if (IS_ERR(src_tgt))
+                       RETURN(PTR_ERR(src_tgt));
+       } else {
+               src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
+               if (IS_ERR(src_tgt))
+                       RETURN(PTR_ERR(src_tgt));
+
+               op_data->op_mds = src_tgt->ltd_idx;
+       }
+
+       if (op_data->op_mea2) {
+               struct lmv_stripe_md    *lsm = op_data->op_mea2;
+               const struct lmv_oinfo  *oinfo;
+
+               oinfo = lsm_name_to_stripe_info(lsm, new, newlen);
+               op_data->op_fid2 = oinfo->lmo_fid;
+       }
+
        /*
         * LOOKUP lock on src child (fid3) should also be cancelled for
         * src_tgt in mdc_rename.
@@ -3023,7 +3091,7 @@ int lmv_intent_getattr_async(struct obd_export *exp,
        if (rc)
                RETURN(rc);
 
-       tgt = lmv_find_target(lmv, &op_data->op_fid1);
+       tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
@@ -3131,6 +3199,51 @@ int lmv_quotacheck(struct obd_device *unused, struct obd_export *exp,
         RETURN(rc);
 }
 
+/*
+ * Hand \a lsm and \a body to lmv_revalidate_slaves().
+ *
+ * A layout whose lsm_md_stripe_count is 0 or 1 is not passed on and yields
+ * 0; otherwise the result of lmv_revalidate_slaves() is returned unchanged.
+ */
+int lmv_update_lsm_md(struct obd_export *exp, struct lmv_stripe_md *lsm,
+                     struct mdt_body *body, ldlm_blocking_callback cb_blocking)
+{
+       int rc = 0;
+
+       if (lsm->lsm_md_stripe_count > 1)
+               rc = lmv_revalidate_slaves(exp, body, lsm, cb_blocking, 0);
+
+       return rc;
+}
+
+/*
+ * Fold the attributes of every stripe inode recorded in \a lsm into \a attr.
+ *
+ * - cat_nlink is overwritten: stripe 0 contributes its full i_nlink, every
+ *   later stripe contributes i_nlink - 2.
+ * - cat_size is incremented by each stripe's size, and cat_atime, cat_ctime
+ *   and cat_mtime are raised to the newest stripe value.  These start from
+ *   whatever \a attr already holds.
+ *   NOTE(review): presumably callers initialise \a attr first -- confirm.
+ *
+ * Outside __KERNEL__ builds the function does nothing.  \a exp is not used.
+ *
+ * \retval 0 always
+ */
+int lmv_merge_attr(struct obd_export *exp, const struct lmv_stripe_md *lsm,
+                  struct cl_attr *attr)
+{
+#ifdef __KERNEL__
+       int i;
+
+       for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
+               struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
+
+               /* keep a blank after "ctime": the literals are concatenated */
+               CDEBUG(D_INFO, ""DFID" size %llu, nlink %u, atime %lu ctime "
+                      "%lu, mtime %lu.\n", PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
+                      i_size_read(inode), inode->i_nlink,
+                      LTIME_S(inode->i_atime), LTIME_S(inode->i_ctime),
+                      LTIME_S(inode->i_mtime));
+
+               /* for slave stripe, it needs to subtract nlink for . and .. */
+               if (i != 0)
+                       attr->cat_nlink += inode->i_nlink - 2;
+               else
+                       attr->cat_nlink = inode->i_nlink;
+
+               attr->cat_size += i_size_read(inode);
+
+               if (attr->cat_atime < LTIME_S(inode->i_atime))
+                       attr->cat_atime = LTIME_S(inode->i_atime);
+
+               if (attr->cat_ctime < LTIME_S(inode->i_ctime))
+                       attr->cat_ctime = LTIME_S(inode->i_ctime);
+
+               if (attr->cat_mtime < LTIME_S(inode->i_mtime))
+                       attr->cat_mtime = LTIME_S(inode->i_mtime);
+       }
+#endif
+       return 0;
+}
+
 struct obd_ops lmv_obd_ops = {
         .o_owner                = THIS_MODULE,
         .o_setup                = lmv_setup,
@@ -3174,8 +3287,10 @@ struct md_ops lmv_md_ops = {
         .m_cancel_unused        = lmv_cancel_unused,
         .m_set_lock_data        = lmv_set_lock_data,
         .m_lock_match           = lmv_lock_match,
-        .m_get_lustre_md        = lmv_get_lustre_md,
-        .m_free_lustre_md       = lmv_free_lustre_md,
+       .m_get_lustre_md        = lmv_get_lustre_md,
+       .m_free_lustre_md       = lmv_free_lustre_md,
+       .m_update_lsm_md        = lmv_update_lsm_md,
+       .m_merge_attr           = lmv_merge_attr,
         .m_set_open_replay_data = lmv_set_open_replay_data,
         .m_clear_open_replay_data = lmv_clear_open_replay_data,
         .m_renew_capa           = lmv_renew_capa,
index 02a1e1e..0a77113 100644 (file)
@@ -418,71 +418,53 @@ static struct thandle *lod_trans_create(const struct lu_env *env,
        if (IS_ERR(th))
                return th;
 
-       CFS_INIT_LIST_HEAD(&th->th_remote_update_list);
        return th;
 }
 
-static int lod_remote_sync(const struct lu_env *env, struct dt_device *dev,
-                          struct thandle *th)
-{
-       struct update_request *update;
-       int    rc = 0;
-       ENTRY;
-
-       if (cfs_list_empty(&th->th_remote_update_list))
-               RETURN(0);
-
-       cfs_list_for_each_entry(update, &th->th_remote_update_list,
-                               ur_list) {
-               /* In DNE phase I, there should be only one OSP
-                * here, so we will do send/receive one by one,
-                * instead of sending them parallel, will fix this
-                * in Phase II */
-               th->th_current_request = update;
-               rc = dt_trans_start(env, update->ur_dt, th);
-               if (rc != 0) {
-                       /* FIXME how to revert the partial results
-                        * once error happened? Resolved by 2 Phase commit */
-                       update->ur_rc = rc;
-                       break;
-               }
-       }
-
-       RETURN(rc);
-}
-
+/*
+ * Start transaction \a th.
+ *
+ * When th->th_update is set, dt_trans_start() is first called on the ur_dt
+ * device of every entry in its tu_remote_update_list; the first non-zero
+ * result is returned immediately.  Finally the transaction is started on
+ * lod->lod_child and that result is returned.
+ */
 static int lod_trans_start(const struct lu_env *env, struct dt_device *dev,
                            struct thandle *th)
 {
        struct lod_device *lod = dt2lod_dev((struct dt_device *) dev);
-       int rc;
+       int rc = 0;
 
-       rc = lod_remote_sync(env, dev, th);
-       if (rc)
-               return rc;
+       if (unlikely(th->th_update != NULL)) {
+               struct thandle_update *tu = th->th_update;
+               struct update_request *update;
 
+               list_for_each_entry(update, &tu->tu_remote_update_list,
+                                   ur_list) {
+                       LASSERT(update->ur_dt != NULL);
+                       rc = dt_trans_start(env, update->ur_dt, th);
+                       if (rc != 0)
+                               return rc;
+               }
+       }
        return dt_trans_start(env, lod->lod_child, th);
 }
 
-static int lod_trans_stop(const struct lu_env *env, struct thandle *th)
+/*
+ * Stop transaction \a th.
+ *
+ * The transaction is stopped on th->th_dev first.  Without a th_update that
+ * result is returned as is.  Otherwise dt_trans_stop() is also called for
+ * every entry of tu_remote_update_list, and the first non-zero result seen
+ * overall is returned.  \a dt is not referenced in the body.
+ */
+static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
+                         struct thandle *th)
 {
-       struct update_request *update;
-       struct update_request *tmp;
-       int rc = 0;
-       int rc2 = 0;
+       struct thandle_update   *tu = th->th_update;
+       struct update_request   *update;
+       struct update_request   *tmp;
+       int                     rc2 = 0;
+       int                     rc;
+       ENTRY;
+
+       /* th->th_update was sampled into tu above, before this call */
+       rc = dt_trans_stop(env, th->th_dev, th);
+       if (likely(tu == NULL))
+               RETURN(rc);
 
-       cfs_list_for_each_entry_safe(update, tmp,
-                                    &th->th_remote_update_list,
-                                    ur_list) {
-               th->th_current_request = update;
+       list_for_each_entry_safe(update, tmp, &tu->tu_remote_update_list,
+                                ur_list) {
+               /* update will be freed inside dt_trans_stop */
                rc2 = dt_trans_stop(env, update->ur_dt, th);
+               /* rc is only replaced while it is still 0 */
                if (unlikely(rc2 != 0 && rc == 0))
                        rc = rc2;
        }
 
-       rc2 = dt_trans_stop(env, th->th_dev, th);
-
-       return rc2 != 0 ? rc2 : rc;
+       RETURN(rc);
 }
 
 static void lod_conf_get(const struct lu_env *env,
index 6d99ad5..3647791 100644 (file)
@@ -129,6 +129,7 @@ struct lod_tgt_desc_idx {
         TGT_PTRS_PER_BLOCK]->ldi_tgt[(index) % TGT_PTRS_PER_BLOCK])
 
 #define OST_TGT(lod, index)   LTD_TGT(&lod->lod_ost_descs, index)
+#define MDT_TGT(lod, index)   LTD_TGT(&lod->lod_mdt_descs, index)
 struct lod_tgt_descs {
        /* list of known TGTs */
        struct lod_tgt_desc_idx *ltd_tgt_idx[TGT_PTRS];
@@ -198,6 +199,25 @@ struct lod_device {
 #define ltd_ost                ltd_tgt
 #define lod_ost_desc   lod_tgt_desc
 
+/* MDT-side shorthands: the first four name fields of the lod_mdt_descs
+ * member, the last two give MDT names to ltd_tgt and lod_tgt_desc */
+#define lod_mdts               lod_mdt_descs.ltd_tgts
+#define lod_mdt_bitmap         lod_mdt_descs.ltd_tgt_bitmap
+#define lod_remote_mdt_count   lod_mdt_descs.ltd_tgtnr
+#define lod_mdts_size          lod_mdt_descs.ltd_tgts_size
+#define ltd_mdt                        ltd_tgt
+#define lod_mdt_desc           lod_tgt_desc
+
+/*
+ * Directory-striping state of a lod_object, reached through its
+ * ldo_dir_stripe pointer and the ldo_dir_* accessor macros.
+ * NOTE(review): the meaning of the individual fields is inferred from
+ * their names only -- confirm against the users in lod_object.c.
+ */
+struct lod_dir_stripe_info {
+       __u32   ldsi_stripe_offset;
+       __u32   ldsi_def_stripenr;
+       __u32   ldsi_def_stripe_offset;
+       __u32   ldsi_def_hash_type;
+       __u32   ldsi_hash_type;
+
+       /* one-bit flags */
+       unsigned int ldsi_striping_cached:1,
+                    ldsi_def_striping_set:1,
+                    ldsi_striped:1;
+};
+
 /*
  * XXX: shrink this structure, currently it's 72bytes on 32bit arch,
  *      so, slab will be allocating 128bytes
@@ -206,6 +226,7 @@ struct lod_object {
        struct dt_object   ldo_obj;
 
        /* if object is striped, then the next fields describe stripes */
+       /* For striped directory, ldo_stripenr == slave stripe count */
        __u16              ldo_stripenr;
        __u16              ldo_layout_gen;
        __u32              ldo_stripe_size;
@@ -218,13 +239,25 @@ struct lod_object {
         * is cached in stripenr/stripe_size */
        unsigned int       ldo_stripes_allocated:16,
                           ldo_striping_cached:1,
-                          ldo_def_striping_set:1;
+                          ldo_def_striping_set:1,
+       /* ldo_dir_slave_stripe indicate this is a slave stripe of
+        * a striped dir */
+                          ldo_dir_slave_stripe:1;
        __u32              ldo_def_stripe_size;
        __u16              ldo_def_stripenr;
        __u16              ldo_def_stripe_offset;
+       struct lod_dir_stripe_info      *ldo_dir_stripe;
        mdsno_t            ldo_mds_num;
 };
 
+/*
+ * Field shorthands for struct lod_dir_stripe_info.  Each one expands to a
+ * dereference of ldo_dir_stripe, so that pointer must be valid wherever
+ * these are used.
+ */
+#define ldo_dir_stripe_offset  ldo_dir_stripe->ldsi_stripe_offset
+#define ldo_dir_def_stripenr   ldo_dir_stripe->ldsi_def_stripenr
+#define ldo_dir_hash_type      ldo_dir_stripe->ldsi_hash_type
+#define ldo_dir_def_hash_type  ldo_dir_stripe->ldsi_def_hash_type
+#define ldo_dir_striping_cached        ldo_dir_stripe->ldsi_striping_cached
+#define ldo_dir_striped                ldo_dir_stripe->ldsi_striped
+#define ldo_dir_def_striping_set       ldo_dir_stripe->ldsi_def_striping_set
+#define ldo_dir_def_stripe_offset      ldo_dir_stripe->ldsi_def_stripe_offset
 
 struct lod_it {
        struct dt_object        *lit_obj; /* object from the layer below */
@@ -235,6 +268,7 @@ struct lod_thread_info {
        /* per-thread buffer for LOV EA */
        void             *lti_ea_store;
        int               lti_ea_store_size;
+       /* per-thread buffer for LMV EA */
        struct lu_buf     lti_buf;
        struct ost_id     lti_ostid;
        struct lu_fid     lti_fid;
@@ -335,7 +369,27 @@ int lod_del_device(const struct lu_env *env, struct lod_device *lod,
 int lod_fini_tgt(const struct lu_env *env, struct lod_device *lod,
                 struct lod_tgt_descs *ltd, bool for_ost);
 int lod_load_striping(const struct lu_env *env, struct lod_object *mo);
-int lod_get_lov_ea(const struct lu_env *env, struct lod_object *mo);
+
+int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
+              const char *name);
+/* Fetch the XATTR_NAME_LOV EA of \a lo; result is that of lod_get_ea(). */
+static inline int lod_get_lov_ea(const struct lu_env *env,
+                                struct lod_object *lo)
+{
+       return lod_get_ea(env, lo, XATTR_NAME_LOV);
+}
+
+/* Fetch the XATTR_NAME_LMV EA of \a lo; result is that of lod_get_ea(). */
+static inline int lod_get_lmv_ea(const struct lu_env *env,
+                                struct lod_object *lo)
+{
+       return lod_get_ea(env, lo, XATTR_NAME_LMV);
+}
+
+/* Fetch the XATTR_NAME_DEFALT_LMV EA of \a lo via lod_get_ea(). */
+static inline int lod_get_default_lmv_ea(const struct lu_env *env,
+                                        struct lod_object *lo)
+{
+       return lod_get_ea(env, lo, XATTR_NAME_DEFALT_LMV);
+}
+
 void lod_fix_desc(struct lov_desc *desc);
 void lod_fix_desc_qos_maxage(__u32 *val);
 void lod_fix_desc_pattern(__u32 *val);
@@ -345,6 +399,8 @@ int lod_pools_init(struct lod_device *m, struct lustre_cfg *cfg);
 int lod_pools_fini(struct lod_device *m);
 int lod_parse_striping(const struct lu_env *env, struct lod_object *mo,
                       const struct lu_buf *buf);
+int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
+                          const struct lu_buf *buf);
 int lod_initialize_objects(const struct lu_env *env, struct lod_object *mo,
                           struct lov_ost_data_v1 *objs);
 int lod_store_def_striping(const struct lu_env *env, struct dt_object *dt,
@@ -352,7 +408,7 @@ int lod_store_def_striping(const struct lu_env *env, struct dt_object *dt,
 int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, int specific);
 int lod_generate_and_set_lovea(const struct lu_env *env,
                               struct lod_object *mo, struct thandle *th);
-
+int lod_ea_store_resize(struct lod_thread_info *info, int size);
 /* lod_pool.c */
 int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count);
 int lod_ost_pool_remove(struct ost_pool *op, __u32 idx);
index 693d94b..5ae4a43 100644 (file)
@@ -483,7 +483,8 @@ int lod_ea_store_resize(struct lod_thread_info *info, int size)
 {
        int round = size_roundup_power2(size);
 
-       LASSERT(round <= lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
+       LASSERT(round <=
+               lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
        if (info->lti_ea_store) {
                LASSERT(info->lti_ea_store_size);
                LASSERT(info->lti_ea_store_size < round);
@@ -568,9 +569,9 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
                objs[i].l_ost_gen    = cpu_to_le32(0);
                rc = lod_fld_lookup(env, lod, fid, &index, LU_SEQ_RANGE_OST);
                if (rc < 0) {
-                       lod_object_free_striping(env, lo);
                        CERROR("%s: Can not locate "DFID": rc = %d\n",
                               lod2obd(lod)->obd_name, PFID(fid), rc);
+                       lod_object_free_striping(env, lo);
                        RETURN(rc);
                }
                objs[i].l_ost_idx = cpu_to_le32(index);
@@ -584,34 +585,34 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
        RETURN(rc);
 }
 
-int lod_get_lov_ea(const struct lu_env *env, struct lod_object *lo)
+int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
+              const char *name)
 {
-       struct lod_thread_info *info = lod_env_info(env);
-       struct dt_object       *next = dt_object_child(&lo->ldo_obj);
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct dt_object        *next = dt_object_child(&lo->ldo_obj);
        int                     rc;
        ENTRY;
 
        LASSERT(info);
 
-       if (unlikely(info->lti_ea_store_size == 0)) {
+       if (unlikely(info->lti_ea_store == NULL)) {
                /* just to enter in allocation block below */
                rc = -ERANGE;
        } else {
 repeat:
                info->lti_buf.lb_buf = info->lti_ea_store;
                info->lti_buf.lb_len = info->lti_ea_store_size;
-               rc = dt_xattr_get(env, next, &info->lti_buf, XATTR_NAME_LOV,
-                                 BYPASS_CAPA);
+               rc = dt_xattr_get(env, next, &info->lti_buf, name, BYPASS_CAPA);
        }
        /* if object is not striped or inaccessible */
-       if (rc == -ENODATA)
+       if (rc == -ENODATA || rc == -ENOENT)
                RETURN(0);
 
        if (rc == -ERANGE) {
                /* EA doesn't fit, reallocate new buffer */
-               rc = dt_xattr_get(env, next, &LU_BUF_NULL, XATTR_NAME_LOV,
+               rc = dt_xattr_get(env, next, &LU_BUF_NULL, name,
                                  BYPASS_CAPA);
-               if (rc == -ENODATA)
+               if (rc == -ENODATA || rc == -ENOENT)
                        RETURN(0);
                else if (rc < 0)
                        RETURN(rc);
@@ -634,11 +635,10 @@ int lod_store_def_striping(const struct lu_env *env, struct dt_object *dt,
        struct dt_object        *next = dt_object_child(dt);
        struct lov_user_md_v3   *v3;
        int                      rc;
-       int                      cplen = 0;
        ENTRY;
 
-       LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr));
-
+       /* default striping is only stored on directories; the check that
+        * replaces the old LASSERT(S_ISDIR()) must reject non-directories */
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+               RETURN(-ENOTDIR);
        /*
         * store striping defaults into new directory
         * used to implement defaults inheritance
@@ -652,32 +652,26 @@ int lod_store_def_striping(const struct lu_env *env, struct dt_object *dt,
                                lo->ldo_def_stripe_offset))
                RETURN(0);
 
-       /* XXX: use thread info */
-       OBD_ALLOC_PTR(v3);
-       if (v3 == NULL)
-               RETURN(-ENOMEM);
-
-       v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
-       v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
-       v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+       v3 = info->lti_ea_store;
+       if (info->lti_ea_store_size < sizeof(*v3)) {
+               rc = lod_ea_store_resize(info, sizeof(*v3));
+               if (rc != 0)
+                       RETURN(rc);
+               v3 = info->lti_ea_store;
+       }
+       memset(v3, 0, sizeof(*v3));
+       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
        v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
        v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
-       if (lo->ldo_pool) {
-               cplen = strlcpy(v3->lmm_pool_name, lo->ldo_pool,
-                               sizeof(v3->lmm_pool_name));
-               if (cplen >= sizeof(v3->lmm_pool_name)) {
-                       OBD_FREE_PTR(v3);
-                       RETURN(-E2BIG);
-               }
-       }
-
+       v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+       if (lo->ldo_pool)
+               strncpy(v3->lmm_pool_name, lo->ldo_pool,
+                       sizeof(v3->lmm_pool_name));
        info->lti_buf.lb_buf = v3;
        info->lti_buf.lb_len = sizeof(*v3);
        rc = dt_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, th,
                        BYPASS_CAPA);
 
-       OBD_FREE_PTR(v3);
-
        RETURN(rc);
 }
 
@@ -838,7 +832,7 @@ int lod_load_striping(const struct lu_env *env, struct lod_object *lo)
 {
        struct lod_thread_info  *info = lod_env_info(env);
        struct dt_object        *next = dt_object_child(&lo->ldo_obj);
-       int                      rc;
+       int                      rc = 0;
        ENTRY;
 
        /*
@@ -850,24 +844,34 @@ int lod_load_striping(const struct lu_env *env, struct lod_object *lo)
        if (lo->ldo_stripe != NULL)
                GOTO(out, rc = 0);
 
-       if (!dt_object_exists(next))
+       /* Do not load stripe for slaves of striped dir */
+       if (!dt_object_exists(next) || lo->ldo_dir_slave_stripe)
                GOTO(out, rc = 0);
 
        /* only regular files can be striped */
-       if (!(lu_object_attr(lod2lu_obj(lo)) & S_IFREG))
-               GOTO(out, rc = 0);
-
-       rc = lod_get_lov_ea(env, lo);
-       if (rc <= 0)
-               GOTO(out, rc);
-
-       /*
-        * there is LOV EA (striping information) in this object
-        * let's parse it and create in-core objects for the stripes
-        */
-       info->lti_buf.lb_buf = info->lti_ea_store;
-       info->lti_buf.lb_len = info->lti_ea_store_size;
-       rc = lod_parse_striping(env, lo, &info->lti_buf);
+       if (lu_object_attr(lod2lu_obj(lo)) & S_IFREG) {
+               rc = lod_get_lov_ea(env, lo);
+               if (rc <= 0)
+                       GOTO(out, rc);
+               /*
+                * there is LOV EA (striping information) in this object
+                * let's parse it and create in-core objects for the stripes
+                */
+               info->lti_buf.lb_buf = info->lti_ea_store;
+               info->lti_buf.lb_len = info->lti_ea_store_size;
+               rc = lod_parse_striping(env, lo, &info->lti_buf);
+       } else if (lu_object_attr(lod2lu_obj(lo)) & S_IFDIR) {
+               rc = lod_get_lmv_ea(env, lo);
+               if (rc <= 0)
+                       GOTO(out, rc);
+               /*
+                * there is LMV EA (striping information) in this object
+                * let's parse it and create in-core objects for the stripes
+                */
+               info->lti_buf.lb_buf = info->lti_ea_store;
+               info->lti_buf.lb_len = info->lti_ea_store_size;
+               rc = lod_parse_dir_striping(env, lo, &info->lti_buf);
+       }
 out:
        dt_write_unlock(env, next);
        RETURN(rc);
index afd9373..3f2a5e5 100644 (file)
 #include <lustre_fid.h>
 #include <lustre_param.h>
 #include <lustre_fid.h>
+#include <lustre_lmv.h>
 #include <obd_lov.h>
 
 #include "lod_internal.h"
 
+static const char dot[] = ".";
+static const char dotdot[] = "..";
+
 extern struct kmem_cache *lod_object_kmem;
 static const struct dt_body_operations lod_body_lnk_ops;
 
@@ -295,9 +299,14 @@ static int lod_declare_attr_set(const struct lu_env *env,
         * Therefore we need not load striping unless ownership is
         * changing.  This should save memory and (we hope) speed up
         * rename(). */
-       if (!(attr->la_valid & (LA_UID | LA_GID)))
-               RETURN(rc);
-
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+               if (!(attr->la_valid & (LA_UID | LA_GID)))
+                       RETURN(rc);
+       } else {
+               if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
+                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                       RETURN(rc);
+       }
        /*
         * load striping information, notice we don't do this when object
         * is being initialized as we don't need this information till
@@ -307,12 +316,38 @@ static int lod_declare_attr_set(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
+               struct lu_attr   *la = &lod_env_info(env)->lti_attr;
+               bool             setattr_time = false;
+
+               rc = dt_attr_get(env, dt_object_child(dt), la,
+                                BYPASS_CAPA);
+               if (rc != 0)
+                       RETURN(rc);
+
+               /* If it will only setattr time, it will only set
+                * time < current_time */
+               if ((attr->la_valid & LA_ATIME &&
+                    attr->la_atime < la->la_atime) ||
+                   (attr->la_valid & LA_CTIME &&
+                    attr->la_ctime < la->la_ctime) ||
+                   (attr->la_valid & LA_MTIME &&
+                    attr->la_mtime < la->la_mtime))
+                       setattr_time = true;
+
+               if (!setattr_time)
+                       RETURN(0);
+       }
        /*
         * if object is striped declare changes on the stripes
         */
-       LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+       LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
+
                rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
                if (rc) {
                        CERROR("failed declaration: %d\n", rc);
@@ -341,13 +376,45 @@ static int lod_attr_set(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
-       if (!(attr->la_valid & (LA_UID | LA_GID)))
-               RETURN(rc);
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
+               if (!(attr->la_valid & (LA_UID | LA_GID)))
+                       RETURN(rc);
+       } else {
+               if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE |
+                                       LA_ATIME | LA_MTIME | LA_CTIME)))
+                       RETURN(rc);
+       }
+
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       if (!(attr->la_valid & ~(LA_ATIME | LA_MTIME | LA_CTIME))) {
+               struct lu_attr   *la = &lod_env_info(env)->lti_attr;
+               bool             setattr_time = false;
+
+               rc = dt_attr_get(env, dt_object_child(dt), la,
+                                BYPASS_CAPA);
+               if (rc != 0)
+                       RETURN(rc);
+
+               /* Only timestamps are being set: go on to the stripes only
+                * if a requested time is older than the one currently stored
+                * on the child object.  Each requested time is compared with
+                * its own counterpart, as in lod_declare_attr_set(). */
+               if ((attr->la_valid & LA_ATIME &&
+                    attr->la_atime < la->la_atime) ||
+                   (attr->la_valid & LA_CTIME &&
+                    attr->la_ctime < la->la_ctime) ||
+                   (attr->la_valid & LA_MTIME &&
+                    attr->la_mtime < la->la_mtime))
+                       setattr_time = true;
+
+               if (!setattr_time)
+                       RETURN(0);
+       }
 
        /*
         * if object is striped, apply changes to all the stripes
         */
-       LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+       LASSERT(lo->ldo_stripe);
        for (i = 0; i < lo->ldo_stripenr; i++) {
                LASSERT(lo->ldo_stripe[i]);
                rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
@@ -409,6 +476,388 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
+/**
+ * Sanity-check an LMV striping request before it is used.
+ *
+ * The request is rejected when its magic is not LMV_USER_MAGIC, when it
+ * asks for zero stripes, or when it asks for more stripes than
+ * lod_remote_mdt_count + 1.  A rejected request is reported with CERROR().
+ *
+ * \param[in] lod      device whose MDT count bounds the stripe count
+ * \param[in] lum      request to check; its fields are little-endian
+ *
+ * \retval 0           request passed all checks
+ * \retval -EINVAL     request failed one of the checks
+ */
+static int lod_verify_md_striping(struct lod_device *lod,
+                                 const struct lmv_user_md_v1 *lum)
+{
+       __u32   magic = le32_to_cpu(lum->lum_magic);
+       __u32   stripe_count = le32_to_cpu(lum->lum_stripe_count);
+       int     rc = 0;
+       ENTRY;
+
+       if (unlikely(magic != LMV_USER_MAGIC))
+               GOTO(out, rc = -EINVAL);
+
+       if (unlikely(stripe_count == 0))
+               GOTO(out, rc = -EINVAL);
+
+       if (unlikely(stripe_count > lod->lod_remote_mdt_count + 1))
+               GOTO(out, rc = -EINVAL);
+out:
+       if (rc != 0)
+               CERROR("%s: invalid lmv_user_md: magic = %x, "
+                      "stripe_offset = %d, stripe_count = %u: rc = %d\n",
+                      lod2obd(lod)->obd_name, magic,
+                      (int)le32_to_cpu(lum->lum_stripe_offset),
+                      stripe_count, rc);
+       /* ENTRY above must be paired with RETURN, not a bare return */
+       RETURN(rc);
+}
+
+/**
+ * Assemble the LMV EA of striped directory \a dt in the per-thread EA
+ * buffer (lti_ea_store) and describe it through \a lmv_buf.
+ *
+ * FID slot 0 receives the FID of \a dt itself; slots 1..N receive the FIDs
+ * of lo->ldo_stripe[0..N-1].  On success the object is also flagged
+ * ldo_dir_striping_cached.
+ *
+ * \retval 0           EA assembled, \a lmv_buf filled in
+ * \retval non-zero    result of lod_ea_store_resize() or lod_fld_lookup();
+ *                     \a lmv_buf has not been written
+ */
+int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt,
+                   struct lu_buf *lmv_buf)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct lmv_mds_md_v1    *lmv;
+       __u32                   master_idx;
+       int                     nr_fids;
+       int                     ea_size;
+       int                     slot;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(lo->ldo_dir_striped != 0);
+       LASSERT(lo->ldo_stripenr > 0);
+
+       /* one FID slot per entry of ldo_stripe[], plus slot 0 for dt */
+       nr_fids = lo->ldo_stripenr + 1;
+       ea_size = lmv_mds_md_size(nr_fids, LMV_MAGIC);
+       if (ea_size > info->lti_ea_store_size) {
+               rc = lod_ea_store_resize(info, ea_size);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       lmv = (struct lmv_mds_md_v1 *)info->lti_ea_store;
+       lmv->lmv_magic = cpu_to_le32(LMV_MAGIC);
+       lmv->lmv_stripe_count = cpu_to_le32(nr_fids);
+       lmv->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type);
+
+       rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu),
+                           &master_idx, LU_SEQ_RANGE_MDT);
+       if (rc != 0)
+               RETURN(rc);
+
+       lmv->lmv_master_mdt_index = cpu_to_le32(master_idx);
+       fid_cpu_to_le(&lmv->lmv_stripe_fids[0], lu_object_fid(&dt->do_lu));
+
+       /* ldo_stripe[slot - 1] is recorded in FID slot 'slot' */
+       for (slot = 1; slot < nr_fids; slot++) {
+               struct dt_object *slave = lo->ldo_stripe[slot - 1];
+
+               LASSERT(slave != NULL);
+               fid_cpu_to_le(&lmv->lmv_stripe_fids[slot],
+                             lu_object_fid(&slave->do_lu));
+       }
+
+       lmv_buf->lb_len = ea_size;
+       lmv_buf->lb_buf = info->lti_ea_store;
+       lo->ldo_dir_striping_cached = 1;
+
+       RETURN(0);
+}
+
+/**
+ * Set up the in-core stripe array of a striped directory from its LMV EA.
+ *
+ * \a buf must hold an lmv_mds_md_v1.  Nothing is loaded when the EA has at
+ * most one stripe.  When FID slot 0 of the EA is not the FID of \a lo, the
+ * object is only flagged ldo_dir_slave_stripe.  Otherwise every FID from
+ * slot 1 on is looked up and located, and the resulting objects are stored
+ * in lo->ldo_stripe (slot i lands in ldo_stripe[i - 1]).
+ *
+ * \retval 0           success, or nothing to load
+ * \retval -EINVAL     EA magic is not LMV_MAGIC_V1
+ * \retval -ENOMEM     stripe array could not be allocated
+ * \retval -ESTALE     no target descriptor for a stripe's MDT index
+ * \retval negative    error from lod_fld_lookup() or dt_locate_at()
+ */
+int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo,
+                          const struct lu_buf *buf)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_device       *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+       struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
+       struct dt_object        **stripe;
+       union lmv_mds_md        *lmm = buf->lb_buf;
+       struct lmv_mds_md_v1    *lmv1 = &lmm->lmv_md_v1;
+       struct lu_fid           *fid = &info->lti_fid;
+       __u32                   stripe_count;
+       int                     i;
+       int                     rc = 0;
+       ENTRY;
+
+       if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
+               RETURN(-EINVAL);
+
+       /* convert once; every use below wants the CPU-endian value */
+       stripe_count = le32_to_cpu(lmv1->lmv_stripe_count);
+       if (stripe_count <= 1)
+               RETURN(0);
+
+       fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[0]);
+       /* Do not load striping information for slave inode */
+       if (!lu_fid_eq(fid, lu_object_fid(&lo->ldo_obj.do_lu))) {
+               lo->ldo_dir_slave_stripe = 1;
+               RETURN(0);
+       }
+
+       LASSERT(lo->ldo_stripe == NULL);
+       OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
+       if (stripe == NULL)
+               RETURN(-ENOMEM);
+
+       /* skip master stripe */
+       for (i = 1; i < stripe_count; i++) {
+               struct lod_tgt_desc     *tgt;
+               struct dt_object        *dto;
+               /* __u32 to match what lod_fld_lookup() is handed elsewhere */
+               __u32                   idx;
+
+               fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]);
+               rc = lod_fld_lookup(env, lod, fid,
+                                   &idx, LU_SEQ_RANGE_MDT);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               tgt = LTD_TGT(ltd, idx);
+               if (tgt == NULL)
+                       GOTO(out, rc = -ESTALE);
+
+               dto = dt_locate_at(env, tgt->ltd_tgt, fid,
+                                 lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev,
+                                 NULL);
+               if (IS_ERR(dto))
+                       GOTO(out, rc = PTR_ERR(dto));
+
+               stripe[i - 1] = dto;
+       }
+out:
+       /* The array is attached even when only partly filled, so that the
+        * error path can release it through lod_object_free_striping().
+        * NOTE(review): this relies on that helper coping with unset
+        * slots -- confirm. */
+       lo->ldo_stripe = stripe;
+       lo->ldo_stripenr = stripe_count - 1;
+       lo->ldo_stripes_allocated = stripe_count - 1;
+       if (rc != 0)
+               lod_object_free_striping(env, lo);
+
+       RETURN(rc);
+}
+
+static int lod_prep_md_striped_create(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_attr *attr,
+                                     const struct lmv_user_md_v1 *lum,
+                                     struct thandle *th)
+{
+       struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_tgt_descs    *ltd = &lod->lod_mdt_descs;
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct dt_object        **stripe;
+       struct lu_buf           lmv_buf;
+       int                     stripe_count;
+       int                     *idx_array;
+       int                     rc = 0;
+       int                     i;
+       int                     j;
+       ENTRY;
+
+       /* The lum has been verified in lod_verify_md_striping */
+       LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC);
+       LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0);
+
+       /* Do not need allocated master stripe */
+       stripe_count = le32_to_cpu(lum->lum_stripe_count);
+       OBD_ALLOC(stripe, sizeof(stripe[0]) * (stripe_count - 1));
+       if (stripe == NULL)
+               RETURN(-ENOMEM);
+
+       OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count);
+       if (idx_array == NULL)
+               GOTO(out_free, rc = -ENOMEM);
+
+       idx_array[0] = le32_to_cpu(lum->lum_stripe_offset);
+       for (i = 1; i < stripe_count; i++) {
+               struct lod_tgt_desc     *tgt;
+               struct dt_object        *dto;
+               struct lu_fid           fid;
+               int                     idx;
+               struct lu_object_conf   conf = { 0 };
+
+               idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
+
+               for (j = 0; j < lod->lod_remote_mdt_count;
+                    j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
+                       bool already_allocated = false;
+                       int k;
+
+                       CDEBUG(D_INFO, "try idx %d, mdt cnt %d,"
+                              " allocated %d, last allocated %d\n", idx,
+                              lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+
+                       /* Find next available target */
+                       if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
+                               continue;
+
+                       /* check whether the idx already exists
+                        * in current allocated array */
+                       for (k = 0; k < i; k++) {
+                               if (idx_array[k] == idx) {
+                                       already_allocated = true;
+                                       break;
+                               }
+                       }
+
+                       if (already_allocated)
+                               continue;
+
+                       break;
+               }
+
+               /* Can not allocate more stripes */
+               if (j == lod->lod_remote_mdt_count) {
+                       CDEBUG(D_INFO, "%s: require stripes %d only get %d\n",
+                              lod2obd(lod)->obd_name, stripe_count, i - 1);
+                       break;
+               }
+
+               CDEBUG(D_INFO, "idx %d, mdt cnt %d,"
+                      " allocated %d, last allocated %d\n", idx,
+                      lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+
+               tgt = LTD_TGT(ltd, idx);
+               LASSERT(tgt != NULL);
+
+               rc = obd_fid_alloc(tgt->ltd_exp, &fid, NULL);
+               if (rc < 0)
+                       GOTO(out_put, rc);
+               rc = 0;
+
+               conf.loc_flags = LOC_F_NEW;
+               dto = dt_locate_at(env, tgt->ltd_tgt, &fid,
+                                 dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf);
+               if (IS_ERR(dto))
+                       GOTO(out_put, rc = PTR_ERR(dto));
+               stripe[i - 1] = dto;
+               idx_array[i] = idx;
+       }
+
+       lo->ldo_dir_striped = 1;
+       lo->ldo_stripe = stripe;
+       lo->ldo_stripenr = i - 1;
+       lo->ldo_stripes_allocated = stripe_count - 1;
+
+       if (lo->ldo_stripenr == 0)
+               GOTO(out_put, rc = -ENOSPC);
+
+       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+       if (rc != 0)
+               GOTO(out_put, rc);
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               struct dt_object *dto;
+
+               dto = stripe[i];
+               /* only create slave striped object */
+               rc = dt_declare_create(env, dto, attr, NULL, NULL, th);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               if (!dt_try_as_dir(env, dto))
+                       GOTO(out_put, rc = -EINVAL);
+
+               rc = dt_declare_insert(env, dto,
+                    (const struct dt_rec *)lu_object_fid(&dto->do_lu),
+                    (const struct dt_key *)dot, th);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               /* master stripe FID will be put to .. */
+               rc = dt_declare_insert(env, dto,
+                    (const struct dt_rec *)lu_object_fid(&dt->do_lu),
+                    (const struct dt_key *)dotdot, th);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               /* probably nothing to inherit */
+               if (lo->ldo_striping_cached &&
+                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+                                        lo->ldo_def_stripenr,
+                                        lo->ldo_def_stripe_offset)) {
+                       struct lod_thread_info  *info;
+                       struct lov_user_md_v3   *v3;
+
+                       /* sigh, lti_ea_store has been used for lmv_buf,
+                        * so we have to allocate buffer for default
+                        * stripe EA */
+                       OBD_ALLOC_PTR(v3);
+                       if (v3 == NULL)
+                               GOTO(out_put, rc = -ENOMEM);
+
+                       memset(v3, 0, sizeof(*v3));
+                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+                       v3->lmm_stripe_count =
+                               cpu_to_le32(lo->ldo_def_stripenr);
+                       v3->lmm_stripe_offset =
+                               cpu_to_le32(lo->ldo_def_stripe_offset);
+                       v3->lmm_stripe_size =
+                               cpu_to_le32(lo->ldo_def_stripe_size);
+                       if (lo->ldo_pool)
+                               strncpy(v3->lmm_pool_name, lo->ldo_pool,
+                                       LOV_MAXPOOLNAME);
+
+                       info = lod_env_info(env);
+                       info->lti_buf.lb_buf = v3;
+                       info->lti_buf.lb_len = sizeof(*v3);
+                       rc = dt_declare_xattr_set(env, dto,
+                                                 &info->lti_buf,
+                                                 XATTR_NAME_LOV,
+                                                 0, th);
+                       OBD_FREE_PTR(v3);
+                       if (rc != 0)
+                               GOTO(out_put, rc);
+               }
+               rc = dt_declare_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, 0,
+                                         th);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+       }
+
+       rc = dt_declare_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, 0, th);
+       if (rc != 0)
+               GOTO(out_put, rc);
+
+out_put:
+       if (rc < 0) {
+               for (i = 0; i < stripe_count - 1; i++)
+                       if (stripe[i] != NULL)
+                               lu_object_put(env, &stripe[i]->do_lu);
+               OBD_FREE(stripe, sizeof(stripe[0]) * (stripe_count - 1));
+       }
+
+out_free:
+       if (idx_array != NULL)
+               OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count);
+
+       RETURN(rc);
+}
+
+/**
+ * Declare the creation of a striped metadata object.
+ *
+ * The LMV user descriptor carried in \a lum_buf decides what happens: a
+ * stripe count of 0 or 1 declares nothing, anything larger is checked by
+ * lod_verify_md_striping() and then handed to
+ * lod_prep_md_striped_create(). If that preparation fails, the striping
+ * state of \a dt is dropped through lod_object_free_striping() before the
+ * error is returned.
+ *
+ * \retval 0           nothing to declare, or the declaration succeeded
+ * \retval non-zero    error from verification or stripe preparation
+ */
+static int lod_declare_xattr_set_lmv(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lu_attr *attr,
+                                    const struct lu_buf *lum_buf,
+                                    struct thandle *th)
+{
+       struct lod_device       *lod = lu2lod_dev(dt->do_lu.lo_dev);
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct lmv_user_md_v1   *lum = lum_buf->lb_buf;
+       int                     rc;
+       ENTRY;
+
+       LASSERT(lum != NULL);
+
+       CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n",
+              le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count),
+              (int)le32_to_cpu(lum->lum_stripe_offset));
+
+       /* one stripe or fewer: no slave stripes have to be declared */
+       if (le32_to_cpu(lum->lum_stripe_count) <= 1)
+               GOTO(out, rc = 0);
+
+       rc = lod_verify_md_striping(lod, lum);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /* allocate the slave stripes and declare their creation */
+       rc = lod_prep_md_striped_create(env, dt, attr, lum, th);
+       if (rc != 0) {
+               /* preparation failed: reset the striping configuration
+                * so that later users of the object are not confused */
+               lod_object_free_striping(env, lo);
+               GOTO(out, rc);
+       }
+out:
+       RETURN(rc);
+}
+
 /*
  * LOV xattr is a storage for striping, and LOD owns this xattr.
  * but LOD allows others to control striping to some extent
@@ -438,7 +887,7 @@ static int lod_declare_xattr_set(const struct lu_env *env,
         * LU_XATTR_REPLACE is set to indicate a layout swap
         */
        mode = dt->do_lu.lo_header->loh_attr & S_IFMT;
-       if ((S_ISREG(mode) || !mode) && !strcmp(name, XATTR_NAME_LOV) &&
+       if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 &&
             !(fl & LU_XATTR_REPLACE)) {
                /*
                 * this is a request to manipulate object's striping
@@ -455,13 +904,22 @@ static int lod_declare_xattr_set(const struct lu_env *env,
                rc = lod_declare_striped_object(env, dt, attr, buf, th);
                if (rc)
                        RETURN(rc);
+       } else {
+               rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
        }
 
-       rc = dt_declare_xattr_set(env, next, buf, name, fl, th);
-
        RETURN(rc);
 }
 
+/**
+ * Forget the default LOV striping cached on \a lo.
+ *
+ * Both cache flags are cleared, the pool is reset through
+ * lod_object_set_pool(lo, NULL), and the cached default stripe size and
+ * stripe count are zeroed. The cached default stripe offset is not
+ * modified here.
+ */
+static void lod_lov_stripe_cache_clear(struct lod_object *lo)
+{
+       /* mark the cache as neither valid nor populated */
+       lo->ldo_def_striping_set = 0;
+       lo->ldo_striping_cached = 0;
+
+       lod_object_set_pool(lo, NULL);
+
+       /* drop the cached default values */
+       lo->ldo_def_stripenr = 0;
+       lo->ldo_def_stripe_size = 0;
+}
+
 static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct lu_buf *buf,
@@ -477,13 +935,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
        int                      rc;
        ENTRY;
 
-       LASSERT(l->ldo_stripe == NULL);
-       l->ldo_striping_cached = 0;
-       l->ldo_def_striping_set = 0;
-       lod_object_set_pool(l, NULL);
-       l->ldo_def_stripe_size = 0;
-       l->ldo_def_stripenr = 0;
-
+       /* If this is a striped directory, the stripe cache of its slave
+        * stripes should be cleared as well, but there is no effective
+        * way to notify the LOD on the slave MDT, so stripe information
+        * is not cached for slave stripes for now. XXX */
+       lod_lov_stripe_cache_clear(l);
        LASSERT(buf != NULL && buf->lb_buf != NULL);
        lum = buf->lb_buf;
 
@@ -518,6 +974,98 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env,
        RETURN(rc);
 }
 
+/**
+ * Execute phase of striped directory creation: create the slave stripes
+ * recorded in the object and store the LMV EA on slaves and master.
+ *
+ * For every slave in lo->ldo_stripe[] this creates a directory object,
+ * inserts the "dot" entry (the slave's own FID) and the "dotdot" entry
+ * (the FID of the master \a dt), stores the cached default LOV striping
+ * when there is one worth storing, and sets the LMV EA built by
+ * lod_prep_lmv_md(). The same LMV EA is finally set on \a dt itself.
+ *
+ * The contents of \a buf and \a name are not consulted; the LMV EA is
+ * always regenerated from the object state.
+ *
+ * \retval 0           success, or no stripes were allocated for \a dt
+ * \retval -ENOTDIR    \a dt is not a directory
+ * \retval -ENOMEM     the temporary default LOV EA could not be allocated
+ * \retval other       first error returned by lod_prep_lmv_md(),
+ *                     dt_create(), dt_insert() or dt_xattr_set()
+ */
+static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_buf *buf, const char *name,
+                            int fl, struct thandle *th,
+                            struct lustre_capa *capa)
+{
+       struct lod_object       *lo = lod_dt_obj(dt);
+       struct lu_buf           lmv_buf;
+       int                     i;
+       int                     rc;
+       ENTRY;
+
+       if (!S_ISDIR(dt->do_lu.lo_header->loh_attr))
+               RETURN(-ENOTDIR);
+
+       /* The stripes are supposed to be allocated in declare phase,
+        * if there are no stripes being allocated, it will skip */
+       if (lo->ldo_stripenr == 0)
+               RETURN(0);
+
+       rc = lod_prep_lmv_md(env, dt, &lmv_buf);
+       if (rc != 0)
+               RETURN(rc);
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               struct dt_object *dto;
+               struct lu_attr  *attr = &lod_env_info(env)->lti_attr;
+
+               dto = lo->ldo_stripe[i];
+               /* each slave stripe is created as a bare directory */
+               memset(attr, 0, sizeof(*attr));
+               attr->la_valid = LA_TYPE | LA_MODE;
+               attr->la_mode = S_IFDIR;
+               rc = dt_create(env, dto, attr, NULL, NULL, th);
+               if (rc != 0)
+                       RETURN(rc);
+
+               rc = dt_insert(env, dto,
+                             (const struct dt_rec *)lu_object_fid(&dto->do_lu),
+                             (const struct dt_key *)dot, th, capa, 0);
+               if (rc != 0)
+                       RETURN(rc);
+
+               /* master stripe FID is put to dotdot */
+               rc = dt_insert(env, dto,
+                             (struct dt_rec *)lu_object_fid(&dt->do_lu),
+                             (const struct dt_key *)dotdot, th, capa, 0);
+               if (rc != 0)
+                       RETURN(rc);
+
+               if (lo->ldo_striping_cached &&
+                   !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+                                        lo->ldo_def_stripenr,
+                                        lo->ldo_def_stripe_offset)) {
+                       struct lod_thread_info  *info;
+                       struct lov_user_md_v3   *v3;
+
+                       /* sigh, lti_ea_store has been used for lmv_buf,
+                        * so we have to allocate buffer for default
+                        * stripe EA */
+                       OBD_ALLOC_PTR(v3);
+                       if (v3 == NULL)
+                               RETURN(-ENOMEM);
+
+                       memset(v3, 0, sizeof(*v3));
+                       v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+                       /* stripe count and offset are converted with
+                        * cpu_to_le16(), the same way
+                        * lod_dir_striping_create_internal() fills this
+                        * descriptor */
+                       v3->lmm_stripe_count =
+                               cpu_to_le16(lo->ldo_def_stripenr);
+                       v3->lmm_stripe_offset =
+                               cpu_to_le16(lo->ldo_def_stripe_offset);
+                       v3->lmm_stripe_size =
+                               cpu_to_le32(lo->ldo_def_stripe_size);
+                       if (lo->ldo_pool)
+                               strncpy(v3->lmm_pool_name, lo->ldo_pool,
+                                       LOV_MAXPOOLNAME);
+
+                       info = lod_env_info(env);
+                       info->lti_buf.lb_buf = v3;
+                       info->lti_buf.lb_len = sizeof(*v3);
+                       rc = dt_xattr_set(env, dto, &info->lti_buf,
+                                         XATTR_NAME_LOV, 0, th, capa);
+                       OBD_FREE_PTR(v3);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+
+               rc = dt_xattr_set(env, dto, &lmv_buf, XATTR_NAME_LMV, fl, th,
+                                 capa);
+               /* stop here: otherwise the next stripe or the master EA
+                * update below would overwrite this failure */
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       /* finally store the LMV EA on the master object itself */
+       rc = dt_xattr_set(env, dt, &lmv_buf, XATTR_NAME_LMV, fl, th, capa);
+
+       RETURN(rc);
+}
+
 static int lod_xattr_set(const struct lu_env *env,
                         struct dt_object *dt, const struct lu_buf *buf,
                         const char *name, int fl, struct thandle *th,
@@ -529,13 +1077,8 @@ static int lod_xattr_set(const struct lu_env *env,
        ENTRY;
 
        attr = dt->do_lu.lo_header->loh_attr & S_IFMT;
-       if (S_ISDIR(attr)) {
-               if (strcmp(name, XATTR_NAME_LOV) == 0)
-                       rc = lod_xattr_set_lov_on_dir(env, dt, buf, name,
-                                                     fl, th, capa);
-               else
-                       rc = dt_xattr_set(env, next, buf, name, fl, th, capa);
-
+       if (S_ISDIR(attr) && strcmp(name, XATTR_NAME_LOV) == 0) {
+               rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th, capa);
        } else if (S_ISREG(attr) && !strcmp(name, XATTR_NAME_LOV)) {
                /* in case of lov EA swap, just set it
                 * if not, it is a replay so check striping match what we
@@ -607,20 +1150,19 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi
        return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE);
 }
 
-static int lod_cache_parent_striping(const struct lu_env *env,
-                                    struct lod_object *lp)
+
+static int lod_cache_parent_lov_striping(const struct lu_env *env,
+                                        struct lod_object *lp)
 {
+       struct lod_thread_info  *info = lod_env_info(env);
        struct lov_user_md_v1   *v1 = NULL;
        struct lov_user_md_v3   *v3 = NULL;
        int                      rc;
        ENTRY;
 
-       /* dt_ah_init() is called from MDD without parent being write locked
+       /* called from MDD without parent being write locked,
         * lock it here */
        dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
-       if (lp->ldo_striping_cached)
-               GOTO(unlock, rc = 0);
-
        rc = lod_get_lov_ea(env, lp);
        if (rc < 0)
                GOTO(unlock, rc);
@@ -635,7 +1177,8 @@ static int lod_cache_parent_striping(const struct lu_env *env,
                GOTO(unlock, rc = 0);
        }
 
-       v1 = (struct lov_user_md_v1 *)lod_env_info(env)->lti_ea_store;
+       rc = 0;
+       v1 = info->lti_ea_store;
        if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1))
                lustre_swab_lov_user_md_v1(v1);
        else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3))
@@ -652,18 +1195,53 @@ static int lod_cache_parent_striping(const struct lu_env *env,
        lp->ldo_def_stripe_offset = v1->lmm_stripe_offset;
        lp->ldo_striping_cached = 1;
        lp->ldo_def_striping_set = 1;
-
        if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
                /* XXX: sanity check here */
                v3 = (struct lov_user_md_v3 *) v1;
                if (v3->lmm_pool_name[0])
                        lod_object_set_pool(lp, v3->lmm_pool_name);
        }
+       EXIT;
+unlock:
+       dt_write_unlock(env, dt_object_child(&lp->ldo_obj));
+       return rc;
+}
+
+
+static int lod_cache_parent_lmv_striping(const struct lu_env *env,
+                                        struct lod_object *lp)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lmv_user_md_v1   *v1 = NULL;
+       int                      rc;
+       ENTRY;
+
+       /* called from MDD without parent being write locked,
+        * lock it here */
+       dt_write_lock(env, dt_object_child(&lp->ldo_obj), 0);
+       rc = lod_get_default_lmv_ea(env, lp);
+       if (rc < 0)
+               GOTO(unlock, rc);
+
+       if (rc < sizeof(struct lmv_user_md)) {
+               /* don't lookup for non-existing or invalid striping */
+               lp->ldo_dir_def_striping_set = 0;
+               lp->ldo_dir_striping_cached = 1;
+               lp->ldo_dir_def_stripenr = 0;
+               lp->ldo_dir_def_stripe_offset =
+                                       (typeof(v1->lum_stripe_offset))(-1);
+               lp->ldo_dir_def_hash_type = LMV_HASH_TYPE_FNV_1A_64;
+               GOTO(unlock, rc = 0);
+       }
+
+       rc = 0;
+       v1 = info->lti_ea_store;
 
-       CDEBUG(D_OTHER, "def. striping: # %d, sz %d, off %d %s%s on "DFID"\n",
-              lp->ldo_def_stripenr, lp->ldo_def_stripe_size,
-              lp->ldo_def_stripe_offset, v3 ? "from " : "",
-              v3 ? lp->ldo_pool : "", PFID(lu_object_fid(&lp->ldo_obj.do_lu)));
+       lp->ldo_dir_def_stripenr = le32_to_cpu(v1->lum_stripe_count) - 1;
+       lp->ldo_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset);
+       lp->ldo_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type);
+       lp->ldo_dir_def_striping_set = 1;
+       lp->ldo_dir_striping_cached = 1;
 
        EXIT;
 unlock:
@@ -671,6 +1249,31 @@ unlock:
        return rc;
 }
 
+/**
+ * Make sure the striping information of parent \a lp is loaded and its
+ * default striping is cached.
+ *
+ * lod_load_striping() is always called first. The default LOV striping
+ * is then cached unless lp->ldo_striping_cached is already set. The
+ * default LMV striping is cached only when \a child_mode is a directory
+ * and lp->ldo_dir_striping_cached is not yet set; the directory flag is
+ * tested only for directory children.
+ *
+ * \retval 0           success
+ * \retval non-zero    first error reported by one of the steps above
+ */
+static int lod_cache_parent_striping(const struct lu_env *env,
+                                    struct lod_object *lp,
+                                    umode_t child_mode)
+{
+       int rc;
+       ENTRY;
+
+       rc = lod_load_striping(env, lp);
+
+       /* we haven't tried to get default striping for
+        * the directory yet, let's cache it in the object */
+       if (rc == 0 && !lp->ldo_striping_cached)
+               rc = lod_cache_parent_lov_striping(env, lp);
+
+       /* default directory striping only matters for new directories */
+       if (rc == 0 && S_ISDIR(child_mode) && !lp->ldo_dir_striping_cached)
+               rc = lod_cache_parent_lmv_striping(env, lp);
+
+       RETURN(rc);
+}
+
 /**
  * used to transfer default striping data to the object being created
  */
@@ -711,11 +1314,24 @@ static void lod_ah_init(const struct lu_env *env,
                                          NULL : nextp, nextc, child_mode);
 
        if (S_ISDIR(child_mode)) {
-               if (lp->ldo_striping_cached == 0) {
-                       /* we haven't tried to get default striping for
-                        * the directory yet, let's cache it in the object */
-                       lod_cache_parent_striping(env, lp);
+               int rc;
+
+               if (lc->ldo_dir_stripe == NULL) {
+                       OBD_ALLOC_PTR(lc->ldo_dir_stripe);
+                       if (lc->ldo_dir_stripe == NULL)
+                               return;
                }
+
+               if (lp->ldo_dir_stripe == NULL) {
+                       OBD_ALLOC_PTR(lp->ldo_dir_stripe);
+                       if (lp->ldo_dir_stripe == NULL)
+                               return;
+               }
+
+               rc = lod_cache_parent_striping(env, lp, child_mode);
+               if (rc != 0)
+                       return;
+
                /* transfer defaults to new directory */
                if (lp->ldo_striping_cached) {
                        if (lp->ldo_pool)
@@ -726,11 +1342,66 @@ static void lod_ah_init(const struct lu_env *env,
                        lc->ldo_striping_cached = 1;
                        lc->ldo_def_striping_set = 1;
                        CDEBUG(D_OTHER, "inherite EA sz:%d off:%d nr:%d\n",
-                              (int)lc->ldo_def_stripenr,
                               (int)lc->ldo_def_stripe_size,
-                              (int)lc->ldo_def_stripe_offset);
+                              (int)lc->ldo_def_stripe_offset,
+                              (int)lc->ldo_def_stripenr);
                }
-               return;
+
+               /* transfer dir defaults to new directory */
+               if (lp->ldo_dir_striping_cached) {
+                       lc->ldo_dir_def_stripenr = lp->ldo_dir_def_stripenr;
+                       lc->ldo_dir_def_stripe_offset =
+                                                 lp->ldo_dir_def_stripe_offset;
+                       lc->ldo_dir_def_hash_type =
+                                                 lp->ldo_dir_def_hash_type;
+                       lc->ldo_dir_striping_cached = 1;
+                       lc->ldo_dir_def_striping_set = 1;
+                       CDEBUG(D_INFO, "inherit default EA nr:%d off:%d t%u\n",
+                              (int)lc->ldo_dir_def_stripenr,
+                              (int)lc->ldo_dir_def_stripe_offset,
+                              lc->ldo_dir_def_hash_type);
+               }
+
+               /* If the directory is specified with certain stripes */
+               if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0) {
+                       const struct lmv_user_md_v1 *lum1 = ah->dah_eadata;
+                       int rc;
+
+                       rc = lod_verify_md_striping(d, lum1);
+                       if (rc == 0 &&
+                               le32_to_cpu(lum1->lum_stripe_count) > 1) {
+                               /* Directory will be striped only if
+                                * stripe_count > 1 */
+                               lc->ldo_stripenr =
+                                       le32_to_cpu(lum1->lum_stripe_count) - 1;
+                               lc->ldo_dir_stripe_offset =
+                                       le32_to_cpu(lum1->lum_stripe_offset);
+                               lc->ldo_dir_hash_type =
+                                       le32_to_cpu(lum1->lum_hash_type);
+                               CDEBUG(D_INFO, "set stripe EA nr:%hu off:%d\n",
+                                      lc->ldo_stripenr,
+                                      (int)lc->ldo_dir_stripe_offset);
+                       }
+               } else if (lp->ldo_dir_def_striping_set) {
+                       /* If there are default dir stripe from parent */
+                       lc->ldo_stripenr = lp->ldo_dir_def_stripenr;
+                       lc->ldo_dir_stripe_offset =
+                                       lp->ldo_dir_def_stripe_offset;
+                       lc->ldo_dir_hash_type =
+                                       lp->ldo_dir_def_hash_type;
+                       CDEBUG(D_INFO, "inherit EA nr:%hu off:%d\n",
+                              lc->ldo_stripenr,
+                              (int)lc->ldo_dir_stripe_offset);
+               } else {
+                       /* set default stripe for this directory */
+                       lc->ldo_stripenr = 0;
+                       lc->ldo_dir_stripe_offset = -1;
+               }
+
+               CDEBUG(D_INFO, "final striping count:%hu, offset:%d\n",
+                      lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset);
+
+               goto out;
        }
 
        /*
@@ -740,17 +1411,12 @@ static void lod_ah_init(const struct lu_env *env,
         */
        if (!lod_object_will_be_striped(S_ISREG(child_mode),
                                        lu_object_fid(&child->do_lu)))
-               return;
-
+               goto out;
        /*
         * try from the parent
         */
        if (likely(parent)) {
-               if (lp->ldo_striping_cached == 0) {
-                       /* we haven't tried to get default striping for
-                        * the directory yet, let's cache it in the object */
-                       lod_cache_parent_striping(env, lp);
-               }
+               lod_cache_parent_striping(env, lp, child_mode);
 
                lc->ldo_def_stripe_offset = (__u16) -1;
 
@@ -778,6 +1444,12 @@ static void lod_ah_init(const struct lu_env *env,
               lc->ldo_stripenr, lc->ldo_stripe_size,
               lc->ldo_pool ? lc->ldo_pool : "");
 
+out:
+       /* we do not cache stripe information for slave stripe, see
+        * lod_xattr_set_lov_on_dir */
+       if (lp != NULL && lp->ldo_dir_slave_stripe)
+               lod_lov_stripe_cache_clear(lp);
+
        EXIT;
 }
 
@@ -827,7 +1499,6 @@ static int lod_declare_init_size(const struct lu_env *env,
        RETURN(rc);
 }
 
-
 /**
  * Create declaration of striped object
  */
@@ -879,6 +1550,146 @@ out:
        RETURN(rc);
 }
 
+/**
+ * Set, or declare the setting of, the striping-related EAs of directory
+ * \a dt.
+ *
+ * Up to three EAs are handled, in this order:
+ *  1. the LMV striping of \a dt itself, through
+ *     lod_declare_xattr_set_lmv() or lod_xattr_set_lmv();
+ *  2. the default LMV striping (XATTR_NAME_DEFALT_LMV), on the object
+ *     returned by dt_object_child(dt);
+ *  3. the default LOV striping (XATTR_NAME_LOV), on that same object.
+ * A step is skipped when its guard flag is clear or when
+ * LMVEA_DELETE_VALUES()/LOVEA_DELETE_VALUES() is true for its values.
+ *
+ * Every descriptor is built in info->lti_ea_store and passed through
+ * info->lti_buf, so each step overwrites the buffer of the previous one.
+ *
+ * \a declare selects the declare variants (true) or the execute variants
+ * (false) of the calls. \a dof is not referenced by this function.
+ *
+ * \retval 0           all applicable steps succeeded
+ * \retval non-zero    first error from lod_ea_store_resize() or from one
+ *                     of the xattr calls
+ */
+int lod_dir_striping_create_internal(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lu_attr *attr,
+                                    const struct dt_object_format *dof,
+                                    struct thandle *th,
+                                    bool declare)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct dt_object        *next = dt_object_child(dt);
+       struct lod_object       *lo = lod_dt_obj(dt);
+       int                     rc;
+       ENTRY;
+
+       /* Step 1: LMV striping of this directory itself */
+       if (lo->ldo_dir_def_striping_set &&
+           !LMVEA_DELETE_VALUES(lo->ldo_stripenr,
+                                lo->ldo_dir_stripe_offset)) {
+               struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+               /* ldo_stripenr does not count the master stripe */
+               int stripe_count = lo->ldo_stripenr + 1;
+
+               if (info->lti_ea_store_size < sizeof(*v1)) {
+                       rc = lod_ea_store_resize(info, sizeof(*v1));
+                       if (rc != 0)
+                               RETURN(rc);
+                       /* the store may have moved after the resize */
+                       v1 = info->lti_ea_store;
+               }
+
+               /* lum_hash_type is not assigned for this descriptor and
+                * keeps the zero written by the memset */
+               memset(v1, 0, sizeof(*v1));
+               v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+               v1->lum_stripe_count = cpu_to_le32(stripe_count);
+               v1->lum_stripe_offset =
+                               cpu_to_le32(lo->ldo_dir_stripe_offset);
+
+               info->lti_buf.lb_buf = v1;
+               info->lti_buf.lb_len = sizeof(*v1);
+
+               if (declare)
+                       rc = lod_declare_xattr_set_lmv(env, dt, attr,
+                                                      &info->lti_buf, th);
+               else
+                       rc = lod_xattr_set_lmv(env, dt, &info->lti_buf,
+                                              XATTR_NAME_LMV, 0, th,
+                                              BYPASS_CAPA);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       /* Transfer default LMV striping from the parent */
+       if (lo->ldo_dir_striping_cached &&
+           !LMVEA_DELETE_VALUES(lo->ldo_dir_def_stripenr,
+                                lo->ldo_dir_def_stripe_offset)) {
+               struct lmv_user_md_v1 *v1 = info->lti_ea_store;
+               /* the cached default count does not include the master */
+               int def_stripe_count = lo->ldo_dir_def_stripenr + 1;
+
+               if (info->lti_ea_store_size < sizeof(*v1)) {
+                       rc = lod_ea_store_resize(info, sizeof(*v1));
+                       if (rc != 0)
+                               RETURN(rc);
+                       v1 = info->lti_ea_store;
+               }
+
+               memset(v1, 0, sizeof(*v1));
+               v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
+               v1->lum_stripe_count = cpu_to_le32(def_stripe_count);
+               v1->lum_stripe_offset =
+                               cpu_to_le32(lo->ldo_dir_def_stripe_offset);
+               v1->lum_hash_type =
+                               cpu_to_le32(lo->ldo_dir_def_hash_type);
+
+               info->lti_buf.lb_buf = v1;
+               info->lti_buf.lb_len = sizeof(*v1);
+               if (declare)
+                       rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+                                                 XATTR_NAME_DEFALT_LMV, 0,
+                                                 th);
+               else
+                       rc = dt_xattr_set(env, next, &info->lti_buf,
+                                          XATTR_NAME_DEFALT_LMV, 0, th,
+                                          BYPASS_CAPA);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       /* Transfer default LOV striping from the parent */
+       if (lo->ldo_striping_cached &&
+           !LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
+                                lo->ldo_def_stripenr,
+                                lo->ldo_def_stripe_offset)) {
+               struct lov_user_md_v3 *v3 = info->lti_ea_store;
+
+               if (info->lti_ea_store_size < sizeof(*v3)) {
+                       rc = lod_ea_store_resize(info, sizeof(*v3));
+                       if (rc != 0)
+                               RETURN(rc);
+                       v3 = info->lti_ea_store;
+               }
+
+               memset(v3, 0, sizeof(*v3));
+               v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3);
+               v3->lmm_stripe_count = cpu_to_le16(lo->ldo_def_stripenr);
+               v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
+               v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
+               /* the pool name is copied only when a pool is set;
+                * otherwise it stays zeroed by the memset */
+               if (lo->ldo_pool)
+                       strncpy(v3->lmm_pool_name, lo->ldo_pool,
+                               LOV_MAXPOOLNAME);
+
+               info->lti_buf.lb_buf = v3;
+               info->lti_buf.lb_len = sizeof(*v3);
+
+               if (declare)
+                       rc = dt_declare_xattr_set(env, next, &info->lti_buf,
+                                                 XATTR_NAME_LOV, 0, th);
+               else
+                       rc = dt_xattr_set(env, next, &info->lti_buf,
+                                         XATTR_NAME_LOV, 0, th,
+                                         BYPASS_CAPA);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       RETURN(0);
+}
+
+/**
+ * Declare phase of directory striping creation: forwards all arguments
+ * to lod_dir_striping_create_internal() with \a declare set to true.
+ */
+static int lod_declare_dir_striping_create(const struct lu_env *env,
+                                          struct dt_object *dt,
+                                          struct lu_attr *attr,
+                                          struct dt_object_format *dof,
+                                          struct thandle *th)
+{
+       const bool declare = true;
+
+       return lod_dir_striping_create_internal(env, dt, attr, dof, th,
+                                               declare);
+}
+
+/**
+ * Execute phase of directory striping creation: forwards all arguments
+ * to lod_dir_striping_create_internal() with \a declare set to false.
+ */
+static int lod_dir_striping_create(const struct lu_env *env,
+                                  struct dt_object *dt,
+                                  struct lu_attr *attr,
+                                  struct dt_object_format *dof,
+                                  struct thandle *th)
+{
+       const bool declare = false;
+
+       return lod_dir_striping_create_internal(env, dt, attr, dof, th,
+                                               declare);
+}
+
 static int lod_declare_object_create(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lu_attr *attr,
@@ -918,40 +1729,9 @@ static int lod_declare_object_create(const struct lu_env *env,
                if (lo->ldo_stripenr > 0)
                        rc = lod_declare_striped_object(env, dt, attr,
                                                        NULL, th);
-       } else if (dof->dof_type == DFT_DIR && lo->ldo_striping_cached) {
-               struct lod_thread_info *info = lod_env_info(env);
-
-               struct lov_user_md_v3 *v3;
-
-               if (LOVEA_DELETE_VALUES(lo->ldo_def_stripe_size,
-                                       lo->ldo_def_stripenr,
-                                       lo->ldo_def_stripe_offset))
-                       RETURN(0);
-
-               OBD_ALLOC_PTR(v3);
-               if (v3 == NULL)
-                       RETURN(-ENOMEM);
-
-               v3->lmm_magic = cpu_to_le32(LOV_MAGIC_V3);
-               v3->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0);
-               fid_to_lmm_oi(lu_object_fid(&dt->do_lu), &v3->lmm_oi);
-               lmm_oi_cpu_to_le(&v3->lmm_oi, &v3->lmm_oi);
-               v3->lmm_stripe_size = cpu_to_le32(lo->ldo_def_stripe_size);
-               v3->lmm_stripe_count = cpu_to_le32(lo->ldo_def_stripenr);
-               v3->lmm_stripe_offset = cpu_to_le16(lo->ldo_def_stripe_offset);
-               if (lo->ldo_pool)
-                       strncpy(v3->lmm_pool_name, lo->ldo_pool,
-                               LOV_MAXPOOLNAME);
-
-               info->lti_buf.lb_buf = v3;
-               info->lti_buf.lb_len = sizeof(*v3);
-
-               /* to transfer default striping from the parent */
-               rc = dt_declare_xattr_set(env, next, &info->lti_buf,
-                                         XATTR_NAME_LOV, 0, th);
-               OBD_FREE_PTR(v3);
+       } else if (dof->dof_type == DFT_DIR) {
+               rc = lod_declare_dir_striping_create(env, dt, attr, dof, th);
        }
-
 out:
        RETURN(rc);
 }
@@ -995,8 +1775,8 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt,
 
        if (rc == 0) {
                if (S_ISDIR(dt->do_lu.lo_header->loh_attr))
-                       rc = lod_store_def_striping(env, dt, th);
-               else if (lo->ldo_stripe)
+                       rc = lod_dir_striping_create(env, dt, attr, dof, th);
+               else if (lo->ldo_stripe && dof->u.dof_reg.striped != 0)
                        rc = lod_striping_create(env, dt, attr, dof, th);
        }
 
@@ -1221,6 +2001,11 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
 {
        int i;
 
+       if (lo->ldo_dir_stripe != NULL) {
+               OBD_FREE_PTR(lo->ldo_dir_stripe);
+               lo->ldo_dir_stripe = NULL;
+       }
+
        if (lo->ldo_stripe) {
                LASSERT(lo->ldo_stripes_allocated > 0);
 
index 269e4ca..b4b8d93 100644 (file)
@@ -624,8 +624,6 @@ static int inline lod_qos_dev_is_full(struct obd_statfs *msfs)
        return (msfs->os_bavail < used);
 }
 
-int lod_ea_store_resize(struct lod_thread_info *info, int size);
-
 static inline int lod_qos_ost_in_use_clear(const struct lu_env *env, int stripes)
 {
        struct lod_thread_info *info = lod_env_info(env);
index 8920c6c..e0cb962 100644 (file)
@@ -366,6 +366,9 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
                       lmmsize);
 
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            obddev->u.cli.cl_max_mds_easize);
+
         /* for remote client, fetch remote perm for current user */
         if (client_is_remote(exp))
                 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
index 37ece1f..b993174 100644 (file)
@@ -1664,7 +1664,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
                RETURN(rc);
 
        /* calling ->ah_make_hint() is used to transfer information from parent */
-       mdd_object_make_hint(env, mdd_pobj, son, attr);
+       mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
@@ -1678,8 +1678,7 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
               spec->u.sp_ea.eadata, spec->u.sp_ea.eadatalen,
               spec->sp_cr_flags, spec->no_create);
 
-       if (spec->no_create || spec->sp_cr_flags & MDS_OPEN_HAS_EA) {
-               /* replay case or lfs setstripe */
+       if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
        } else {
@@ -1925,13 +1924,14 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                GOTO(out, rc);
 
        /* replay case, create LOV EA from client data */
-       if (spec->no_create || (spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
+       if (spec->no_create ||
+           (spec->sp_cr_flags & MDS_OPEN_HAS_EA && S_ISREG(attr->la_mode))) {
                const struct lu_buf *buf;
 
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                        spec->u.sp_ea.eadatalen);
-               rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV,
-                                          0, handle);
+               rc = mdo_declare_xattr_set(env, c, buf, XATTR_NAME_LOV, 0,
+                                          handle);
                if (rc)
                        GOTO(out, rc);
        }
@@ -1958,8 +1958,14 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
         if (rc)
                 return rc;
 
+       /* XXX: For a remote create, flag that the remote RPC is to be
+        * sent only after the local transaction has finished. This is
+        * not very nice, and it will be removed once async updates are
+        * fully supported. */
+       if (mdd_object_remote(p) && handle->th_update != NULL)
+               handle->th_update->tu_sent_after_local_trans = 1;
 out:
-        return rc;
+       return rc;
 }
 
 static int mdd_acl_init(const struct lu_env *env, struct mdd_object *pobj,
@@ -2082,7 +2088,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        if (rc < 0)
                GOTO(out_free, rc);
 
-       mdd_object_make_hint(env, mdd_pobj, son, attr);
+       mdd_object_make_hint(env, mdd_pobj, son, attr, spec);
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
@@ -2143,13 +2149,14 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         *      probably this way we code can be made better.
         */
        if (rc == 0 && (spec->no_create ||
-                       (spec->sp_cr_flags & MDS_OPEN_HAS_EA))) {
+                       (spec->sp_cr_flags & MDS_OPEN_HAS_EA &&
+                        S_ISREG(attr->la_mode)))) {
                const struct lu_buf *buf;
 
                buf = mdd_buf_get_const(env, spec->u.sp_ea.eadata,
                                spec->u.sp_ea.eadatalen);
                rc = mdo_xattr_set(env, son, buf, XATTR_NAME_LOV, 0, handle,
-                               BYPASS_CAPA);
+                                  BYPASS_CAPA);
        }
 
        if (rc == 0 && spec->sp_cr_flags & MDS_OPEN_VOLATILE)
index daa1dcb..0bfb2a2 100644 (file)
@@ -139,6 +139,8 @@ struct mdd_thread_info {
        struct lu_attr            mti_tattr;
        /** used to set c/mtime */
        struct lu_attr            mti_la_for_fix;
+       /* Only used in mdd_object_start */
+       struct lu_attr            mti_la_for_start;
        struct md_attr            mti_ma;
        struct obd_info           mti_oi;
        /* mti_ent and mti_key must be conjoint,
@@ -390,7 +392,8 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
                     struct mdd_object *obj, struct lu_attr *la);
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
-                         struct mdd_object *child, struct lu_attr *attr);
+                         struct mdd_object *child, const struct lu_attr *attr,
+                         const struct md_op_spec *spec);
 
 static inline void mdd_object_get(struct mdd_object *o)
 {
index a959dea..b27306f 100644 (file)
@@ -189,7 +189,7 @@ static int mdd_object_start(const struct lu_env *env, struct lu_object *o)
 
        if (lu_object_exists(o)) {
                struct mdd_object *mdd_obj = lu2mdd_obj(o);
-               struct lu_attr *attr = MDD_ENV_VAR(env, cattr);
+               struct lu_attr *attr = MDD_ENV_VAR(env, la_for_start);
 
                rc = mdd_la_get(env, mdd_obj, attr, BYPASS_CAPA);
                if (rc == 0)
@@ -1559,12 +1559,28 @@ stop:
 }
 
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
-               struct mdd_object *child, struct lu_attr *attr)
+                         struct mdd_object *child, const struct lu_attr *attr,
+                         const struct md_op_spec *spec)
 {
        struct dt_allocation_hint *hint = &mdd_env_info(env)->mti_hint;
-       struct dt_object *np = parent ? mdd_object_child(parent) : NULL;
+       struct dt_object *np = parent ?  mdd_object_child(parent) : NULL;
        struct dt_object *nc = mdd_object_child(child);
 
+       memset(hint, 0, sizeof(*hint));
+
+       /* For a striped directory, pass the striping EA to lod_ah_init,
+        * which uses it to decide the stripe offset and stripe count. */
+       if (S_ISDIR(attr->la_mode) &&
+           unlikely(spec != NULL && spec->sp_cr_flags & MDS_OPEN_HAS_EA)) {
+               hint->dah_eadata = spec->u.sp_ea.eadata;
+               hint->dah_eadata_len = spec->u.sp_ea.eadatalen;
+       } else {
+               hint->dah_eadata = NULL;
+               hint->dah_eadata_len = 0;
+       }
+
+       CDEBUG(D_INFO, DFID" eadata %p, len %d\n", PFID(mdd_object_fid(child)),
+              hint->dah_eadata, hint->dah_eadata_len);
        /* @hint will be initialized by underlying device. */
        nc->do_ops->do_ah_init(env, hint, np, nc, attr->la_mode & S_IFMT);
 }
index b71cafe..8c2b74e 100644 (file)
@@ -63,6 +63,6 @@ int mdd_trans_start(const struct lu_env *env, struct mdd_device *mdd,
 void mdd_trans_stop(const struct lu_env *env, struct mdd_device *mdd,
                     int result, struct thandle *handle)
 {
-        handle->th_result = result;
-        mdd_child_ops(mdd)->dt_trans_stop(env, handle);
+       handle->th_result = result;
+       mdd_child_ops(mdd)->dt_trans_stop(env, mdd->mdd_child, handle);
 }
index b46fab4..22326e6 100644 (file)
@@ -531,6 +531,7 @@ int mdt_attr_get_lov(struct mdt_thread_info *info,
        buf->lb_buf = ma->ma_lmm;
        buf->lb_len = ma->ma_lmm_size;
        rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV);
+
        if (rc > 0) {
                ma->ma_lmm_size = rc;
                ma->ma_valid |= MA_LOV;
@@ -754,7 +755,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                ma->ma_lmv = buffer->lb_buf;
                ma->ma_lmv_size = buffer->lb_len;
                ma->ma_need = MA_INODE;
-               if (ma->ma_lmm_size > 0)
+               if (ma->ma_lmv_size > 0)
                        ma->ma_need |= MA_LMV;
        } else {
                ma->ma_lmm = buffer->lb_buf;
@@ -2671,7 +2672,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_opdata = 0;
        info->mti_big_lmm_used = 0;
 
-        /* To not check for split by default. */
         info->mti_spec.no_create = 0;
        info->mti_spec.sp_rm_entry = 0;
 }
index aa4c2cb..08d4809 100644 (file)
@@ -648,13 +648,21 @@ int mdt_fix_reply(struct mdt_thread_info *info)
                         /* don't return transno along with error */
                         lustre_msg_set_transno(pill->rc_req->rq_repmsg, 0);
                 } else {
-                        /* now we need to pack right LOV EA */
-                        lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-                        LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
-                                                     RCL_SERVER) ==
-                                info->mti_attr.ma_lmm_size);
-                        memcpy(lmm, info->mti_attr.ma_lmm,
-                               info->mti_attr.ma_lmm_size);
+                       /* now we need to pack right LOV/LMV EA */
+                       lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+                       if (info->mti_attr.ma_valid & MA_LOV) {
+                               LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                            RCL_SERVER) ==
+                                               info->mti_attr.ma_lmm_size);
+                               memcpy(lmm, info->mti_attr.ma_lmm,
+                                      info->mti_attr.ma_lmm_size);
+                       } else if (info->mti_attr.ma_valid & MA_LMV) {
+                               LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                            RCL_SERVER) ==
+                                               info->mti_attr.ma_lmv_size);
+                               memcpy(lmm, info->mti_attr.ma_lmv,
+                                      info->mti_attr.ma_lmv_size);
+                       }
                 }
                 /* update mdt_max_mdsize so clients will be aware about that */
                 if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
@@ -1046,10 +1054,19 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                         RETURN(-EFAULT);
         } else {
                 req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
-        }
+               if (S_ISDIR(attr->la_mode) &&
+                   req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT) > 0) {
+                       sp->u.sp_ea.eadata =
+                               req_capsule_client_get(pill, &RMF_EADATA);
+                       sp->u.sp_ea.eadatalen =
+                               req_capsule_get_size(pill, &RMF_EADATA,
+                                                    RCL_CLIENT);
+                       sp->sp_cr_flags |= MDS_OPEN_HAS_EA;
+               }
+       }
 
-        rc = mdt_dlmreq_unpack(info);
-        RETURN(rc);
+       rc = mdt_dlmreq_unpack(info);
+       RETURN(rc);
 }
 
 static int mdt_link_unpack(struct mdt_thread_info *info)
index f5a244a..52354ee 100644 (file)
@@ -663,6 +663,33 @@ void mdt_mfd_set_mode(struct mdt_file_data *mfd, __u64 mode)
        mfd->mfd_mode = mode;
 }
 
+/** Point ma_lmv (directory) or ma_lmm (otherwise) at the reply RMF_MDT_MD
+ * buffer and record its size; if non-zero, request MA_LMV or MA_LOV in
+ * ma_need. Asserts that both pointers are NULL on entry. */
+void mdt_prep_ma_buf_from_rep(struct mdt_thread_info *info,
+                             struct mdt_object *obj,
+                             struct md_attr *ma)
+{
+       LASSERT(ma->ma_lmv == NULL && ma->ma_lmm == NULL);
+       if (S_ISDIR(obj->mot_header.loh_attr)) {
+               ma->ma_lmv = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmv_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmv_size > 0)
+                       ma->ma_need |= MA_LMV;
+       } else {
+               ma->ma_lmm = req_capsule_server_get(info->mti_pill,
+                                                   &RMF_MDT_MD);
+               ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
+                                                      &RMF_MDT_MD,
+                                                      RCL_SERVER);
+               if (ma->ma_lmm_size > 0)
+                       ma->ma_need |= MA_LOV;
+       }
+}
+
 static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                        struct mdt_object *o, __u64 flags, int created,
                        struct ldlm_reply *rep)
@@ -710,6 +737,13 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                         repbody->valid |= OBD_MD_FLEASIZE;
         }
 
+       if (ma->ma_valid & MA_LMV) {
+               LASSERT(ma->ma_lmv_size != 0);
+               repbody->eadatasize = ma->ma_lmv_size;
+               LASSERT(isdir);
+               repbody->valid |= OBD_MD_FLDIREA | OBD_MD_MEA;
+       }
+
         if (flags & FMODE_WRITE) {
                 rc = mdt_write_get(o);
                 if (rc == 0) {
@@ -1021,13 +1055,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
         ldlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
        ma->ma_need = MA_INODE | MA_HSM;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
         ma->ma_valid = 0;
 
         mdt_req_from_lcd(req, lcd);
@@ -1086,6 +1114,7 @@ void mdt_reconstruct_open(struct mdt_thread_info *info,
                        if (mdt_object_exists(child)) {
                                mdt_set_capainfo(info, 1, rr->rr_fid2,
                                                 BYPASS_CAPA);
+                               mdt_prep_ma_buf_from_rep(info, child, ma);
                                rc = mdt_attr_get_complex(info, child, ma);
                                if (rc == 0)
                                        rc = mdt_finish_open(info, parent,
@@ -1144,7 +1173,7 @@ int mdt_open_by_fid(struct mdt_thread_info *info, struct ldlm_reply *rep)
                        mdt_set_disposition(info, rep, (DISP_IT_EXECD |
                                                        DISP_LOOKUP_EXECD |
                                                        DISP_LOOKUP_POS));
-
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
                        rc = mdt_attr_get_complex(info, o, ma);
                        if (rc == 0)
                                rc = mdt_finish_open(info, NULL, o, flags, 0,
@@ -1451,6 +1480,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep,
 
        mdt_set_disposition(info, rep, (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
+       mdt_prep_ma_buf_from_rep(info, o, ma);
        if (flags & MDS_OPEN_RELEASE)
                ma->ma_need |= MA_HSM;
        rc = mdt_attr_get_complex(info, o, ma);
@@ -1528,6 +1558,7 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (rc)
                                goto out;
 
+                       mdt_prep_ma_buf_from_rep(info, o, ma);
                        mdt_set_capainfo(info, 0, fid, BYPASS_CAPA);
                        rc = mdt_attr_get_complex(info, o, ma);
                        if (rc != 0)
@@ -1581,13 +1612,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        mdt_counter_incr(req, LPROC_MDT_OPEN);
         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
 
-        ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
-        ma->ma_lmm_size = req_capsule_get_size(info->mti_pill, &RMF_MDT_MD,
-                                               RCL_SERVER);
         ma->ma_need = MA_INODE;
-        if (ma->ma_lmm_size > 0)
-                ma->ma_need |= MA_LOV;
-
         ma->ma_valid = 0;
 
         LASSERT(info->mti_pill->rc_fmt == &RQF_LDLM_INTENT_OPEN);
@@ -1766,7 +1791,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                         mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
                         GOTO(out_child, result);
                 } else {
-
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
                        /* XXX: we should call this once, see few lines below */
                        if (result == 0)
                                result = mdt_attr_get_complex(info, child, ma);
@@ -1820,13 +1845,13 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                } else if (mdt_object_exists(child)) {
                        /* We have to get attr & LOV EA & HSM for this
                         * object. */
+                       mdt_prep_ma_buf_from_rep(info, child, ma);
                        ma->ma_need |= MA_HSM;
                        result = mdt_attr_get_complex(info, child, ma);
                } else {
                        /* Object does not exist. Likely FS corruption. */
                        CERROR("%s: name '"DNAME"' present, but FID "
-                              DFID" is invalid\n",
-                              mdt_obd_name(info->mti_mdt),
+                              DFID" is invalid\n", mdt_obd_name(info->mti_mdt),
                               PNAME(&rr->rr_name), PFID(child_fid));
                        GOTO(out_child, result = -EIO);
                }
index 48f4f27..e3ffad0 100644 (file)
@@ -276,27 +276,46 @@ static int mdt_md_create(struct mdt_thread_info *info)
        lh = &info->mti_lh[MDT_LH_PARENT];
        mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
 
-        parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
-                                      MDS_INODELOCK_UPDATE);
-        if (IS_ERR(parent))
-                RETURN(PTR_ERR(parent));
+       parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(parent))
+               RETURN(PTR_ERR(parent));
 
-        rc = mdt_version_get_check_save(info, parent, 0);
-        if (rc)
-                GOTO(out_put_parent, rc);
+       if (!mdt_object_exists(parent))
+               GOTO(put_parent, rc = -ENOENT);
 
-        /*
-         * Check child name version during replay.
-         * During create replay a file may exist with same name.
-         */
+       lh = &info->mti_lh[MDT_LH_PARENT];
+       if (mdt_object_remote(parent)) {
+               mdt_lock_reg_init(lh, LCK_EX);
+               rc = mdt_remote_object_lock(info, parent, &lh->mlh_rreg_lh,
+                                           lh->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != ELDLM_OK)
+                       GOTO(put_parent, rc);
+
+       } else {
+               mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
+               rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE,
+                                    MDT_LOCAL_LOCK);
+               if (rc)
+                       GOTO(put_parent, rc);
+
+               rc = mdt_version_get_check_save(info, parent, 0);
+               if (rc)
+                       GOTO(unlock_parent, rc);
+       }
+
+       /*
+        * Check child name version during replay.
+        * During create replay a file may exist with same name.
+        */
        rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
                                      &info->mti_tmp_fid1, 1);
        if (rc == 0)
-               GOTO(out_put_parent, rc = -EEXIST);
+               GOTO(unlock_parent, rc = -EEXIST);
 
        /* -ENOENT is expected here */
        if (rc != -ENOENT)
-               GOTO(out_put_parent, rc);
+               GOTO(unlock_parent, rc);
 
        /* save version of file name for replay, it must be ENOENT here */
        mdt_enoent_version_save(info, 1);
@@ -381,9 +400,11 @@ out_put_child:
                 rc = PTR_ERR(child);
         }
         mdt_create_pack_capa(info, rc, child, repbody);
-out_put_parent:
-        mdt_object_unlock_put(info, parent, lh, rc);
-        RETURN(rc);
+unlock_parent:
+       mdt_object_unlock(info, parent, lh, rc);
+put_parent:
+       mdt_object_put(info->mti_env, parent);
+       RETURN(rc);
 }
 
 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
index 7cd7870..0ca58c5 100644 (file)
@@ -2482,6 +2482,8 @@ void lprocfs_init_mps_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_MD_OP_INIT(num_private_stats, stats, init_ea_size);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, get_lustre_md);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, free_lustre_md);
+       LPROCFS_MD_OP_INIT(num_private_stats, stats, update_lsm_md);
+       LPROCFS_MD_OP_INIT(num_private_stats, stats, merge_attr);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, set_open_replay_data);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, clear_open_replay_data);
         LPROCFS_MD_OP_INIT(num_private_stats, stats, set_lock_data);
index 2b7c822..2d2ddcc 100644 (file)
@@ -931,7 +931,7 @@ static void osd_trans_commit_cb(struct super_block *sb,
 
         lu_context_exit(&th->th_ctx);
         lu_context_fini(&th->th_ctx);
-        OBD_FREE_PTR(oh);
+       thandle_put(th);
 }
 
 static struct thandle *osd_trans_create(const struct lu_env *env,
@@ -956,7 +956,9 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
                 th->th_result = 0;
                 th->th_tags = LCT_TX_HANDLE;
                 oh->ot_credits = 0;
-                oti->oti_dev = osd_dt_dev(d);
+               atomic_set(&th->th_refc, 1);
+               th->th_alloc_size = sizeof(*oh);
+               oti->oti_dev = osd_dt_dev(d);
                 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
                 osd_th_alloced(oh);
 
@@ -1097,7 +1099,8 @@ static int osd_seq_exists(const struct lu_env *env,
 /*
  * Concurrency: shouldn't matter.
  */
-static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
+                         struct thandle *th)
 {
         int                     rc = 0;
         struct osd_thandle     *oh;
@@ -1140,7 +1143,7 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
                 if (rc != 0)
                         CERROR("Failure to stop transaction: %d\n", rc);
         } else {
-                OBD_FREE_PTR(oh);
+               thandle_put(&oh->ot_super);
         }
 
        /* as we want IO to journal and data IO be concurrent, we don't block
@@ -2195,7 +2198,6 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 {
         LASSERT(ah);
 
-        memset(ah, 0, sizeof(*ah));
         ah->dah_parent = parent;
         ah->dah_mode = child_mode;
 }
@@ -2970,6 +2972,9 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
+       CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
+              PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
+
        osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
        if (fl & LU_XATTR_REPLACE)
                fs_flags |= XATTR_REPLACE;
@@ -4831,14 +4836,16 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env,
         else
                up_read(&obj->oo_ext_idx_sem);
 
-        if (it->oie_rd_dirent == 0) {
-                result = -EIO;
-        } else {
-                it->oie_dirent = it->oie_buf;
-                it->oie_it_dirent = 1;
-        }
+       if (it->oie_rd_dirent == 0) {
+               /* If no dirent was read, the end of the directory has been
+                * reached. */
+               it->oie_file.f_pos = ldiskfs_get_htree_eof(&it->oie_file);
+       } else {
+               it->oie_dirent = it->oie_buf;
+               it->oie_it_dirent = 1;
+       }
 
-        RETURN(result);
+       RETURN(result);
 }
 
 /**
index d4193cd..1025be9 100644 (file)
@@ -163,7 +163,7 @@ static void osd_trans_commit_cb(void *cb_data, int error)
        th->th_dev = NULL;
        lu_context_exit(&th->th_ctx);
        lu_context_fini(&th->th_ctx);
-       OBD_FREE_PTR(oh);
+       thandle_put(&oh->ot_super);
 
        EXIT;
 }
@@ -227,7 +227,8 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
 /*
  * Concurrency: shouldn't matter.
  */
-static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
+static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
+                         struct thandle *th)
 {
        struct osd_device       *osd = osd_dt_dev(th->th_dev);
        struct osd_thandle      *oh;
@@ -244,7 +245,7 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
-               OBD_FREE_PTR(oh);
+               thandle_put(&oh->ot_super);
                RETURN(0);
        }
 
@@ -304,6 +305,8 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        th->th_dev = dt;
        th->th_result = 0;
        th->th_tags = LCT_TX_HANDLE;
+       atomic_set(&th->th_refc, 1);
+       th->th_alloc_size = sizeof(*oh);
        RETURN(th);
 }
 
index 23a81b2..e3aad21 100644 (file)
@@ -77,13 +77,11 @@ struct lu_object *osp_object_alloc(const struct lu_env *env,
        if (o != NULL) {
                l = &o->opo_obj.do_lu;
 
-               /* For data object, OSP obj would always be the top
-                * object, i.e. hdr is always NULL, see lu_object_alloc.
-                * But for metadata object, we always build the object
-                * stack from MDT. i.e. mdt_object will be the top object
-                * i.e.  hdr != NULL */
+               /* If hdr is NULL, the object is not being built from
+                * the top device (MDT/OST). This usually happens when
+                * building a striped object, such as a data object on
+                * the MDT or a striped object for a directory. */
                if (hdr == NULL) {
-                       /* object for OST */
                        h = &o->opo_header;
                        lu_object_header_init(h);
                        dt_object_init(&o->opo_obj, h, d);
@@ -1161,6 +1159,16 @@ static int osp_obd_get_info(const struct lu_env *env, struct obd_export *exp,
        RETURN(rc);
 }
 
+int osp_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+                 struct md_op_data *op_data)
+{
+       struct client_obd *cli = &exp->exp_obd->u.cli;
+       struct lu_client_seq *seq = cli->cl_seq;
+
+       ENTRY;
+       RETURN(seq_client_alloc_fid(NULL, seq, fid));
+}
+
 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
 LU_KEY_INIT_FINI(osp, struct osp_thread_info);
 static void osp_key_exit(const struct lu_context *ctx,
@@ -1182,7 +1190,7 @@ struct lu_context_key osp_thread_key = {
 LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
 
 struct lu_context_key osp_txn_key = {
-       .lct_tags = LCT_OSP_THREAD,
+       .lct_tags = LCT_OSP_THREAD | LCT_TX_HANDLE,
        .lct_init = osp_txn_key_init,
        .lct_fini = osp_txn_key_fini
 };
@@ -1221,6 +1229,7 @@ static struct obd_ops osp_obd_device_ops = {
        .o_statfs       = osp_obd_statfs,
        .o_fid_init     = client_fid_init,
        .o_fid_fini     = client_fid_fini,
+       .o_fid_alloc    = osp_fid_alloc,
 };
 
 struct llog_operations osp_mds_ost_orig_logops;
index 1ba68d2..b253a46 100644 (file)
@@ -273,7 +273,9 @@ struct osp_thread_info {
        struct obdo              osi_obdo;
 };
 
-static inline bool is_remote_trans(struct thandle *th)
+/* Return true if the transaction's device uses osp_dt_ops, i.e. the
+ * transaction carries only remote updates and no local ones. */
+static inline bool is_only_remote_trans(struct thandle *th)
 {
        return th->th_dev->dd_ops == &osp_dt_ops;
 }
@@ -472,7 +474,6 @@ struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
-int osp_trans_stop(const struct lu_env *env, struct thandle *th);
 
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
@@ -491,6 +492,9 @@ int osp_declare_object_destroy(const struct lu_env *env,
 int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
                       struct thandle *th);
 
+int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
+                  struct thandle *th);
+
 /* osp_precreate.c */
 int osp_init_precreate(struct osp_device *d);
 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
index fd262ac..a39cdbc 100644 (file)
@@ -70,7 +70,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        bufs[0] = (char *)&osi->osi_obdo;
        buf_count = 1;
        fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-       if (hint->dah_parent) {
+       if (hint != NULL && hint->dah_parent) {
                struct lu_fid *fid2;
                struct lu_fid *tmp_fid = &osi->osi_fid;
 
@@ -251,7 +251,6 @@ static int osp_md_declare_attr_set(const struct lu_env *env,
        }
 
        osi->osi_obdo.o_valid = 0;
-       LASSERT(!(attr->la_valid & (LA_MODE | LA_TYPE)));
        obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
                     attr->la_valid);
        lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
index 7d302c9..5779b8d 100644 (file)
@@ -1088,6 +1088,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
                        po->opo_non_exist = 1;
                        rc = 0;
                }
+               init_rwsem(&po->opo_sem);
        }
        RETURN(rc);
 }
index 574cd0d..922af56 100644 (file)
@@ -1289,7 +1289,7 @@ int osp_init_precreate(struct osp_device *d)
         * start thread handling precreation and statfs updates
         */
        task = kthread_run(osp_precreate_thread, d,
-                              "osp-pre-%u", d->opd_index);
+                          "osp-pre-%u-%u", d->opd_index, d->opd_group);
        if (IS_ERR(task)) {
                CERROR("can't start precreate thread %ld\n", PTR_ERR(task));
                RETURN(PTR_ERR(task));
index fc19031..493fbfe 100644 (file)
@@ -1040,7 +1040,7 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d)
        CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
 
        rc = PTR_ERR(kthread_run(osp_sync_thread, d,
-                                "osp-syn-%u", d->opd_index));
+                                "osp-syn-%u-%u", d->opd_index, d->opd_group));
        if (IS_ERR_VALUE(rc)) {
                CERROR("%s: can't start sync thread: rc = %d\n",
                       d->opd_obd->obd_name, rc);
index b819572..902de54 100644 (file)
@@ -219,38 +219,59 @@ out:
        return rc;
 }
 
-struct thandle *osp_trans_create(const struct lu_env *env,
-                                struct dt_device *d)
+/**
+ * If the transaction creation goes to OSP, it means the update
+ * in this transaction only includes remote UPDATE. It is only
+ * used by LFSCK right now.
+ *
+ * Allocates the thandle together with its thandle_update descriptor,
+ * marks the descriptor as "only remote" and attaches it to the handle.
+ *
+ * \retval the new thandle on success
+ * \retval ERR_PTR(-ENOMEM) if either allocation fails
+ **/
+struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
-       struct thandle *th;
+       struct thandle *th = NULL;
+       struct thandle_update *tu = NULL;
+       /* must start at 0: the success path falls through to "out"
+        * and tests rc there */
+       int rc = 0;
 
        OBD_ALLOC_PTR(th);
        if (unlikely(th == NULL))
-               return ERR_PTR(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
-       INIT_LIST_HEAD(&th->th_remote_update_list);
+       atomic_set(&th->th_refc, 1);
+       th->th_alloc_size = sizeof(*th);
+
+       OBD_ALLOC_PTR(tu);
+       if (tu == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       INIT_LIST_HEAD(&tu->tu_remote_update_list);
+       tu->tu_only_remote_trans = 1;
+       /* attach the descriptor to the handle, otherwise it is leaked and
+        * osp_trans_stop() trips on LASSERT(th->th_update != NULL) */
+       th->th_update = tu;
+out:
+       if (rc != 0) {
+               if (tu != NULL)
+                       OBD_FREE_PTR(tu);
+               if (th != NULL)
+                       OBD_FREE_PTR(th);
+               th = ERR_PTR(rc);
+       }
 
        return th;
 }
 
 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
-                            struct thandle *th)
+                            struct update_request *update, struct thandle *th)
 {
-       struct update_request   *update = th->th_current_request;
-       int                      rc     = 0;
+       struct thandle_update   *tu = th->th_update;
+       int                     rc = 0;
 
-       if (unlikely(update == NULL || update->ur_buf == NULL ||
-                    update->ur_buf->ub_count == 0))
-               return 0;
+       LASSERT(tu != NULL);
 
-       if (is_remote_trans(th)) {
+       /* If the transaction only includes remote update, it should
+        * still be asynchronous */
+       if (tu->tu_only_remote_trans) {
                struct osp_async_update_args    *args;
                struct ptlrpc_request           *req;
 
                list_del_init(&update->ur_list);
-               th->th_current_request = NULL;
                rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                         update->ur_buf,
                                         UPDATE_BUFFER_SIZE, &req);
@@ -264,6 +285,8 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                        out_destroy_update_req(update);
                }
        } else {
+               /* Until async updates are supported, a cross-MDT
+                * transaction has to be synchronous */
                th->th_sync = 1;
                rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
                                     update, NULL);
@@ -275,40 +298,67 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
+       struct thandle_update *tu = th->th_update;
+       struct update_request *update;
        int rc = 0;
 
-       if (!is_remote_trans(th))
-               rc = osp_trans_trigger(env, dt2osp_dev(dt), th);
+       if (tu == NULL)
+               return rc;
+
+       /* Check whether there are updates related with this OSP */
+       update = out_find_update(tu, dt);
+       if (update == NULL)
+               return rc;
+
+       /* Note: some updates need to be sent before the local transaction,
+        * some need to be sent after the local transaction.
+        *
+        * If the transaction only includes remote updates, the updates
+        * are sent to the remote MDT in osp_trans_stop.
+        *
+        * If it is a remote create, the remote request is sent after the
+        * local transaction, i.e. create the object locally first,
+        * then insert the name entry.
+        *
+        * If it is a remote unlink, the remote request is sent before
+        * the local transaction, i.e. delete the remote name entry
+        * first, then destroy the local object. */
+       if (!tu->tu_only_remote_trans && !tu->tu_sent_after_local_trans)
+               rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
 
        return rc;
 }
 
-int osp_trans_stop(const struct lu_env *env, struct thandle *th)
+int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
+                  struct thandle *th)
 {
-       struct update_request   *update = th->th_current_request;
-       int                      rc     = 0;
+       struct thandle_update   *tu = th->th_update;
+       struct update_request   *update;
+       int rc = 0;
+
+       LASSERT(tu != NULL);
+       /* Check whether there are updates related with this OSP */
+       update = out_find_update(tu, dt);
+       if (update == NULL)
+               return rc;
 
-       if (is_remote_trans(th)) {
-               LASSERT(update == NULL);
+       if (update->ur_buf->ub_count == 0)
+               GOTO(free, rc);
 
-               update = out_find_update(th, th->th_dev);
-               th->th_current_request = update;
+       if (tu->tu_only_remote_trans) {
                if (th->th_result == 0)
-                       rc = osp_trans_trigger(env, dt2osp_dev(th->th_dev), th);
+                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
+                                              update, th);
                else
                        rc = th->th_result;
-
-               if (th->th_current_request != NULL)
-                       out_destroy_update_req(update);
-
-               OBD_FREE_PTR(th);
        } else {
-               LASSERT(update != NULL);
-
+               if (tu->tu_sent_after_local_trans)
+                       rc = osp_trans_trigger(env, dt2osp_dev(dt),
+                                              update, th);
                rc = update->ur_rc;
-               out_destroy_update_req(update);
-               th->th_current_request = NULL;
        }
-
+free:
+       out_destroy_update_req(update);
+       thandle_put(th);
        return rc;
 }
index 0707ec3..77e92a2 100644 (file)
@@ -2130,9 +2130,6 @@ void lustre_swab_lmv_user_md(struct lmv_user_md *lum)
        __swab32s(&lum->lum_hash_type);
        __swab32s(&lum->lum_type);
        CLASSERT(offsetof(typeof(*lum), lum_padding1) != 0);
-       CLASSERT(offsetof(typeof(*lum), lum_padding2) != 0);
-       CLASSERT(offsetof(typeof(*lum), lum_padding3) != 0);
-
        for (i = 0; i < lum->lum_stripe_count; i++) {
                __swab32s(&lum->lum_objects[i].lum_mds);
                lustre_swab_lu_fid(&lum->lum_objects[i].lum_fid);
index 5c65021..83105a9 100644 (file)
@@ -263,7 +263,7 @@ static int __out_tx_create(const struct lu_env *env, struct dt_object *obj,
        lu_object_get(&obj->do_lu);
        arg->object = obj;
        arg->u.create.attr = *attr;
-       if (parent_fid)
+       if (parent_fid != NULL)
                arg->u.create.fid = *parent_fid;
        memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint));
        arg->u.create.dof  = *dof;
@@ -300,7 +300,7 @@ static int out_create(struct tgt_session_info *tsi)
        la_from_obdo(attr, lobdo, lobdo->o_valid);
 
        dof->dof_type = dt_mode_to_dft(attr->la_mode);
-       if (S_ISDIR(attr->la_mode)) {
+       if (update->u_lens[1] > 0) {
                int size;
 
                fid = update_param_buf(update, 1, &size);
index 3fe8080..c2d41f7 100644 (file)
 #include <lustre_update.h>
 #include <obd.h>
 
-struct update_request *out_find_update(struct thandle *th,
+struct update_request *out_find_update(struct thandle_update *tu,
                                       struct dt_device *dt_dev)
 {
        struct update_request   *update;
 
-       list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
+       LASSERT(tu != NULL);
+       list_for_each_entry(update, &tu->tu_remote_update_list, ur_list) {
                if (update->ur_dt == dt_dev)
                        return update;
        }
@@ -90,7 +91,7 @@ struct update_request *out_create_update_req(struct dt_device *dt)
 EXPORT_SYMBOL(out_create_update_req);
 
 /**
- * Find one loc in th_dev/dev_obj_update for the update,
+ * Find or create one loc in th_dev/dev_obj_update for the update,
  * Because only one thread can access this thandle, no need
  * lock now.
  */
@@ -98,10 +99,21 @@ struct update_request *out_find_create_update_loc(struct thandle *th,
                                                  struct dt_object *dt)
 {
        struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct thandle_update   *tu = th->th_update;
        struct update_request   *update;
        ENTRY;
 
-       update = out_find_update(th, dt_dev);
+       if (tu == NULL) {
+               OBD_ALLOC_PTR(tu);
+               if (tu == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+
+               INIT_LIST_HEAD(&tu->tu_remote_update_list);
+               tu->tu_sent_after_local_trans = 0;
+               th->th_update = tu;
+       }
+
+       update = out_find_update(tu, dt_dev);
        if (update != NULL)
                RETURN(update);
 
@@ -109,7 +121,9 @@ struct update_request *out_find_create_update_loc(struct thandle *th,
        if (IS_ERR(update))
                RETURN(update);
 
-       list_add_tail(&update->ur_list, &th->th_remote_update_list);
+       list_add_tail(&update->ur_list, &tu->tu_remote_update_list);
+
+       thandle_get(th);
 
        RETURN(update);
 }
index 0cdc7f7..ee148e0 100755 (executable)
@@ -1717,7 +1717,7 @@ test_110c () {
        local MDTIDX=1
 
        mkdir -p $DIR/$tdir
-       drop_update_reply $((MDTIDX + 1)) "$LFS mkdir -i $MDTIDX $remote_dir" ||
+       drop_update_reply $MDTIDX "$LFS mkdir -i $MDTIDX $remote_dir" ||
                                                error "lfs mkdir failed"
 
        diridx=$($GETSTRIPE -M $remote_dir)
index 04638e0..b08f1cc 100755 (executable)
@@ -623,23 +623,23 @@ test_22a () {
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
 
-       fail mds${MDTIDX}
+       fail mds$((MDTIDX + 1))
        wait $CLIENT_PID || error "lfs mkdir failed"
 
-       replay_barrier mds${MDTIDX}
+       replay_barrier mds$MDTIDX
        create_remote_dir_files_22 || error "Remote creation failed $?"
-       fail mds${MDTIDX}
+       fail mds$MDTIDX
 
        checkstat_22 || error "check stat failed $?"
 
        rm -rf $MOUNT1/$tdir || error "rmdir remote_dir failed"
        return 0
 }
-run_test 22a "c1 lfs mkdir -i 1 dir1, M0 drop reply & fail, c2 mkdir dir1/dir"
+run_test 22a "c1 lfs mkdir -i 1 dir1, M1 drop reply & fail, c2 mkdir dir1/dir"
 
 test_22b () {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -649,7 +649,7 @@ test_22b () {
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
 
@@ -665,7 +665,7 @@ test_22b () {
        rm -rf $MOUNT1/$tdir || error "rmdir remote_dir failed"
        return 0
 }
-run_test 22b "c1 lfs mkdir -i 1 d1, M0 drop reply & fail M0/M1, c2 mkdir d1/dir"
+run_test 22b "c1 lfs mkdir -i 1 d1, M1 drop reply & fail M0/M1, c2 mkdir d1/dir"
 
 test_22c () {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -679,12 +679,12 @@ test_22c () {
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
        # OBD_FAIL_UPDATE_OBJ_NET_REP    0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds$MDTIDX lctl set_param fail_loc=0x1701
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
+       do_facet mds$MDTIDX lctl set_param fail_loc=0
 
-       fail mds$((MDTIDX+1))
+       fail mds$MDTIDX
        wait $CLIENT_PID || error "lfs mkdir failed"
 
        replay_barrier mds$MDTIDX
@@ -706,10 +706,10 @@ test_22d () {
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
        # OBD_FAIL_UPDATE_OBJ_NET_REP    0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds$MDTIDX lctl set_param fail_loc=0x1701
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
+       do_facet mds$MDTIDX lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
        wait $CLIENT_PID || error "lfs mkdir failed"
index de79700..a6ac607 100755 (executable)
@@ -2045,11 +2045,11 @@ test_80a() {
 
        mkdir -p $DIR/$tdir
        #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
-       fail mds$((MDTIDX + 1))
+       fail mds${MDTIDX}
 
        wait $CLIENT_PID || error "remote creation failed"
 
@@ -2058,7 +2058,7 @@ test_80a() {
 
        return 0
 }
-run_test 80a "DNE: create remote dir, drop update rep from MDT1, fail MDT1"
+run_test 80a "DNE: create remote dir, drop update rep from MDT0, fail MDT0"
 
 test_80b() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -2072,11 +2072,11 @@ test_80b() {
 
        mkdir -p $DIR/$tdir
        #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
-       fail mds${MDTIDX}
+       fail mds$((MDTIDX + 1))
 
        wait $CLIENT_PID || error "remote creation failed"
 
@@ -2085,7 +2085,7 @@ test_80b() {
 
        return 0
 }
-run_test 80b "DNE: create remote dir, drop update rep from MDT1, fail MDT0"
+run_test 80b "DNE: create remote dir, drop update rep from MDT0, fail MDT1"
 
 test_80c() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -2099,7 +2099,7 @@ test_80c() {
 
        mkdir -p $DIR/$tdir
        #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
@@ -2122,10 +2122,13 @@ test_80d() {
 
        mkdir -p $DIR/$tdir
        #define OBD_FAIL_UPDATE_OBJ_NET_REP     0x1701
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1701
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1701
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
+       # sleep 3 seconds to make sure MDTs are failed after
+       # lfs mkdir -i has finished on all of MDTs.
+       sleep 3
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
        wait $CLIENT_PID || error "remote creation failed"
@@ -2149,10 +2152,14 @@ test_80e() {
 
        mkdir -p $DIR/$tdir
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
+       # sleep 3 seconds to make sure MDTs are failed after
+       # lfs mkdir -i has finished on all of MDTs.
+       sleep 3
+
        fail mds${MDTIDX}
 
        wait $CLIENT_PID || error "remote creation failed"
@@ -2162,7 +2169,7 @@ test_80e() {
 
        return 0
 }
-run_test 80e "DNE: create remote dir, drop MDT0 rep, fail MDT0"
+run_test 80e "DNE: create remote dir, drop MDT1 rep, fail MDT0"
 
 test_80f() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -2175,7 +2182,7 @@ test_80f() {
 
        mkdir -p $DIR/$tdir
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
@@ -2188,7 +2195,7 @@ test_80f() {
 
        return 0
 }
-run_test 80f "DNE: create remote dir, drop MDT0 rep, fail MDT1"
+run_test 80f "DNE: create remote dir, drop MDT1 rep, fail MDT1"
 
 test_80g() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -2202,10 +2209,14 @@ test_80g() {
 
        mkdir -p $DIR/$tdir
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
+       # sleep 3 seconds to make sure MDTs are failed after
+       # lfs mkdir -i has finished on all of MDTs.
+       sleep 3
+
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
 
@@ -2216,7 +2227,7 @@ test_80g() {
 
        return 0
 }
-run_test 80g "DNE: create remote dir, drop MDT0 rep, fail MDT0, then MDT1"
+run_test 80g "DNE: create remote dir, drop MDT1 rep, fail MDT0, then MDT1"
 
 test_80h() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
@@ -2225,10 +2236,14 @@ test_80h() {
 
        mkdir -p $DIR/$tdir
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
 
+       # sleep 3 seconds to make sure MDTs are failed after
+       # lfs mkdir -i has finished on all of MDTs.
+       sleep 3
+
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
        wait $CLIENT_PID || return 1
@@ -2238,7 +2253,7 @@ test_80h() {
 
        return 0
 }
-run_test 80h "DNE: create remote dir, drop MDT0 rep, fail 2 MDTs"
+run_test 80h "DNE: create remote dir, drop MDT1 rep, fail 2 MDTs"
 
 test_81a() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
index 2b46a12..4b20e2b 100644 (file)
@@ -592,7 +592,9 @@ check_fs_consistency_17n() {
        local cmd
        local rc=0
 
-       for mdt_index in $(seq 1 $MDSCOUNT); do
+       # create/unlink in 17n only changes 2 MDTs (MDT1/MDT2),
+       # so only check MDT1/MDT2 instead of all MDTs.
+       for mdt_index in $(seq 1 2); do
                devname=$(mdsdevname $mdt_index)
                cmd="$E2FSCK -fnvd $devname"
 
@@ -11770,44 +11772,6 @@ test_230a() {
 }
 run_test 230a "Create remote directory and files under the remote directory"
 
-test_230b() {
-       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
-       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
-       local MDTIDX=1
-       local remote_dir=$DIR/$tdir/remote_dir
-       local rc=0
-
-       mkdir -p $DIR/$tdir
-       $LFS mkdir -i $MDTIDX $remote_dir ||
-               error "create remote directory failed"
-
-       $LFS mkdir -i 0 $remote_dir/new_dir &&
-               error "nested remote directory create succeed!"
-
-       do_facet mds$((MDTIDX + 1)) lctl set_param mdt.*.enable_remote_dir=1
-       $LFS mkdir -i 0 $remote_dir/new_dir || rc=$?
-       do_facet mds$((MDTIDX + 1)) lctl set_param mdt.*.enable_remote_dir=0
-
-       [ $rc -ne 0 ] &&
-          error "create remote directory failed after set enable_remote_dir"
-
-       rm -rf $remote_dir || error "first unlink remote directory failed"
-
-       $RUNAS -G$RUNAS_GID $LFS mkdir -i $MDTIDX $DIR/$tfile &&
-                                                       error "chown worked"
-
-       do_facet mds$MDTIDX lctl set_param \
-                               mdt.*.enable_remote_dir_gid=$RUNAS_GID
-       $LFS mkdir -i $MDTIDX $remote_dir || rc=$?
-       do_facet mds$MDTIDX lctl set_param mdt.*.enable_remote_dir_gid=0
-
-       [ $rc -ne 0 ] &&
-          error "create remote dir failed after set enable_remote_dir_gid"
-
-       rm -r $DIR/$tdir || error "second unlink remote directory failed"
-}
-run_test 230b "nested remote directory should be failed"
-
 test_231a()
 {
        # For simplicity this test assumes that max_pages_per_rpc
index 142c870..8b130e3 100644 (file)
@@ -1264,11 +1264,6 @@ test_40a() {
        rmdir $DIR2/$tfile-3
        check_pdo_conflict $PID1 || error "unlink is blocked"
 
-       if [ $MDSCOUNT -ge 2 ]; then
-               $LFS mkdir -i 1 $DIR2/$tfile-6
-               check_pdo_conflict $PID1 || error "remote mkdir is blocked"
-       fi
-
        # all operations above shouldn't wait the first one
        check_pdo_conflict $PID1 || error "parallel operation is blocked"
        wait $PID1
@@ -1299,11 +1294,6 @@ test_40b() {
        check_pdo_conflict $PID1 || error "unlink is blocked"
        # all operations above shouldn't wait the first one
 
-       if [ $MDSCOUNT -ge 2 ]; then
-               $LFS mkdir -i 1 $DIR2/$tfile-6
-               check_pdo_conflict $PID1 || error "remote mkdir is blocked"
-       fi
-
         check_pdo_conflict $PID1 || error "parallel operation is blocked"
        wait $PID1
        rm -r $DIR1/*
@@ -1333,11 +1323,6 @@ test_40c() {
        rmdir $DIR2/$tfile-3
        check_pdo_conflict $PID1 || error "unlink is blocked"
 
-       if [ $MDSCOUNT -ge 2 ]; then
-               $LFS mkdir -i 1 $DIR2/$tfile-6
-               check_pdo_conflict $PID1 || error "remote mkdir is blocked"
-       fi
-
         # all operations above shouldn't wait the first one
        check_pdo_conflict $PID1 || error "parallel operation is blocked"
        wait $PID1
@@ -1368,11 +1353,6 @@ test_40d() {
        rmdir $DIR2/$tfile-3
        check_pdo_conflict $PID1 || error "unlink is blocked"
 
-       if [ $MDSCOUNT -ge 2 ]; then
-               $LFS mkdir -i 1 $DIR2/$tfile-6
-               check_pdo_conflict $PID1 || error "remote mkdir is blocked"
-       fi
-
        # all operations above shouldn't wait the first one
        check_pdo_conflict $PID1 || error "parallel operation is blocked"
        wait $PID1
@@ -1400,11 +1380,6 @@ test_40e() {
        rmdir $DIR2/$tfile-3
        check_pdo_conflict $PID1 || error "unlink is blocked"
 
-       if [ $MDSCOUNT -ge 2 ]; then
-               $LFS mkdir -i 1 $DIR2/$tfile-6
-               check_pdo_conflict $PID1 || error "remote mkdir is blocked"
-       fi
-
        # all operations above shouldn't wait the first one
        check_pdo_conflict $PID1 || error "parallel operation is blocked"
        wait $PID1
index f1021a3..3e2752f 100644 (file)
@@ -155,8 +155,11 @@ command_t cmdlist[] = {
         "                 <directory|filename> ..."},
        {"setdirstripe", lfs_setdirstripe, 0,
         "To create a remote directory on a specified MDT.\n"
-        "usage: setdirstripe <--index|-i mdt_index> <dir>\n"
-        "\tmdt_index:    MDT index of first stripe\n"},
+        "usage: setdirstripe <--count|-c stripe_count>\n"
+        "[--index|-i mdt_index] [--hash-type|-t hash_type] <dir>\n"
+        "\tstripe_count: stripe count of the striped directory\n"
+        "\tmdt_index:  MDT index of first stripe\n"
+        "\thash_type:  hash type of the striped directory\n"},
        {"getdirstripe", lfs_getdirstripe, 0,
         "To list the striping info for a given directory\n"
         "or recursively for all directories in a directory tree.\n"
@@ -1447,30 +1450,40 @@ static int lfs_getdirstripe(int argc, char **argv)
 /* functions */
 static int lfs_setdirstripe(int argc, char **argv)
 {
-       char *dname;
-       int result;
-       int  st_offset, st_count;
-       char *end;
-       int c;
-       char *stripe_off_arg = NULL;
-       int  flags = 0;
+       char                    *dname;
+       int                     result;
+       unsigned int            stripe_offset = -1;
+       unsigned int            stripe_count = 1;
+       enum lmv_hash_type      hash_type;
+       char                    *end;
+       int                     c;
+       char                    *stripe_offset_opt = NULL;
+       char                    *stripe_count_opt = NULL;
+       char                    *stripe_hash_opt = NULL;
+       int                     flags = 0;
 
        struct option long_opts[] = {
-               {"index",    required_argument, 0, 'i'},
+               {"count",       required_argument, 0, 'c'},
+               {"index",       required_argument, 0, 'i'},
+               {"hash-type",   required_argument, 0, 't'},
                {0, 0, 0, 0}
        };
 
-       st_offset = -1;
-       st_count = 1;
        optind = 0;
-       while ((c = getopt_long(argc, argv, "i:o",
-                               long_opts, NULL)) >= 0) {
+
+       while ((c = getopt_long(argc, argv, "c:i:t:", long_opts, NULL)) >= 0) {
                switch (c) {
                case 0:
                        /* Long options. */
                        break;
+               case 'c':
+                       stripe_count_opt = optarg;
+                       break;
                case 'i':
-                       stripe_off_arg = optarg;
+                       stripe_offset_opt = optarg;
+                       break;
+               case 't':
+                       stripe_hash_opt = optarg;
                        break;
                default:
                        fprintf(stderr, "error: %s: option '%s' "
@@ -1486,22 +1499,47 @@ static int lfs_setdirstripe(int argc, char **argv)
                return CMD_HELP;
        }
 
-       dname = argv[optind];
-       if (stripe_off_arg == NULL) {
-               fprintf(stderr, "error: %s: missing stripe_off.\n",
+       if (stripe_offset_opt == NULL && stripe_count_opt == NULL) {
+               fprintf(stderr, "error: %s: missing stripe offset and count.\n",
                        argv[0]);
                return CMD_HELP;
        }
-       /* get the stripe offset */
-       st_offset = strtoul(stripe_off_arg, &end, 0);
-       if (*end != '\0') {
-               fprintf(stderr, "error: %s: bad stripe offset '%s'\n",
-                       argv[0], stripe_off_arg);
+
+       if (stripe_offset_opt != NULL) {
+               /* get the stripe offset */
+               stripe_offset = strtoul(stripe_offset_opt, &end, 0);
+               if (*end != '\0') {
+                       fprintf(stderr, "error: %s: bad stripe offset '%s'\n",
+                               argv[0], stripe_offset_opt);
+                       return CMD_HELP;
+               }
+       }
+
+       if (stripe_hash_opt == NULL ||
+           strcmp(stripe_hash_opt, LMV_HASH_NAME_FNV_1A_64) == 0) {
+               hash_type = LMV_HASH_TYPE_FNV_1A_64;
+       } else if (strcmp(stripe_hash_opt, LMV_HASH_NAME_ALL_CHARS) == 0) {
+               hash_type = LMV_HASH_TYPE_ALL_CHARS;
+       } else {
+               fprintf(stderr, "error: %s: bad stripe hash type '%s'\n",
+                       argv[0], stripe_hash_opt);
                return CMD_HELP;
        }
+
+       /* get the stripe count */
+       if (stripe_count_opt != NULL) {
+               stripe_count = strtoul(stripe_count_opt, &end, 0);
+               if (*end != '\0') {
+                       fprintf(stderr, "error: %s: bad stripe count '%s'\n",
+                               argv[0], stripe_count_opt);
+                       return CMD_HELP;
+               }
+       }
+
+       dname = argv[optind];
        do {
-               result = llapi_dir_create_pool(dname, flags, st_offset,
-                                              st_count, 0, NULL);
+               result = llapi_dir_create_pool(dname, flags, stripe_offset,
+                                              stripe_count, hash_type, NULL);
                if (result) {
                        fprintf(stderr, "error: %s: create stripe dir '%s' "
                                "failed\n", argv[0], dname);
index 04827bb..11b0a54 100644 (file)
@@ -792,12 +792,9 @@ int llapi_file_create_pool(const char *name, unsigned long long stripe_size,
         return 0;
 }
 
-/**
- * In DNE phase I, only stripe_offset will be used in this function.
- * stripe_count, stripe_pattern and pool_name will be supported later.
- */
 int llapi_dir_create_pool(const char *name, int flags, int stripe_offset,
-                         int stripe_count, int stripe_pattern, char *pool_name)
+                         int stripe_count, int stripe_pattern,
+                         const char *pool_name)
 {
        struct lmv_user_md lmu = { 0 };
        struct obd_ioctl_data data = { 0 };
@@ -831,7 +828,6 @@ int llapi_dir_create_pool(const char *name, int flags, int stripe_offset,
        }
 
        filename = basename(namepath);
-       lmu.lum_type = LMV_STRIPE_TYPE;
        dir = dirname(dirpath);
 
        data.ioc_inlbuf1 = (char *)filename;