Whamcloud - gitweb
- do not check dir for split if cleint already knows that it is split.
authoryury <yury>
Tue, 7 Nov 2006 20:07:27 +0000 (20:07 +0000)
committeryury <yury>
Tue, 7 Nov 2006 20:07:27 +0000 (20:07 +0000)
22 files changed:
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_lproc.c
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/include/md_object.h
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_reint.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lov.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/ptlrpc/pack_generic.c

index a43765a..e3af448 100644 (file)
@@ -122,7 +122,7 @@ struct cmm_thread_info {
         struct lu_rdpg        cmi_rdpg;
         /* pointers to pages for readpage. */
         struct page          *cmi_pages[CMM_SPLIT_PAGE_COUNT];
-        struct md_create_spec cmi_spec;
+        struct md_op_spec     cmi_spec;
         struct lmv_stripe_md  cmi_lmv;
         char                  cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
 };
@@ -202,7 +202,7 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp,
 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
                      struct md_attr *ma, int *split);
 
-int cmm_split_try(const struct lu_env *env, struct md_object *mo);
+int cmm_split_dir(const struct lu_env *env, struct md_object *mo);
 
 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
                      mdl_mode_t lm);
@@ -222,7 +222,8 @@ void cmm_lprocfs_time_end(struct cmm_device *cmm,
 
 enum {
         LPROC_CMM_SPLIT_CHECK = 0,
-        LPROC_CMM_SPLIT_EXEC,
+        LPROC_CMM_SPLIT,
+        LPROC_CMM_LOOKUP,
         LPROC_CMM_LAST
 };
 
index 1c763b6..2aa1e99 100644 (file)
@@ -58,10 +58,12 @@ static int cmm_procfs_init_stats(struct cmm_device *cmm, int num_stats)
 
         cmm->cmm_stats = stats;
 
+        lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_LOOKUP,
+                             LPROCFS_CNTR_AVGMINMAX, "lookup", "time");
+        lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT,
+                             LPROCFS_CNTR_AVGMINMAX, "split", "time");
         lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_CHECK,
                              LPROCFS_CNTR_AVGMINMAX, "split_check", "time");
-        lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_EXEC,
-                             LPROCFS_CNTR_AVGMINMAX, "split_exec", "time");
         EXIT;
 cleanup:
         if (rc) {
index 8a9c928..4d999d6 100644 (file)
@@ -192,7 +192,7 @@ static struct lu_object_operations cml_obj_ops = {
 /* CMM local md_object operations */
 static int cml_object_create(const struct lu_env *env,
                              struct md_object *mo,
-                             const struct md_create_spec *spec,
+                             const struct md_op_spec *spec,
                              struct md_attr *attr)
 {
         int rc;
@@ -347,17 +347,27 @@ static struct md_object_operations cml_mo_ops = {
 
 /* md_dir operations */
 static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
-                      const char *name, struct lu_fid *lf)
+                      const char *name, struct lu_fid *lf,
+                      struct md_op_spec *spec)
 {
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p));
+        struct timeval start;
         int rc;
         ENTRY;
 
+        cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_LOOKUP);
+        
 #ifdef HAVE_SPLIT_SUPPORT
-        rc = cmm_split_check(env, mo_p, name);
-        if (rc)
-                RETURN(rc);
+        if (spec != NULL && spec->sp_ck_split) {
+                rc = cmm_split_check(env, mo_p, name);
+                if (rc) {
+                        cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
+                        RETURN(rc);
+                }
+        }
 #endif
-        rc = mdo_lookup(env, md_object_next(mo_p), name, lf);
+        rc = mdo_lookup(env, md_object_next(mo_p), name, lf, spec);
+        cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
         RETURN(rc);
 
 }
@@ -377,7 +387,7 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env,
 
 static int cml_create(const struct lu_env *env, struct md_object *mo_p,
                       const char *name, struct md_object *mo_c,
-                      struct md_create_spec *spec, struct md_attr *ma)
+                      struct md_op_spec *spec, struct md_attr *ma)
 {
         int rc;
         ENTRY;
@@ -408,24 +418,25 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p,
                  * -ERESTART to client to let it know that correct MDT should be
                  * choosen.
                  */
-                rc = cmm_split_try(env, mo_p);
-                if (rc) {
+                rc = cmm_split_dir(env, mo_p);
+                if (rc)
                         /* 
                          * -ERESTART or some split error is returned, we can't
                          * proceed with create.
                          */
                         RETURN(rc);
-                }
         }
-        
-        /* 
-         * Check for possible split directory and let caller know that it should
-         * tell client that directory is split and operation should repeat to
-         * correct MDT.
-         */
-        rc = cmm_split_check(env, mo_p, name);
-        if (rc)
-                RETURN(rc);
+
+        if (spec != NULL && spec->sp_ck_split) {
+                /* 
+                 * Check for possible split directory and let caller know that
+                 * it should tell client that directory is split and operation
+                 * should repeat to correct MDT.
+                 */
+                rc = cmm_split_check(env, mo_p, name);
+                if (rc)
+                        RETURN(rc);
+        }
 #endif
 
         rc = mdo_create(env, md_object_next(mo_p), name, md_object_next(mo_c),
@@ -436,7 +447,7 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p,
 
 static int cml_create_data(const struct lu_env *env, struct md_object *p,
                            struct md_object *o,
-                           const struct md_create_spec *spec,
+                           const struct md_op_spec *spec,
                            struct md_attr *ma)
 {
         int rc;
@@ -680,7 +691,7 @@ static struct lu_object_operations cmr_obj_ops = {
 /* CMM remote md_object operations. All are invalid */
 static int cmr_object_create(const struct lu_env *env,
                              struct md_object *mo,
-                             const struct md_create_spec *spec,
+                             const struct md_op_spec *spec,
                              struct md_attr *ma)
 {
         return -EFAULT;
@@ -789,7 +800,8 @@ static struct md_object_operations cmr_mo_ops = {
 
 /* remote part of md_dir operations */
 static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
-                      const char *name, struct lu_fid *lf)
+                      const char *name, struct lu_fid *lf,
+                      struct md_op_spec *spec)
 {
         /*
          * This can happens while rename() If new parent is remote dir, lookup
@@ -816,7 +828,7 @@ static mdl_mode_t cmr_lock_mode(const struct lu_env *env,
  */
 static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
                       const char *child_name, struct md_object *mo_c,
-                      struct md_create_spec *spec,
+                      struct md_op_spec *spec,
                       struct md_attr *ma)
 {
         struct cmm_thread_info *cmi;
@@ -879,7 +891,7 @@ static int cmr_link(const struct lu_env *env, struct md_object *mo_p,
         
         /* Make sure that name isn't exist before doing remote call. */
         rc = mdo_lookup(env, md_object_next(mo_p), name,
-                        &cmm_env_info(env)->cmi_fid);
+                        &cmm_env_info(env)->cmi_fid, NULL);
         if (rc == 0) {
                 rc = -EEXIST;
         } else if (rc == -ENOENT) {
index 136560a..0a11c20 100644 (file)
@@ -284,7 +284,7 @@ static int cmm_split_slave_create(const struct lu_env *env,
                                   struct lmv_stripe_md *lmv,
                                   int lmv_size)
 {
-        struct md_create_spec *spec = &cmm_env_info(env)->cmi_spec;
+        struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
         struct cmm_object *obj;
         int rc;
         ENTRY;
@@ -613,7 +613,7 @@ cleanup:
         return rc;
 }
 
-int cmm_split_try(const struct lu_env *env, struct md_object *mo)
+int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
@@ -622,7 +622,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
         struct timeval     start;
         ENTRY;
 
-        cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT_EXEC);
+        cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT);
         
         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
         memset(ma, 0, sizeof(*ma));
@@ -643,7 +643,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
 
         /*
          * Disable transacrions for split, since there will be so many trans in
-         * this one ops, confilct with current recovery design.
+         * this one ops, conflict with current recovery design.
          */
         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
         if (rc) {
@@ -694,6 +694,6 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo)
 cleanup:
         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
 out:
-        cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT_EXEC);
+        cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT);
         return rc;
 }
index 53aeabc..f33a635 100644 (file)
@@ -228,7 +228,7 @@ static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
 
 static int mdc_object_create(const struct lu_env *env,
                              struct md_object *mo,
-                             const struct md_create_spec *spec,
+                             const struct md_op_spec *spec,
                              struct md_attr *ma)
 {
         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
index 5060129..8059921 100644 (file)
@@ -665,6 +665,9 @@ struct md_op_data {
         /* Capa fields */
         struct obd_capa        *op_capa1;
         struct obd_capa        *op_capa2;
+
+        /* Should server check split in lookups or not. */
+        int                     op_cksplit;
 };
 
 #define MDS_MODE_DONT_LOCK (1 << 30)
@@ -752,6 +755,7 @@ struct lov_mds_md_v1 {            /* LOV EA mds/wire data (little-endian) */
 #define OBD_MD_FLRMTPERM   (0x0000010000000000ULL) /* remote permission */
 #define OBD_MD_FLMDSCAPA   (0x0000020000000000ULL) /* MDS capability */
 #define OBD_MD_FLOSSCAPA   (0x0000040000000000ULL) /* OSS capability */
+#define OBD_MD_FLCKSPLIT   (0x0000080000000000ULL) /* Check split on server */
 
 #define OBD_MD_FLGETATTR (OBD_MD_FLID    | OBD_MD_FLATIME | OBD_MD_FLMTIME | \
                           OBD_MD_FLCTIME | OBD_MD_FLSIZE  | OBD_MD_FLBLKSZ | \
@@ -1258,7 +1262,7 @@ struct mdt_rec_create {
         __u64           cr_rdev;
         __u64           cr_ioepoch;
         __u32           cr_suppgid;
-        __u32           cr_padding_1; /* also fix lustre_swab_mds_rec_create */
+        __u32           cr_cksplit;
         __u32           cr_padding_2; /* also fix lustre_swab_mds_rec_create */
         __u32           cr_padding_3; /* also fix lustre_swab_mds_rec_create */
 };
@@ -1293,7 +1297,7 @@ struct mdt_rec_link {
         struct lu_fid   lk_fid1;
         struct lu_fid   lk_fid2;
         __u64           lk_time;
-        __u32           lk_padding_1;  /* also fix lustre_swab_mds_rec_link */
+        __u32           lk_cksplit;
         __u32           lk_padding_2;  /* also fix lustre_swab_mds_rec_link */
         __u32           lk_padding_3;  /* also fix lustre_swab_mds_rec_link */
         __u32           lk_padding_4;  /* also fix lustre_swab_mds_rec_link */
@@ -1329,7 +1333,7 @@ struct mdt_rec_unlink {
         struct lu_fid   ul_fid1;
         struct lu_fid   ul_fid2;
         __u64           ul_time;
-        __u32           ul_padding_1; /* also fix lustre_swab_mds_rec_unlink */
+        __u32           ul_cksplit;
         __u32           ul_padding_2; /* also fix lustre_swab_mds_rec_unlink */
         __u32           ul_padding_3; /* also fix lustre_swab_mds_rec_unlink */
         __u32           ul_padding_4; /* also fix lustre_swab_mds_rec_unlink */
@@ -1366,7 +1370,7 @@ struct mdt_rec_rename {
         struct lu_fid   rn_fid2;
         __u64           rn_time;
         __u32           rn_mode;      /* cross-ref rename has mode */
-        __u32           rn_padding_2; /* also fix lustre_swab_mdt_rec_rename */
+        __u32           rn_cksplit;   /* check for split or not */
         __u32           rn_padding_3; /* also fix lustre_swab_mdt_rec_rename */
         __u32           rn_padding_4; /* also fix lustre_swab_mdt_rec_rename */
 };
index f089198..875b33e 100644 (file)
@@ -134,8 +134,8 @@ struct md_attr {
         int                     ma_cookie_size;
 };
 
-/* additional parameters for create */
-struct md_create_spec {
+/* Additional parameters for create */
+struct md_op_spec {
         union {
                 /* symlink target */
                 const char               *sp_symname;
@@ -159,6 +159,9 @@ struct md_create_spec {
 
         /* Current lock mode for parent dir where create is performing. */
         mdl_mode_t sp_cr_mode;
+
+        /* Check for split */
+        int        sp_ck_split;
 };
 
 /*
@@ -196,7 +199,7 @@ struct md_object_operations {
         /* part of cross-ref operation */
         int (*moo_object_create)(const struct lu_env *env,
                                  struct md_object *obj,
-                                 const struct md_create_spec *spec,
+                                 const struct md_op_spec *spec,
                                  struct md_attr *ma);
 
         int (*moo_ref_add)(const struct lu_env *env, struct md_object *obj);
@@ -222,20 +225,21 @@ struct md_dir_operations {
                               const struct lu_fid *fid, struct lu_fid *sfid);
 
         int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj,
-                          const char *name, struct lu_fid *fid);
+                          const char *name, struct lu_fid *fid,
+                          struct md_op_spec *spec);
 
         mdl_mode_t (*mdo_lock_mode)(const struct lu_env *env, struct md_object *obj,
                                     mdl_mode_t mode);
 
         int (*mdo_create)(const struct lu_env *env, struct md_object *pobj,
                           const char *name, struct md_object *child,
-                          struct md_create_spec *spec,
+                          struct md_op_spec *spec,
                           struct md_attr *ma);
 
         /* This method is used for creating data object for this meta object*/
         int (*mdo_create_data)(const struct lu_env *env, struct md_object *p,
                                struct md_object *o,
-                               const struct md_create_spec *spec,
+                               const struct md_op_spec *spec,
                                struct md_attr *ma);
 
         int (*mdo_rename)(const struct lu_env *env, struct md_object *spobj,
@@ -445,7 +449,7 @@ static inline int mo_readpage(const struct lu_env *env,
 
 static inline int mo_object_create(const struct lu_env *env,
                                    struct md_object *m,
-                                   const struct md_create_spec *spc,
+                                   const struct md_op_spec *spc,
                                    struct md_attr *at)
 {
         LASSERT(m->mo_ops->moo_object_create);
@@ -479,10 +483,11 @@ static inline int mo_capa_get(const struct lu_env *env,
 static inline int mdo_lookup(const struct lu_env *env,
                              struct md_object *p,
                              const char *name,
-                             struct lu_fid *f)
+                             struct lu_fid *f,
+                             struct md_op_spec *spec)
 {
         LASSERT(p->mo_dir_ops->mdo_lookup);
-        return p->mo_dir_ops->mdo_lookup(env, p, name, f);
+        return p->mo_dir_ops->mdo_lookup(env, p, name, f, spec);
 }
 
 static inline mdl_mode_t mdo_lock_mode(const struct lu_env *env,
@@ -498,7 +503,7 @@ static inline int mdo_create(const struct lu_env *env,
                              struct md_object *p,
                              const char *child_name,
                              struct md_object *c,
-                             struct md_create_spec *spc,
+                             struct md_op_spec *spc,
                              struct md_attr *at)
 {
         LASSERT(c->mo_dir_ops->mdo_create);
@@ -508,7 +513,7 @@ static inline int mdo_create(const struct lu_env *env,
 static inline int mdo_create_data(const struct lu_env *env,
                                   struct md_object *p,
                                   struct md_object *c,
-                                  const struct md_create_spec *spec,
+                                  const struct md_op_spec *spec,
                                   struct md_attr *ma)
 {
         LASSERT(c->mo_dir_ops->mdo_create_data);
index c57c4de..1b0f683 100644 (file)
@@ -110,6 +110,7 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
                 GOTO(out, rc = -ENOMEM);
 
         op_data->op_fid1 = body->fid1;
+        op_data->op_cksplit = 0;
 
         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
                             &req, cb_blocking, extra_lock_flags);
@@ -216,10 +217,12 @@ repeat:
 
                 rpid = obj->lo_inodes[mea_idx].li_fid;
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
+                sop_data->op_cksplit = 0;
                 lmv_obj_put(obj);
                 CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid));
         } else {
                 tgt_exp = lmv_find_export(lmv, &rpid);
+                sop_data->op_cksplit = 1;
         }
         if (IS_ERR(tgt_exp))
                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp));
@@ -390,6 +393,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->op_namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
                         mds = obj->lo_inodes[mea_idx].li_mds;
+                        sop_data->op_cksplit = 0;
                         lmv_obj_put(obj);
 
                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
@@ -398,6 +402,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                         rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
+                        sop_data->op_cksplit = 1;
                 }
         }
 
@@ -555,6 +560,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 memset(op_data, 0, sizeof(*op_data));
                 op_data->op_fid1 = fid;
                 op_data->op_fid2 = fid;
+                op_data->op_cksplit = 0;
 
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                 if (IS_ERR(tgt_exp))
@@ -656,11 +662,13 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->op_namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
                         mds = obj->lo_inodes[mea_idx].li_mds;
+                        sop_data->op_cksplit = 0;
                         lmv_obj_put(obj);
                 } else {
                         rc = lmv_fld_lookup(lmv, &rpid, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
+                        sop_data->op_cksplit = 1;
                 }
 
                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
@@ -685,11 +693,13 @@ repeat:
                                 rpid = obj->lo_inodes[mea_idx].li_fid;
                                 mds = obj->lo_inodes[mea_idx].li_mds;
                         }
+                        sop_data->op_cksplit = 0;
                         lmv_obj_put(obj);
                 } else {
                         rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
+                        sop_data->op_cksplit = 1;
                 }
                 fid_zero(&sop_data->op_fid2);
         }
@@ -892,6 +902,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
 
                 op_data->op_fid1 = fid;
                 op_data->op_fid2 = fid;
+                op_data->op_cksplit = 0;
 
                 /* is obj valid? */
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
index 942e6b1..7982adb 100644 (file)
@@ -1138,14 +1138,16 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid,
 
         obj = lmv_obj_grab(obd, fid);
 
-        CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
-               PFID(fid), obj ? "(split)" : "");
+        CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid),
+               obj ? "(split)" : "");
 
-        /* if object is split, then we loop over all the slaves and gather size
+        /*
+         * If object is split, then we loop over all the slaves and gather size
          * attribute. In ideal world we would have to gather also mds field from
          * all slaves, as object is spread over the cluster and this is
          * definitely interesting information and it is not good to loss it,
-         * but... */
+         * but...
+         */
         if (obj) {
                 struct mdt_body *body;
 
@@ -1195,9 +1197,11 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid,
 
         CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
 
-        /* with CMD every object can have two locks in different namespaces:
+        /*
+         * With CMD every object can have two locks in different namespaces:
          * lookup lock in space of mds storing direntry and update/open lock in
-         * space of mds storing inode */
+         * space of mds storing inode.
+         */
         for (i = 0; i < lmv->desc.ld_tgt_count; i++)
                 md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
 
@@ -1311,10 +1315,12 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        op_data->op_name, op_data->op_namelen);
                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
+                op_data->op_cksplit = 0;
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
         } else {
                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
+                op_data->op_cksplit = 1;
         }
 
         if (IS_ERR(tgt_exp))
@@ -1396,6 +1402,7 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
         for (i = 0; i < mea->mea_count; i++) {
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->op_fid1 = mea->mea_ids[i];
+                op_data2->op_cksplit = 0;
 
                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
                 if (IS_ERR(tgt_exp))
@@ -1593,8 +1600,10 @@ repeat:
                 rid = obj->lo_inodes[mea_idx].li_fid;
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
+                valid &= ~OBD_MD_FLCKSPLIT;
         } else {
                 tgt_exp = lmv_find_export(lmv, &rid);
+                valid |= OBD_MD_FLCKSPLIT;
         }
         if (IS_ERR(tgt_exp))
                 RETURN(PTR_ERR(tgt_exp));
index ee3acf0..f7298e9 100644 (file)
@@ -128,6 +128,7 @@ void mdc_create_pack(struct ptlrpc_request *req, int offset,
         rec->cr_time = op_data->op_mod_time;
         rec->cr_suppgid = op_data->op_suppgids[0];
         rec->cr_flags = op_data->op_flags;
+        rec->cr_cksplit = op_data->op_cksplit;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
 
@@ -194,6 +195,7 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset,
         rec->cr_rdev = rdev;
         rec->cr_time = op_data->op_mod_time;
         rec->cr_suppgid = op_data->op_suppgids[0];
+        rec->cr_cksplit = op_data->op_cksplit;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         /* the next buffer is child capa, which is used for replay,
@@ -297,6 +299,7 @@ void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
         rec->ul_fid1 = op_data->op_fid1;
         rec->ul_fid2 = op_data->op_fid2;
         rec->ul_time = op_data->op_mod_time;
+        rec->ul_cksplit = op_data->op_cksplit;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
 
@@ -322,6 +325,7 @@ void mdc_link_pack(struct ptlrpc_request *req, int offset,
         rec->lk_fid1 = op_data->op_fid1;
         rec->lk_fid2 = op_data->op_fid2;
         rec->lk_time = op_data->op_mod_time;
+        rec->lk_cksplit = op_data->op_cksplit;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         mdc_pack_capa(req, offset + 2, op_data->op_capa2);
@@ -350,6 +354,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, int offset,
         rec->rn_fid2 = op_data->op_fid2;
         rec->rn_time = op_data->op_mod_time;
         rec->rn_mode = op_data->op_mode;
+        rec->rn_cksplit = op_data->op_cksplit;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         mdc_pack_capa(req, offset + 2, op_data->op_capa2);
index 99f5c0b..d14640a 100644 (file)
@@ -218,9 +218,9 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         ENTRY;
 
         size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                                        sizeof(struct lustre_capa) : 0;
+                sizeof(struct lustre_capa) : 0;
         size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
-                                        sizeof(struct lustre_capa) : 0;
+                sizeof(struct lustre_capa) : 0;
 
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
                               MDS_REINT, 5, size, NULL);
index 49d956c..fd6f504 100644 (file)
@@ -235,7 +235,7 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
         size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0;
 
         /*
-         * XXX do we need to make another request here?  We just did a getattr
+         * XXX: Do we need to make another request here?  We just did a getattr
          * to do the lookup in the first place.
          */
         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
@@ -248,7 +248,8 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid,
 
         if (valid & OBD_MD_FLRMTPERM)
                 acl_size = sizeof(struct mdt_remote_perm);
-        /* currently only root inode will call us with FLACL */
+        
+        /* Currently only root inode will call us with FLACL */
         else if (valid & OBD_MD_FLACL)
                 acl_size = LUSTRE_POSIX_ACL_MAX_SIZE;
 
index 2a4772b..265d311 100644 (file)
@@ -69,7 +69,7 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
 
 static int mdd_lookup(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
-                      struct lu_fid* fid)
+                      struct lu_fid* fid, struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -826,7 +826,7 @@ static int mdd_cd_sanity_check(const struct lu_env *env,
 
 static int mdd_create_data(const struct lu_env *env,
                            struct md_object *pobj, struct md_object *cobj,
-                           const struct md_create_spec *spec,
+                           const struct md_op_spec *spec,
                            struct md_attr *ma)
 {
         struct mdd_device *mdd = mdo2mdd(cobj);
@@ -1043,7 +1043,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
 static int mdd_create(const struct lu_env *env,
                       struct md_object *pobj, const char *name,
                       struct md_object *child,
-                      struct md_create_spec *spec,
+                      struct md_op_spec *spec,
                       struct md_attr* ma)
 {
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
index dafe88c..fa97b18 100644 (file)
@@ -126,7 +126,7 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                    struct mdd_object *parent, struct mdd_object *child,
                    struct lov_mds_md **lmm, int *lmm_size,
-                   const struct md_create_spec *spec, struct lu_attr *la);
+                   const struct md_op_spec *spec, struct lu_attr *la);
 void mdd_lov_create_finish(const struct lu_env *env,
                            struct mdd_device *mdd, int rc);
 int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
index d5f6bf5..c1cac97 100644 (file)
@@ -387,7 +387,7 @@ void mdd_lov_create_finish(const struct lu_env *env,
 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
                    struct mdd_object *parent, struct mdd_object *child,
                    struct lov_mds_md **lmm, int *lmm_size,
-                   const struct md_create_spec *spec, struct lu_attr *la)
+                   const struct md_op_spec *spec, struct lu_attr *la)
 {
         struct obd_device     *obd = mdd2obd_dev(mdd);
         struct obd_export     *lov_exp = obd->u.mds.mds_osc_exp;
index 35db61d..46f50fc 100644 (file)
@@ -969,7 +969,7 @@ static int mdd_oc_sanity_check(const struct lu_env *env,
 
 static int mdd_object_create(const struct lu_env *env,
                              struct md_object *obj,
-                             const struct md_create_spec *spec,
+                             const struct md_op_spec *spec,
                              struct md_attr *ma)
 {
 
index 07e9630..c0e5b0f 100644 (file)
@@ -702,7 +702,8 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
                 RETURN(0);
 
         /* Only got the fid of this obj by name */
-        rc = mdo_lookup(info->mti_env, next, name, child_fid);
+        rc = mdo_lookup(info->mti_env, next, name, child_fid,
+                        &info->mti_spec);
         if (rc != 0) {
                 if (rc == -ENOENT)
                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
@@ -825,7 +826,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(rc);
 
         /* step 2: lookup child's fid by name */
-        rc = mdo_lookup(info->mti_env, next, name, child_fid);
+        rc = mdo_lookup(info->mti_env, next, name, child_fid,
+                        &info->mti_spec);
         if (rc != 0) {
                 if (rc == -ENOENT)
                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
@@ -914,9 +916,11 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         ENTRY;
 
         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(reqbody);
+        LASSERT(reqbody != NULL);
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
-        LASSERT(repbody);
+        LASSERT(repbody != NULL);
+
+        info->mti_spec.sp_ck_split = (reqbody->valid & OBD_MD_FLCKSPLIT);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
@@ -2045,6 +2049,9 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_has_trans = 0;
         info->mti_no_need_trans = 0;
         info->mti_opdata = 0;
+
+        /* To not check for split by default. */
+        info->mti_spec.sp_ck_split = 0;
 }
 
 static void mdt_thread_info_fini(struct mdt_thread_info *info)
index eb7f37b..3581ff0 100644 (file)
@@ -341,11 +341,11 @@ struct mdt_thread_info {
          * reint record. contains information for reint operations.
          */
         struct mdt_reint_record    mti_rr;
+        
         /*
          * Create specification
          */
-        struct md_create_spec      mti_spec;
-
+        struct md_op_spec          mti_spec;
 
         /*
          * XXX: Part Four:
index aa49766..ef1837d 100644 (file)
@@ -779,7 +779,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         struct lu_attr          *attr = &info->mti_attr.ma_attr;
         struct mdt_reint_record *rr = &info->mti_rr;
         struct req_capsule      *pill = &info->mti_pill;
-        struct md_create_spec   *sp = &info->mti_spec;
+        struct md_op_spec       *sp = &info->mti_spec;
         ENTRY;
 
         rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
@@ -803,8 +803,9 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         attr->la_atime = rec->cr_time;
         attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID |
                          LA_CTIME | LA_MTIME | LA_ATIME;
-        memset(&sp->u, 0, sizeof sp->u);
+        memset(&sp->u, 0, sizeof(sp->u));
         sp->sp_cr_flags = rec->cr_flags;
+        sp->sp_ck_split = rec->cr_cksplit;
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
@@ -823,7 +824,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                                                   RCL_CLIENT));
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
                 sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
-                                                             RCL_CLIENT);
+                                                                RCL_CLIENT);
                 RETURN(0);
         }
 #endif
@@ -896,6 +897,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+        info->mti_spec.sp_ck_split = rec->lk_cksplit;
 
         RETURN(0);
 }
@@ -936,6 +938,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+        info->mti_spec.sp_ck_split = rec->ul_cksplit;
 
         RETURN(0);
 }
@@ -982,6 +985,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
         rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT);
+        info->mti_spec.sp_ck_split = rec->rn_cksplit;
 
         RETURN(0);
 }
@@ -1021,6 +1025,8 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         info->mti_spec.sp_cr_flags = rec->cr_flags;
         info->mti_replayepoch = rec->cr_ioepoch;
 
+        info->mti_spec.sp_ck_split = rec->cr_cksplit;
+
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
                                  req_capsule_client_get(pill, &RMF_CAPA1));
@@ -1035,7 +1041,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
 
         if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
-                struct md_create_spec *sp = &info->mti_spec;
+                struct md_op_spec *sp = &info->mti_spec;
                 sp->u.sp_ea.eadata = req_capsule_client_get(pill,
                                                             &RMF_EADATA);
                 sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
index 91e3e31..585c140 100644 (file)
@@ -75,7 +75,7 @@ void mdt_mfd_free(struct mdt_file_data *mfd)
 static int mdt_create_data(struct mdt_thread_info *info,
                            struct mdt_object *p, struct mdt_object *o)
 {
-        struct md_create_spec *spec = &info->mti_spec;
+        struct md_op_spec     *spec = &info->mti_spec;
         struct md_attr        *ma = &info->mti_attr;
         int rc;
         ENTRY;
@@ -801,7 +801,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                 GOTO(out, result = PTR_ERR(parent));
 
         result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                            rr->rr_name, child_fid);
+                            rr->rr_name, child_fid, &info->mti_spec);
         if (result != 0 && result != -ENOENT && result != -ESTALE)
                 GOTO(out_parent, result);
 
index e03dfcf..785d69b 100644 (file)
@@ -410,7 +410,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
 
         /* step 2: find & lock the child */
         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
-                        rr->rr_name, child_fid);
+                        rr->rr_name, child_fid, &info->mti_spec);
         if (rc != 0)
                  GOTO(out_unlock_parent, rc);
 
@@ -545,7 +545,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
 
-        /* step 1: lookup & lock the tgt dir */
+        /* step 1: lookup & lock the tgt dir. */
         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
                           rr->rr_tgtlen);
@@ -554,10 +554,10 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
         if (IS_ERR(mtgtdir))
                 GOTO(out, rc = PTR_ERR(mtgtdir));
 
-        /*step 2: find & lock the target object if exists*/
+        /* step 2: find & lock the target object if exists. */
         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
-                        rr->rr_tgt, tgt_fid);
+                        rr->rr_tgt, tgt_fid, &info->mti_spec);
         if (rc != 0 && rc != -ENOENT) {
                 GOTO(out_unlock_tgtdir, rc);
         } else if (rc == 0) {
@@ -760,7 +760,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
 
         /* step 3: find & lock the old object. */
         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
-                        rr->rr_name, old_fid);
+                        rr->rr_name, old_fid, &info->mti_spec);
         if (rc != 0)
                 GOTO(out_unlock_target, rc);
 
@@ -783,7 +783,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         /* step 4: find & lock the new object. */
         /* new target object may not exist now */
         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
-                        rr->rr_tgt, new_fid);
+                        rr->rr_tgt, new_fid, &info->mti_spec);
         if (rc == 0) {
                 /* the new_fid should have been filled at this moment */
                 if (lu_fid_eq(old_fid, new_fid))
index 47faddc..cd008b1 100644 (file)
@@ -1827,7 +1827,7 @@ void lustre_swab_mdt_rec_create (struct mdt_rec_create *cr)
         __swab64s (&cr->cr_rdev);
         __swab64s (&cr->cr_ioepoch);
         __swab32s (&cr->cr_suppgid);
-        CLASSERT(offsetof(typeof(*cr), cr_padding_1) != 0);
+        __swab32s (&cr->cr_cksplit);
         CLASSERT(offsetof(typeof(*cr), cr_padding_2) != 0);
         CLASSERT(offsetof(typeof(*cr), cr_padding_3) != 0);
 }
@@ -1860,7 +1860,7 @@ void lustre_swab_mdt_rec_link (struct mdt_rec_link *lk)
         lustre_swab_lu_fid (&lk->lk_fid1);
         lustre_swab_lu_fid (&lk->lk_fid2);
         __swab64s (&lk->lk_time);
-        CLASSERT(offsetof(typeof(*lk), lk_padding_1) != 0);
+        __swab32s (&lk->lk_cksplit);
         CLASSERT(offsetof(typeof(*lk), lk_padding_2) != 0);
         CLASSERT(offsetof(typeof(*lk), lk_padding_3) != 0);
         CLASSERT(offsetof(typeof(*lk), lk_padding_4) != 0);
@@ -1894,7 +1894,7 @@ void lustre_swab_mdt_rec_unlink (struct mdt_rec_unlink *ul)
         lustre_swab_lu_fid (&ul->ul_fid1);
         lustre_swab_lu_fid (&ul->ul_fid2);
         __swab64s (&ul->ul_time);
-        CLASSERT(offsetof(typeof(*ul), ul_padding_1) != 0);
+        __swab32s (&ul->ul_cksplit);
         CLASSERT(offsetof(typeof(*ul), ul_padding_2) != 0);
         CLASSERT(offsetof(typeof(*ul), ul_padding_3) != 0);
         CLASSERT(offsetof(typeof(*ul), ul_padding_4) != 0);
@@ -1929,7 +1929,7 @@ void lustre_swab_mdt_rec_rename (struct mdt_rec_rename *rn)
         lustre_swab_lu_fid (&rn->rn_fid2);
         __swab64s (&rn->rn_time);
         __swab32s (&rn->rn_mode);
-        CLASSERT(offsetof(typeof(*rn), rn_padding_2) != 0);
+        __swab32s (&rn->rn_cksplit);
         CLASSERT(offsetof(typeof(*rn), rn_padding_3) != 0);
         CLASSERT(offsetof(typeof(*rn), rn_padding_4) != 0);
 }