Whamcloud - gitweb
- detect cross-ref cases in MDT using special flag rather than by using if (name...
authoryury <yury>
Fri, 10 Nov 2006 16:17:17 +0000 (16:17 +0000)
committeryury <yury>
Fri, 10 Nov 2006 16:17:17 +0000 (16:17 +0000)
- fixes in check split optimization.

15 files changed:
lustre/cmm/mdc_object.c
lustre/include/lustre/lustre_idl.h
lustre/llite/llite_lib.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_lib.c
lustre/mdd/mdd_dir.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c
lustre/mdt/mdt_reint.c
lustre/ptlrpc/client.c
lustre/ptlrpc/pack_generic.c
lustre/utils/mkfs_lustre.c

index f33a635..9121f80 100644 (file)
@@ -214,7 +214,7 @@ static int mdc_attr_get(const struct lu_env *env, struct md_object *mo,
 
         rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
                         NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID |
-                        OBD_MD_FLFLAGS, 0, &mci->mci_req);
+                        OBD_MD_FLFLAGS | OBD_MD_FLCROSSREF, 0, &mci->mci_req);
         if (rc == 0) {
                 /* get attr from request */
                 rc = mdc_req2attr_update(env, ma);
@@ -246,6 +246,7 @@ static int mdc_object_create(const struct lu_env *env,
         LASSERT(spec->u.sp_pfid != NULL);
 
         mci = mdc_info_init(env);
+        mci->mci_opdata.op_bias = MDS_CROSS_REF;
         mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu);
         
         /* Parent fid is needed to create dotdot on the remote node. */
@@ -313,6 +314,7 @@ static int mdc_ref_add(const struct lu_env *env, struct md_object *mo)
         LASSERT(mci);
 
         memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata));
+        mci->mci_opdata.op_bias = MDS_CROSS_REF;
         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
         //mci->mci_opdata.op_mod_time = la->la_ctime;
         //mci->mci_opdata.op_fsuid = la->la_uid;
@@ -356,6 +358,7 @@ static int mdc_ref_del(const struct lu_env *env, struct md_object *mo,
         ENTRY;
 
         mci = mdc_info_init(env);
+        mci->mci_opdata.op_bias = MDS_CROSS_REF;
         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu);
         mci->mci_opdata.op_mode = la->la_mode;
         mci->mci_opdata.op_mod_time = la->la_ctime;
@@ -422,6 +425,7 @@ static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p,
         ENTRY;
 
         mci = mdc_info_init(env);
+        mci->mci_opdata.op_bias = MDS_CROSS_REF;
         mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu);
         mci->mci_opdata.op_fid2 = *lf;
         mci->mci_opdata.op_mode = la->la_mode;
index 8059921..fe848cb 100644 (file)
@@ -666,8 +666,8 @@ struct md_op_data {
         struct obd_capa        *op_capa1;
         struct obd_capa        *op_capa2;
 
-        /* Should server check split in lookups or not. */
-        int                     op_cksplit;
+        /* Various operation flags. */
+        __u32                   op_bias;
 };
 
 #define MDS_MODE_DONT_LOCK (1 << 30)
@@ -756,6 +756,7 @@ struct lov_mds_md_v1 {            /* LOV EA mds/wire data (little-endian) */
 #define OBD_MD_FLMDSCAPA   (0x0000020000000000ULL) /* MDS capability */
 #define OBD_MD_FLOSSCAPA   (0x0000040000000000ULL) /* OSS capability */
 #define OBD_MD_FLCKSPLIT   (0x0000080000000000ULL) /* Check split on server */
+#define OBD_MD_FLCROSSREF  (0x0000100000000000ULL) /* Cross-ref case */
 
 #define OBD_MD_FLGETATTR (OBD_MD_FLID    | OBD_MD_FLATIME | OBD_MD_FLMTIME | \
                           OBD_MD_FLCTIME | OBD_MD_FLSIZE  | OBD_MD_FLBLKSZ | \
@@ -1214,6 +1215,11 @@ extern void lustre_swab_mdt_rec_setattr (struct mdt_rec_setattr *sa);
 #define MDS_OPEN_HAS_EA      010000000000 /* specify object create pattern */
 #define MDS_OPEN_HAS_OBJS    020000000000 /* Just set the EA the obj exist */
 
+enum {
+        MDS_CHECK_SPLIT = 1 << 0,
+        MDS_CROSS_REF   = 1 << 1
+};
+
 struct mds_rec_join {
         struct ll_fid  jr_fid;
         __u64          jr_headsize;
@@ -1262,7 +1268,7 @@ struct mdt_rec_create {
         __u64           cr_rdev;
         __u64           cr_ioepoch;
         __u32           cr_suppgid;
-        __u32           cr_cksplit;
+        __u32           cr_bias;
         __u32           cr_padding_2; /* also fix lustre_swab_mds_rec_create */
         __u32           cr_padding_3; /* also fix lustre_swab_mds_rec_create */
 };
@@ -1297,7 +1303,7 @@ struct mdt_rec_link {
         struct lu_fid   lk_fid1;
         struct lu_fid   lk_fid2;
         __u64           lk_time;
-        __u32           lk_cksplit;
+        __u32           lk_bias;
         __u32           lk_padding_2;  /* also fix lustre_swab_mds_rec_link */
         __u32           lk_padding_3;  /* also fix lustre_swab_mds_rec_link */
         __u32           lk_padding_4;  /* also fix lustre_swab_mds_rec_link */
@@ -1333,7 +1339,7 @@ struct mdt_rec_unlink {
         struct lu_fid   ul_fid1;
         struct lu_fid   ul_fid2;
         __u64           ul_time;
-        __u32           ul_cksplit;
+        __u32           ul_bias;
         __u32           ul_padding_2; /* also fix lustre_swab_mds_rec_unlink */
         __u32           ul_padding_3; /* also fix lustre_swab_mds_rec_unlink */
         __u32           ul_padding_4; /* also fix lustre_swab_mds_rec_unlink */
@@ -1370,7 +1376,7 @@ struct mdt_rec_rename {
         struct lu_fid   rn_fid2;
         __u64           rn_time;
         __u32           rn_mode;      /* cross-ref rename has mode */
-        __u32           rn_cksplit;   /* check for split or not */
+        __u32           rn_bias;      /* some operation flags */
         __u32           rn_padding_3; /* also fix lustre_swab_mdt_rec_rename */
         __u32           rn_padding_4; /* also fix lustre_swab_mdt_rec_rename */
 };
index 9b02026..4359684 100644 (file)
@@ -2264,6 +2264,7 @@ ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
         op_data->op_fsuid = current->fsuid;
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap = current->cap_effective;
+        op_data->op_bias = MDS_CHECK_SPLIT;
 
         return op_data;
 }
index 1b0f683..167f0c9 100644 (file)
@@ -110,7 +110,7 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm,
                 GOTO(out, rc = -ENOMEM);
 
         op_data->op_fid1 = body->fid1;
-        op_data->op_cksplit = 0;
+        op_data->op_bias = MDS_CROSS_REF;
 
         rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
                             &req, cb_blocking, extra_lock_flags);
@@ -217,12 +217,12 @@ repeat:
 
                 rpid = obj->lo_inodes[mea_idx].li_fid;
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
-                sop_data->op_cksplit = 0;
+                sop_data->op_bias &= ~MDS_CHECK_SPLIT;
                 lmv_obj_put(obj);
                 CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid));
         } else {
                 tgt_exp = lmv_find_export(lmv, &rpid);
-                sop_data->op_cksplit = 1;
+                sop_data->op_bias |= MDS_CHECK_SPLIT;
         }
         if (IS_ERR(tgt_exp))
                 GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp));
@@ -393,7 +393,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->op_namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
                         mds = obj->lo_inodes[mea_idx].li_mds;
-                        sop_data->op_cksplit = 0;
+                        sop_data->op_bias &= ~MDS_CHECK_SPLIT;
                         lmv_obj_put(obj);
 
                         CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
@@ -402,7 +402,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data,
                         rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
-                        sop_data->op_cksplit = 1;
+                        sop_data->op_bias |= MDS_CHECK_SPLIT;
                 }
         }
 
@@ -560,7 +560,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp)
                 memset(op_data, 0, sizeof(*op_data));
                 op_data->op_fid1 = fid;
                 op_data->op_fid2 = fid;
-                op_data->op_cksplit = 0;
+                op_data->op_bias = MDS_CROSS_REF;
 
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
                 if (IS_ERR(tgt_exp))
@@ -662,13 +662,13 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data,
                                                op_data->op_namelen);
                         rpid = obj->lo_inodes[mea_idx].li_fid;
                         mds = obj->lo_inodes[mea_idx].li_mds;
-                        sop_data->op_cksplit = 0;
+                        sop_data->op_bias &= ~MDS_CHECK_SPLIT;
                         lmv_obj_put(obj);
                 } else {
                         rc = lmv_fld_lookup(lmv, &rpid, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
-                        sop_data->op_cksplit = 1;
+                        sop_data->op_bias |= MDS_CHECK_SPLIT;
                 }
 
                 CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
@@ -693,17 +693,18 @@ repeat:
                                 rpid = obj->lo_inodes[mea_idx].li_fid;
                                 mds = obj->lo_inodes[mea_idx].li_mds;
                         }
-                        sop_data->op_cksplit = 0;
+                        sop_data->op_bias &= ~MDS_CHECK_SPLIT;
                         lmv_obj_put(obj);
                 } else {
                         rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
                         if (rc)
                                 GOTO(out_free_sop_data, rc);
-                        sop_data->op_cksplit = 1;
+                        sop_data->op_bias |= MDS_CHECK_SPLIT;
                 }
                 fid_zero(&sop_data->op_fid2);
         }
 
+        sop_data->op_bias &= ~MDS_CROSS_REF;
         sop_data->op_fid1 = rpid;
 
         rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data, lmm, lmmsize,
@@ -899,10 +900,10 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp,
                         master = 1;
                         cb = cb_blocking;
                 }
-
+                
                 op_data->op_fid1 = fid;
                 op_data->op_fid2 = fid;
-                op_data->op_cksplit = 0;
+                op_data->op_bias = MDS_CROSS_REF;
 
                 /* is obj valid? */
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
index d2b33e8..7fec591 100644 (file)
@@ -1315,12 +1315,12 @@ repeat:
                 mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
                                        op_data->op_name, op_data->op_namelen);
                 op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
-                op_data->op_cksplit = 0;
+                op_data->op_bias &= ~MDS_CHECK_SPLIT;
                 tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
                 lmv_obj_put(obj);
         } else {
                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
-                op_data->op_cksplit = 1;
+                op_data->op_bias |= MDS_CHECK_SPLIT;
         }
 
         if (IS_ERR(tgt_exp))
@@ -1402,8 +1402,8 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype,
         for (i = 0; i < mea->mea_count; i++) {
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->op_fid1 = mea->mea_ids[i];
-                op_data2->op_cksplit = 0;
-
+                op_data2->op_bias = 0;
+        
                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
                 if (IS_ERR(tgt_exp))
                         GOTO(cleanup, rc = PTR_ERR(tgt_exp));
@@ -1497,6 +1497,7 @@ lmv_enqueue_remote(struct obd_export *exp, int lock_type,
         rdata->op_fid1 = fid_copy;
         rdata->op_name = NULL;
         rdata->op_namelen = 0;
+        rdata->op_bias = 0;
 
         rc = md_enqueue(tgt_exp, lock_type, it, lock_mode, rdata,
                         lockh, lmm, lmmsize, cb_compl, cb_blocking,
@@ -1632,7 +1633,8 @@ repeat:
                         }
 
                         rc = md_getattr_name(tgt_exp, &rid, NULL, NULL, 1,
-                                             valid, ea_size, &req);
+                                             valid | OBD_MD_FLCROSSREF,
+                                             ea_size, &req);
                         ptlrpc_req_finished(*request);
                         *request = req;
                 }
@@ -2050,13 +2052,15 @@ static int lmv_unlink_slaves(struct obd_export *exp,
         if (op_data2 == NULL)
                 RETURN(-ENOMEM);
 
+        op_data2->op_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
+        op_data2->op_fsuid = current->fsuid;
+        op_data2->op_fsgid = current->fsgid;
+        op_data2->op_bias = 0;
+        
         LASSERT(mea != NULL);
         for (i = 0; i < mea->mea_count; i++) {
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->op_fid1 = mea->mea_ids[i];
-                op_data2->op_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
-                op_data2->op_fsuid = current->fsuid;
-                op_data2->op_fsgid = current->fsgid;
                 tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
                 if (IS_ERR(tgt_exp))
                         GOTO(out_free_op_data2, rc = PTR_ERR(tgt_exp));
@@ -2114,6 +2118,7 @@ repeat:
                                                obj->lo_objcount,
                                                op_data->op_name,
                                                op_data->op_namelen);
+                        op_data->op_bias &= ~MDS_CHECK_SPLIT;
                         op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
                         tgt_exp = lmv_get_export(lmv,
                                                  obj->lo_inodes[mea_idx].li_mds);
@@ -2126,14 +2131,17 @@ repeat:
                 CDEBUG(D_OTHER, "drop i_nlink on "DFID"\n",
                        PFID(&op_data->op_fid1));
         }
-        if (tgt_exp == NULL)
+        if (tgt_exp == NULL) {
                 tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
-        if (IS_ERR(tgt_exp))
-                RETURN(PTR_ERR(tgt_exp));
+                if (IS_ERR(tgt_exp))
+                        RETURN(PTR_ERR(tgt_exp));
+                op_data->op_bias |= MDS_CHECK_SPLIT;
+        }
 
         op_data->op_fsuid = current->fsuid;
         op_data->op_fsgid = current->fsgid;
         op_data->op_cap   = current->cap_effective;
+        
         rc = md_unlink(tgt_exp, op_data, request);
         if (rc == -ERESTART) {
                 LASSERT(*request != NULL);
index f7298e9..bbccdab 100644 (file)
@@ -128,7 +128,7 @@ void mdc_create_pack(struct ptlrpc_request *req, int offset,
         rec->cr_time = op_data->op_mod_time;
         rec->cr_suppgid = op_data->op_suppgids[0];
         rec->cr_flags = op_data->op_flags;
-        rec->cr_cksplit = op_data->op_cksplit;
+        rec->cr_bias = op_data->op_bias;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
 
@@ -195,7 +195,7 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset,
         rec->cr_rdev = rdev;
         rec->cr_time = op_data->op_mod_time;
         rec->cr_suppgid = op_data->op_suppgids[0];
-        rec->cr_cksplit = op_data->op_cksplit;
+        rec->cr_bias = op_data->op_bias;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         /* the next buffer is child capa, which is used for replay,
@@ -299,7 +299,7 @@ void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
         rec->ul_fid1 = op_data->op_fid1;
         rec->ul_fid2 = op_data->op_fid2;
         rec->ul_time = op_data->op_mod_time;
-        rec->ul_cksplit = op_data->op_cksplit;
+        rec->ul_bias = op_data->op_bias;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
 
@@ -325,7 +325,7 @@ void mdc_link_pack(struct ptlrpc_request *req, int offset,
         rec->lk_fid1 = op_data->op_fid1;
         rec->lk_fid2 = op_data->op_fid2;
         rec->lk_time = op_data->op_mod_time;
-        rec->lk_cksplit = op_data->op_cksplit;
+        rec->lk_bias = op_data->op_bias;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         mdc_pack_capa(req, offset + 2, op_data->op_capa2);
@@ -354,7 +354,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, int offset,
         rec->rn_fid2 = op_data->op_fid2;
         rec->rn_time = op_data->op_mod_time;
         rec->rn_mode = op_data->op_mode;
-        rec->rn_cksplit = op_data->op_cksplit;
+        rec->rn_bias = op_data->op_bias;
 
         mdc_pack_capa(req, offset + 1, op_data->op_capa1);
         mdc_pack_capa(req, offset + 2, op_data->op_capa2);
@@ -378,6 +378,10 @@ void mdc_getattr_pack(struct ptlrpc_request *req, int offset, __u64 valid,
         b->fsgid = current->fsgid;
         b->capability = current->cap_effective;
         b->valid = valid;
+        if (op_data->op_bias & MDS_CHECK_SPLIT)
+                b->valid |= OBD_MD_FLCKSPLIT;
+        if (op_data->op_bias & MDS_CROSS_REF)
+                b->valid |= OBD_MD_FLCROSSREF;
         b->flags = flags | MDS_BFLAG_EXT_FLAGS;
         b->suppgid = op_data->op_suppgids[0];
 
index 5783b25..8aa57ae 100644 (file)
@@ -824,13 +824,12 @@ static int mdd_cd_sanity_check(const struct lu_env *env,
 
 }
 
-static int mdd_create_data(const struct lu_env *env,
-                           struct md_object *pobj, struct md_object *cobj,
-                           const struct md_op_spec *spec,
+static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
+                           struct md_object *cobj, const struct md_op_spec *spec,
                            struct md_attr *ma)
 {
         struct mdd_device *mdd = mdo2mdd(cobj);
-        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
+        struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(cobj);
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
@@ -844,10 +843,11 @@ static int mdd_create_data(const struct lu_env *env,
                 RETURN(rc);
 
         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
-                        !(spec->sp_cr_flags & FMODE_WRITE))
+            !(spec->sp_cr_flags & FMODE_WRITE))
                 RETURN(0);
-        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
-                            attr);
+        
+        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
+                            spec, attr);
         if (rc)
                 RETURN(rc);
 
index c0e5b0f..fc8df95 100644 (file)
@@ -475,7 +475,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                         repbody->valid |= OBD_MD_MEA;
                 }
         } else if (S_ISLNK(la->la_mode) &&
-                          reqbody->valid & OBD_MD_LINKNAME) {
+                   reqbody->valid & OBD_MD_LINKNAME) {
                 buffer->lb_buf = ma->ma_lmm;
                 buffer->lb_len = reqbody->eadatasize;
                 rc = mo_readlink(env, next, buffer);
@@ -497,9 +497,8 @@ static int mdt_getattr_internal(struct mdt_thread_info *info,
                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
                 repbody->valid |= OBD_MD_FLMODEASIZE;
                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
-                                "MAX_COOKIE to : %d:%d\n",
-                                repbody->max_mdsize,
-                                repbody->max_cookiesize);
+                       "MAX_COOKIE to : %d:%d\n", repbody->max_mdsize,
+                       repbody->max_cookiesize);
         }
 
         if (med->med_rmtclient && (reqbody->valid & OBD_MD_FLRMTPERM)) {
@@ -632,7 +631,7 @@ static int mdt_getattr(struct mdt_thread_info *info)
                 RETURN(err_serious(rc));
 
         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
-        LASSERT(repbody);
+        LASSERT(repbody != NULL);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
@@ -643,9 +642,13 @@ static int mdt_getattr(struct mdt_thread_info *info)
         if (rc)
                 GOTO(out, rc);
 
-        /* don't check capability at all, because rename might
-         * getattr for remote obj, and at that time no capability
-         * is available. */
+        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
+        info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
+        
+        /*
+         * Don't check capability at all, because rename might getattr for
+         * remote obj, and at that time no capability is available.
+         */
         mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA);
         rc = mdt_getattr_internal(info, obj);
         if (reqbody->valid & OBD_MD_FLRMTPERM)
@@ -701,6 +704,8 @@ static int mdt_raw_lookup(struct mdt_thread_info *info,
         if (reqbody->valid != OBD_MD_FLID)
                 RETURN(0);
 
+        LASSERT(!info->mti_cross_ref);
+        
         /* Only got the fid of this obj by name */
         rc = mdo_lookup(info->mti_env, next, name, child_fid,
                         &info->mti_spec);
@@ -762,7 +767,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(-ESTALE);
         else if (rc < 0) {
                 CERROR("Object "DFID" locates on remote server\n",
-                        PFID(mdt_object_fid(parent)));
+                       PFID(mdt_object_fid(parent)));
                 LBUG();
         }
 
@@ -773,7 +778,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 RETURN(rc);
         }
 
-        if (name[0] == 0) {
+        if (info->mti_cross_ref) {
                 /* Only getattr on the child. Parent is on another node. */
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
                 child = parent;
@@ -877,17 +882,19 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
 
                         /* Debugging code. */
                         res_id = &lock->l_resource->lr_name;
-                        LDLM_DEBUG(lock, "we will return this lock client\n");
+                        LDLM_DEBUG(lock, "We will return this lock client\n");
                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
                                                  &lock->l_resource->lr_name),
-                                "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
-                                (unsigned long)res_id->name[0],
-                                (unsigned long)res_id->name[1],
-                                (unsigned long)res_id->name[2],
-                                PFID(mdt_object_fid(child)));
-
-                        /* Pack Size-on-MDS inode attributes to the body if
-                         * update lock is given. */
+                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
+                                 (unsigned long)res_id->name[0],
+                                 (unsigned long)res_id->name[1],
+                                 (unsigned long)res_id->name[2],
+                                 PFID(mdt_object_fid(child)));
+
+                        /*
+                         * Pack Size-on-MDS inode attributes to the body if
+                         * update lock is given.
+                         */
                         repbody = req_capsule_server_get(&info->mti_pill,
                                                          &RMF_MDT_BODY);
                         ma = &info->mti_attr.ma_attr;
@@ -920,7 +927,8 @@ static int mdt_getattr_name(struct mdt_thread_info *info)
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         LASSERT(repbody != NULL);
 
-        info->mti_spec.sp_ck_split = (reqbody->valid & OBD_MD_FLCKSPLIT);
+        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
+        info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
@@ -1852,8 +1860,9 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
                 rc = 0;
 
         if (rc == 0 && (flags & HABEO_REFERO)) {
-                struct mdt_device       *mdt = info->mti_mdt;
-                /*pack reply*/
+                struct mdt_device *mdt = info->mti_mdt;
+
+                /* Pack reply. */
                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
                                              mdt->mdt_max_mdsize);
@@ -2048,6 +2057,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_dlm_req = NULL;
         info->mti_has_trans = 0;
         info->mti_no_need_trans = 0;
+        info->mti_cross_ref = 0;
         info->mti_opdata = 0;
 
         /* To not check for split by default. */
@@ -2518,8 +2528,12 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
 
         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
         LASSERT(reqbody);
+
         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         LASSERT(repbody);
+
+        info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
+        info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
         repbody->eadatasize = 0;
         repbody->aclsize = 0;
 
index 3581ff0..f861722 100644 (file)
@@ -322,7 +322,8 @@ struct mdt_thread_info {
         const struct ldlm_request *mti_dlm_req;
 
         __u32                      mti_has_trans:1, /* has txn already? */
-                                   mti_no_need_trans:1;
+                                   mti_no_need_trans:1,
+                                   mti_cross_ref:1;
 
         /* opdata for mdt_reint_open(), has the same as
          * ldlm_reply:lock_policy_res1.  mdt_update_last_rcvd() stores this
@@ -343,7 +344,7 @@ struct mdt_thread_info {
         struct mdt_reint_record    mti_rr;
         
         /*
-         * Create specification
+         * Operation specification (currently create and lookup)
          */
         struct md_op_spec          mti_spec;
 
index ef1837d..22399e2 100644 (file)
@@ -805,7 +805,8 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                          LA_CTIME | LA_MTIME | LA_ATIME;
         memset(&sp->u, 0, sizeof(sp->u));
         sp->sp_cr_flags = rec->cr_flags;
-        sp->sp_ck_split = rec->cr_cksplit;
+        sp->sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
+        info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
@@ -897,7 +898,8 @@ static int mdt_link_unpack(struct mdt_thread_info *info)
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
-        info->mti_spec.sp_ck_split = rec->lk_cksplit;
+        info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
+        info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
 
         RETURN(0);
 }
@@ -938,7 +940,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         if (rr->rr_name == NULL)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
-        info->mti_spec.sp_ck_split = rec->ul_cksplit;
+        info->mti_spec.sp_ck_split = !!(rec->ul_bias & MDS_CHECK_SPLIT);
+        info->mti_cross_ref = !!(rec->ul_bias & MDS_CROSS_REF);
 
         RETURN(0);
 }
@@ -985,7 +988,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
         rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT);
-        info->mti_spec.sp_ck_split = rec->rn_cksplit;
+        info->mti_spec.sp_ck_split = !!(rec->rn_bias & MDS_CHECK_SPLIT);
+        info->mti_cross_ref = !!(rec->rn_bias & MDS_CROSS_REF);
 
         RETURN(0);
 }
@@ -1025,7 +1029,8 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         info->mti_spec.sp_cr_flags = rec->cr_flags;
         info->mti_replayepoch = rec->cr_ioepoch;
 
-        info->mti_spec.sp_ck_split = rec->cr_cksplit;
+        info->mti_spec.sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
+        info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
 
         if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
                 mdt_set_capainfo(info, 0, rr->rr_fid1,
index 585c140..3c8297b 100644 (file)
@@ -779,7 +779,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         mdt_set_disposition(info, ldlm_rep,
                             (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
 
-        if (rr->rr_name[0] == 0) {
+        if (info->mti_cross_ref) {
                 /* This is cross-ref open */
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
                 result = mdt_cross_open(info, rr->rr_fid1, ldlm_rep,
index 785d69b..87971bb 100644 (file)
@@ -325,7 +325,7 @@ static int mdt_reint_create(struct mdt_thread_info *info,
         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
         case S_IFDIR:{
                 /* Cross-ref case. */
-                if (info->mti_rr.rr_name[0] == 0) {
+                if (info->mti_cross_ref) {
                         rc = mdt_md_mkobj(info);
                         break;
                 }
@@ -391,7 +391,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         if (!ma->ma_lmm || !ma->ma_cookie)
                 GOTO(out_unlock_parent, rc = -EINVAL);
 
-        if (rr->rr_name[0] == 0) {
+        if (info->mti_cross_ref) {
                 /*
                  * Remote partial operation. It is possible that replay may
                  * happen on parent MDT and this operation will be repeated.
@@ -470,7 +470,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                 RETURN(err_serious(-ENOENT));
 
-        if (rr->rr_name[0] == 0) {
+        if (info->mti_cross_ref) {
                 /* MDT holding name ask us to add ref. */
                 lhs = &info->mti_lh[MDT_LH_CHILD];
                 mdt_lock_reg_init(lhs, LCK_EX);
@@ -706,7 +706,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
         int                      rc;
         ENTRY;
 
-        if (rr->rr_name[0] == 0) {
+        if (info->mti_cross_ref) {
                 rc = mdt_reint_rename_tgt(info);
                 RETURN(rc);
         }
index cf497aa..0f94d29 100644 (file)
@@ -315,6 +315,7 @@ static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
         struct ptlrpc_request_pool *pool = request->rq_pool;
 
         spin_lock(&pool->prp_lock);
+        LASSERT(list_empty(&request->rq_list));
         list_add_tail(&request->rq_list, &pool->prp_req_list);
         spin_unlock(&pool->prp_lock);
 }
index cd008b1..074c8b9 100644 (file)
@@ -1827,7 +1827,7 @@ void lustre_swab_mdt_rec_create (struct mdt_rec_create *cr)
         __swab64s (&cr->cr_rdev);
         __swab64s (&cr->cr_ioepoch);
         __swab32s (&cr->cr_suppgid);
-        __swab32s (&cr->cr_cksplit);
+        __swab32s (&cr->cr_bias);
         CLASSERT(offsetof(typeof(*cr), cr_padding_2) != 0);
         CLASSERT(offsetof(typeof(*cr), cr_padding_3) != 0);
 }
@@ -1860,7 +1860,7 @@ void lustre_swab_mdt_rec_link (struct mdt_rec_link *lk)
         lustre_swab_lu_fid (&lk->lk_fid1);
         lustre_swab_lu_fid (&lk->lk_fid2);
         __swab64s (&lk->lk_time);
-        __swab32s (&lk->lk_cksplit);
+        __swab32s (&lk->lk_bias);
         CLASSERT(offsetof(typeof(*lk), lk_padding_2) != 0);
         CLASSERT(offsetof(typeof(*lk), lk_padding_3) != 0);
         CLASSERT(offsetof(typeof(*lk), lk_padding_4) != 0);
@@ -1894,7 +1894,7 @@ void lustre_swab_mdt_rec_unlink (struct mdt_rec_unlink *ul)
         lustre_swab_lu_fid (&ul->ul_fid1);
         lustre_swab_lu_fid (&ul->ul_fid2);
         __swab64s (&ul->ul_time);
-        __swab32s (&ul->ul_cksplit);
+        __swab32s (&ul->ul_bias);
         CLASSERT(offsetof(typeof(*ul), ul_padding_2) != 0);
         CLASSERT(offsetof(typeof(*ul), ul_padding_3) != 0);
         CLASSERT(offsetof(typeof(*ul), ul_padding_4) != 0);
@@ -1929,7 +1929,7 @@ void lustre_swab_mdt_rec_rename (struct mdt_rec_rename *rn)
         lustre_swab_lu_fid (&rn->rn_fid2);
         __swab64s (&rn->rn_time);
         __swab32s (&rn->rn_mode);
-        __swab32s (&rn->rn_cksplit);
+        __swab32s (&rn->rn_bias);
         CLASSERT(offsetof(typeof(*rn), rn_padding_3) != 0);
         CLASSERT(offsetof(typeof(*rn), rn_padding_4) != 0);
 }
index cf2101f..089f012 100644 (file)
@@ -506,17 +506,14 @@ int make_lustre_backfs(struct mkfs_opts *mop)
                                 sprintf(buf, " -I %ld", inode_size);
                                 strcat(mop->mo_mkfsopts, buf);
                         }
-
                 }
 
-                if (verbose < 2) {
+                if (verbose < 2)
                         strcat(mop->mo_mkfsopts, " -q");
-                }
 
                 /* Enable hashed b-tree directory lookup in large dirs bz6224 */
-                if (strstr(mop->mo_mkfsopts, "-O") == NULL) {
+                if (strstr(mop->mo_mkfsopts, "-O") == NULL)
                         strcat(mop->mo_mkfsopts, " -O dir_index");
-                }
 
                 /* Allow reformat of full devices (as opposed to
                    partitions.)  We already checked for mounted dev. */
@@ -524,7 +521,6 @@ int make_lustre_backfs(struct mkfs_opts *mop)
 
                 sprintf(mkfs_cmd, "mkfs.ext2 -j -b %d -L %s ", L_BLOCK_SIZE,
                         mop->mo_ldd.ldd_svname);
-
         } else if (mop->mo_ldd.ldd_mount_type == LDD_MT_REISERFS) {
                 long journal_sz = 0; /* FIXME default journal size */
                 if (journal_sz > 0) {