From 807509762b538ff018e7d9b39d4d35861db4dafa Mon Sep 17 00:00:00 2001 From: yury Date: Tue, 7 Nov 2006 20:07:27 +0000 Subject: [PATCH] - do not check dir for split if client already knows that it is split. --- lustre/cmm/cmm_internal.h | 7 +++-- lustre/cmm/cmm_lproc.c | 6 ++-- lustre/cmm/cmm_object.c | 60 +++++++++++++++++++++++--------------- lustre/cmm/cmm_split.c | 10 +++---- lustre/cmm/mdc_object.c | 2 +- lustre/include/lustre/lustre_idl.h | 12 +++++--- lustre/include/md_object.h | 27 ++++++++++------- lustre/lmv/lmv_intent.c | 11 +++++++ lustre/lmv/lmv_obd.c | 21 +++++++++---- lustre/mdc/mdc_lib.c | 5 ++++ lustre/mdc/mdc_reint.c | 4 +-- lustre/mdc/mdc_request.c | 5 ++-- lustre/mdd/mdd_dir.c | 6 ++-- lustre/mdd/mdd_internal.h | 2 +- lustre/mdd/mdd_lov.c | 2 +- lustre/mdd/mdd_object.c | 2 +- lustre/mdt/mdt_handler.c | 15 +++++++--- lustre/mdt/mdt_internal.h | 4 +-- lustre/mdt/mdt_lib.c | 14 ++++++--- lustre/mdt/mdt_open.c | 4 +-- lustre/mdt/mdt_reint.c | 12 ++++---- lustre/ptlrpc/pack_generic.c | 8 ++--- 22 files changed, 151 insertions(+), 88 deletions(-) diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index a43765a..e3af448 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -122,7 +122,7 @@ struct cmm_thread_info { struct lu_rdpg cmi_rdpg; /* pointers to pages for readpage. 
*/ struct page *cmi_pages[CMM_SPLIT_PAGE_COUNT]; - struct md_create_spec cmi_spec; + struct md_op_spec cmi_spec; struct lmv_stripe_md cmi_lmv; char cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; }; @@ -202,7 +202,7 @@ int cmm_split_check(const struct lu_env *env, struct md_object *mp, int cmm_split_expect(const struct lu_env *env, struct md_object *mo, struct md_attr *ma, int *split); -int cmm_split_try(const struct lu_env *env, struct md_object *mo); +int cmm_split_dir(const struct lu_env *env, struct md_object *mo); int cmm_split_access(const struct lu_env *env, struct md_object *mo, mdl_mode_t lm); @@ -222,7 +222,8 @@ void cmm_lprocfs_time_end(struct cmm_device *cmm, enum { LPROC_CMM_SPLIT_CHECK = 0, - LPROC_CMM_SPLIT_EXEC, + LPROC_CMM_SPLIT, + LPROC_CMM_LOOKUP, LPROC_CMM_LAST }; diff --git a/lustre/cmm/cmm_lproc.c b/lustre/cmm/cmm_lproc.c index 1c763b6..2aa1e99 100644 --- a/lustre/cmm/cmm_lproc.c +++ b/lustre/cmm/cmm_lproc.c @@ -58,10 +58,12 @@ static int cmm_procfs_init_stats(struct cmm_device *cmm, int num_stats) cmm->cmm_stats = stats; + lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_LOOKUP, + LPROCFS_CNTR_AVGMINMAX, "lookup", "time"); + lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT, + LPROCFS_CNTR_AVGMINMAX, "split", "time"); lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_CHECK, LPROCFS_CNTR_AVGMINMAX, "split_check", "time"); - lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_EXEC, - LPROCFS_CNTR_AVGMINMAX, "split_exec", "time"); EXIT; cleanup: if (rc) { diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 8a9c928..4d999d6 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -192,7 +192,7 @@ static struct lu_object_operations cml_obj_ops = { /* CMM local md_object operations */ static int cml_object_create(const struct lu_env *env, struct md_object *mo, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *attr) { int rc; @@ -347,17 +347,27 @@ static struct md_object_operations 
cml_mo_ops = { /* md_dir operations */ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p, - const char *name, struct lu_fid *lf) + const char *name, struct lu_fid *lf, + struct md_op_spec *spec) { + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p)); + struct timeval start; int rc; ENTRY; + cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_LOOKUP); + #ifdef HAVE_SPLIT_SUPPORT - rc = cmm_split_check(env, mo_p, name); - if (rc) - RETURN(rc); + if (spec != NULL && spec->sp_ck_split) { + rc = cmm_split_check(env, mo_p, name); + if (rc) { + cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP); + RETURN(rc); + } + } #endif - rc = mdo_lookup(env, md_object_next(mo_p), name, lf); + rc = mdo_lookup(env, md_object_next(mo_p), name, lf, spec); + cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP); RETURN(rc); } @@ -377,7 +387,7 @@ static mdl_mode_t cml_lock_mode(const struct lu_env *env, static int cml_create(const struct lu_env *env, struct md_object *mo_p, const char *name, struct md_object *mo_c, - struct md_create_spec *spec, struct md_attr *ma) + struct md_op_spec *spec, struct md_attr *ma) { int rc; ENTRY; @@ -408,24 +418,25 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p, * -ERESTART to client to let it know that correct MDT should be * choosen. */ - rc = cmm_split_try(env, mo_p); - if (rc) { + rc = cmm_split_dir(env, mo_p); + if (rc) /* * -ERESTART or some split error is returned, we can't * proceed with create. */ RETURN(rc); - } } - - /* - * Check for possible split directory and let caller know that it should - * tell client that directory is split and operation should repeat to - * correct MDT. - */ - rc = cmm_split_check(env, mo_p, name); - if (rc) - RETURN(rc); + + if (spec != NULL && spec->sp_ck_split) { + /* + * Check for possible split directory and let caller know that + * it should tell client that directory is split and operation + * should repeat to correct MDT. 
+ */ + rc = cmm_split_check(env, mo_p, name); + if (rc) + RETURN(rc); + } #endif rc = mdo_create(env, md_object_next(mo_p), name, md_object_next(mo_c), @@ -436,7 +447,7 @@ static int cml_create(const struct lu_env *env, struct md_object *mo_p, static int cml_create_data(const struct lu_env *env, struct md_object *p, struct md_object *o, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { int rc; @@ -680,7 +691,7 @@ static struct lu_object_operations cmr_obj_ops = { /* CMM remote md_object operations. All are invalid */ static int cmr_object_create(const struct lu_env *env, struct md_object *mo, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { return -EFAULT; @@ -789,7 +800,8 @@ static struct md_object_operations cmr_mo_ops = { /* remote part of md_dir operations */ static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p, - const char *name, struct lu_fid *lf) + const char *name, struct lu_fid *lf, + struct md_op_spec *spec) { /* * This can happens while rename() If new parent is remote dir, lookup @@ -816,7 +828,7 @@ static mdl_mode_t cmr_lock_mode(const struct lu_env *env, */ static int cmr_create(const struct lu_env *env, struct md_object *mo_p, const char *child_name, struct md_object *mo_c, - struct md_create_spec *spec, + struct md_op_spec *spec, struct md_attr *ma) { struct cmm_thread_info *cmi; @@ -879,7 +891,7 @@ static int cmr_link(const struct lu_env *env, struct md_object *mo_p, /* Make sure that name isn't exist before doing remote call. 
*/ rc = mdo_lookup(env, md_object_next(mo_p), name, - &cmm_env_info(env)->cmi_fid); + &cmm_env_info(env)->cmi_fid, NULL); if (rc == 0) { rc = -EEXIST; } else if (rc == -ENOENT) { diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 136560a..0a11c20 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -284,7 +284,7 @@ static int cmm_split_slave_create(const struct lu_env *env, struct lmv_stripe_md *lmv, int lmv_size) { - struct md_create_spec *spec = &cmm_env_info(env)->cmi_spec; + struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec; struct cmm_object *obj; int rc; ENTRY; @@ -613,7 +613,7 @@ cleanup: return rc; } -int cmm_split_try(const struct lu_env *env, struct md_object *mo) +int cmm_split_dir(const struct lu_env *env, struct md_object *mo) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; @@ -622,7 +622,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo) struct timeval start; ENTRY; - cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT_EXEC); + cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT); LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); memset(ma, 0, sizeof(*ma)); @@ -643,7 +643,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo) /* * Disable transacrions for split, since there will be so many trans in - * this one ops, confilct with current recovery design. + * this one ops, conflict with current recovery design. 
*/ rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS); if (rc) { @@ -694,6 +694,6 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo) cleanup: OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); out: - cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT_EXEC); + cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT); return rc; } diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 53aeabc..f33a635 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -228,7 +228,7 @@ static int mdc_attr_get(const struct lu_env *env, struct md_object *mo, static int mdc_object_create(const struct lu_env *env, struct md_object *mo, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 5060129..8059921 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -665,6 +665,9 @@ struct md_op_data { /* Capa fields */ struct obd_capa *op_capa1; struct obd_capa *op_capa2; + + /* Should server check split in lookups or not. 
*/ + int op_cksplit; }; #define MDS_MODE_DONT_LOCK (1 << 30) @@ -752,6 +755,7 @@ struct lov_mds_md_v1 { /* LOV EA mds/wire data (little-endian) */ #define OBD_MD_FLRMTPERM (0x0000010000000000ULL) /* remote permission */ #define OBD_MD_FLMDSCAPA (0x0000020000000000ULL) /* MDS capability */ #define OBD_MD_FLOSSCAPA (0x0000040000000000ULL) /* OSS capability */ +#define OBD_MD_FLCKSPLIT (0x0000080000000000ULL) /* Check split on server */ #define OBD_MD_FLGETATTR (OBD_MD_FLID | OBD_MD_FLATIME | OBD_MD_FLMTIME | \ OBD_MD_FLCTIME | OBD_MD_FLSIZE | OBD_MD_FLBLKSZ | \ @@ -1258,7 +1262,7 @@ struct mdt_rec_create { __u64 cr_rdev; __u64 cr_ioepoch; __u32 cr_suppgid; - __u32 cr_padding_1; /* also fix lustre_swab_mds_rec_create */ + __u32 cr_cksplit; __u32 cr_padding_2; /* also fix lustre_swab_mds_rec_create */ __u32 cr_padding_3; /* also fix lustre_swab_mds_rec_create */ }; @@ -1293,7 +1297,7 @@ struct mdt_rec_link { struct lu_fid lk_fid1; struct lu_fid lk_fid2; __u64 lk_time; - __u32 lk_padding_1; /* also fix lustre_swab_mds_rec_link */ + __u32 lk_cksplit; __u32 lk_padding_2; /* also fix lustre_swab_mds_rec_link */ __u32 lk_padding_3; /* also fix lustre_swab_mds_rec_link */ __u32 lk_padding_4; /* also fix lustre_swab_mds_rec_link */ @@ -1329,7 +1333,7 @@ struct mdt_rec_unlink { struct lu_fid ul_fid1; struct lu_fid ul_fid2; __u64 ul_time; - __u32 ul_padding_1; /* also fix lustre_swab_mds_rec_unlink */ + __u32 ul_cksplit; __u32 ul_padding_2; /* also fix lustre_swab_mds_rec_unlink */ __u32 ul_padding_3; /* also fix lustre_swab_mds_rec_unlink */ __u32 ul_padding_4; /* also fix lustre_swab_mds_rec_unlink */ @@ -1366,7 +1370,7 @@ struct mdt_rec_rename { struct lu_fid rn_fid2; __u64 rn_time; __u32 rn_mode; /* cross-ref rename has mode */ - __u32 rn_padding_2; /* also fix lustre_swab_mdt_rec_rename */ + __u32 rn_cksplit; /* check for split or not */ __u32 rn_padding_3; /* also fix lustre_swab_mdt_rec_rename */ __u32 rn_padding_4; /* also fix lustre_swab_mdt_rec_rename */ }; diff --git 
a/lustre/include/md_object.h b/lustre/include/md_object.h index f089198..875b33e 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -134,8 +134,8 @@ struct md_attr { int ma_cookie_size; }; -/* additional parameters for create */ -struct md_create_spec { +/* Additional parameters for create */ +struct md_op_spec { union { /* symlink target */ const char *sp_symname; @@ -159,6 +159,9 @@ struct md_create_spec { /* Current lock mode for parent dir where create is performing. */ mdl_mode_t sp_cr_mode; + + /* Check for split */ + int sp_ck_split; }; /* @@ -196,7 +199,7 @@ struct md_object_operations { /* part of cross-ref operation */ int (*moo_object_create)(const struct lu_env *env, struct md_object *obj, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma); int (*moo_ref_add)(const struct lu_env *env, struct md_object *obj); @@ -222,20 +225,21 @@ struct md_dir_operations { const struct lu_fid *fid, struct lu_fid *sfid); int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj, - const char *name, struct lu_fid *fid); + const char *name, struct lu_fid *fid, + struct md_op_spec *spec); mdl_mode_t (*mdo_lock_mode)(const struct lu_env *env, struct md_object *obj, mdl_mode_t mode); int (*mdo_create)(const struct lu_env *env, struct md_object *pobj, const char *name, struct md_object *child, - struct md_create_spec *spec, + struct md_op_spec *spec, struct md_attr *ma); /* This method is used for creating data object for this meta object*/ int (*mdo_create_data)(const struct lu_env *env, struct md_object *p, struct md_object *o, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma); int (*mdo_rename)(const struct lu_env *env, struct md_object *spobj, @@ -445,7 +449,7 @@ static inline int mo_readpage(const struct lu_env *env, static inline int mo_object_create(const struct lu_env *env, struct md_object *m, - const struct md_create_spec *spc, + const struct md_op_spec 
*spc, struct md_attr *at) { LASSERT(m->mo_ops->moo_object_create); @@ -479,10 +483,11 @@ static inline int mo_capa_get(const struct lu_env *env, static inline int mdo_lookup(const struct lu_env *env, struct md_object *p, const char *name, - struct lu_fid *f) + struct lu_fid *f, + struct md_op_spec *spec) { LASSERT(p->mo_dir_ops->mdo_lookup); - return p->mo_dir_ops->mdo_lookup(env, p, name, f); + return p->mo_dir_ops->mdo_lookup(env, p, name, f, spec); } static inline mdl_mode_t mdo_lock_mode(const struct lu_env *env, @@ -498,7 +503,7 @@ static inline int mdo_create(const struct lu_env *env, struct md_object *p, const char *child_name, struct md_object *c, - struct md_create_spec *spc, + struct md_op_spec *spc, struct md_attr *at) { LASSERT(c->mo_dir_ops->mdo_create); @@ -508,7 +513,7 @@ static inline int mdo_create(const struct lu_env *env, static inline int mdo_create_data(const struct lu_env *env, struct md_object *p, struct md_object *c, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { LASSERT(c->mo_dir_ops->mdo_create_data); diff --git a/lustre/lmv/lmv_intent.c b/lustre/lmv/lmv_intent.c index c57c4de..1b0f683 100644 --- a/lustre/lmv/lmv_intent.c +++ b/lustre/lmv/lmv_intent.c @@ -110,6 +110,7 @@ int lmv_intent_remote(struct obd_export *exp, void *lmm, GOTO(out, rc = -ENOMEM); op_data->op_fid1 = body->fid1; + op_data->op_cksplit = 0; rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags, &req, cb_blocking, extra_lock_flags); @@ -216,10 +217,12 @@ repeat: rpid = obj->lo_inodes[mea_idx].li_fid; tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); + sop_data->op_cksplit = 0; lmv_obj_put(obj); CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid)); } else { tgt_exp = lmv_find_export(lmv, &rpid); + sop_data->op_cksplit = 1; } if (IS_ERR(tgt_exp)) GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp)); @@ -390,6 +393,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, 
op_data->op_namelen); rpid = obj->lo_inodes[mea_idx].li_fid; mds = obj->lo_inodes[mea_idx].li_mds; + sop_data->op_cksplit = 0; lmv_obj_put(obj); CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n", @@ -398,6 +402,7 @@ int lmv_intent_getattr(struct obd_export *exp, struct md_op_data *op_data, rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); if (rc) GOTO(out_free_sop_data, rc); + sop_data->op_cksplit = 1; } } @@ -555,6 +560,7 @@ int lmv_lookup_slaves(struct obd_export *exp, struct ptlrpc_request **reqp) memset(op_data, 0, sizeof(*op_data)); op_data->op_fid1 = fid; op_data->op_fid2 = fid; + op_data->op_cksplit = 0; tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); if (IS_ERR(tgt_exp)) @@ -656,11 +662,13 @@ int lmv_intent_lookup(struct obd_export *exp, struct md_op_data *op_data, op_data->op_namelen); rpid = obj->lo_inodes[mea_idx].li_fid; mds = obj->lo_inodes[mea_idx].li_mds; + sop_data->op_cksplit = 0; lmv_obj_put(obj); } else { rc = lmv_fld_lookup(lmv, &rpid, &mds); if (rc) GOTO(out_free_sop_data, rc); + sop_data->op_cksplit = 1; } CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n", @@ -685,11 +693,13 @@ repeat: rpid = obj->lo_inodes[mea_idx].li_fid; mds = obj->lo_inodes[mea_idx].li_mds; } + sop_data->op_cksplit = 0; lmv_obj_put(obj); } else { rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds); if (rc) GOTO(out_free_sop_data, rc); + sop_data->op_cksplit = 1; } fid_zero(&sop_data->op_fid2); } @@ -892,6 +902,7 @@ int lmv_revalidate_slaves(struct obd_export *exp, struct ptlrpc_request **reqp, op_data->op_fid1 = fid; op_data->op_fid2 = fid; + op_data->op_cksplit = 0; /* is obj valid? 
*/ tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 942e6b1..7982adb 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -1138,14 +1138,16 @@ static int lmv_getattr(struct obd_export *exp, const struct lu_fid *fid, obj = lmv_obj_grab(obd, fid); - CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", - PFID(fid), obj ? "(split)" : ""); + CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid), + obj ? "(split)" : ""); - /* if object is split, then we loop over all the slaves and gather size + /* + * If object is split, then we loop over all the slaves and gather size * attribute. In ideal world we would have to gather also mds field from * all slaves, as object is spread over the cluster and this is * definitely interesting information and it is not good to loss it, - * but... */ + * but... + */ if (obj) { struct mdt_body *body; @@ -1195,9 +1197,11 @@ static int lmv_change_cbdata(struct obd_export *exp, const struct lu_fid *fid, CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid)); - /* with CMD every object can have two locks in different namespaces: + /* + * With CMD every object can have two locks in different namespaces: * lookup lock in space of mds storing direntry and update/open lock in - * space of mds storing inode */ + * space of mds storing inode. 
+ */ for (i = 0; i < lmv->desc.ld_tgt_count; i++) md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data); @@ -1311,10 +1315,12 @@ repeat: mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount, op_data->op_name, op_data->op_namelen); op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid; + op_data->op_cksplit = 0; tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); } else { tgt_exp = lmv_find_export(lmv, &op_data->op_fid1); + op_data->op_cksplit = 1; } if (IS_ERR(tgt_exp)) @@ -1396,6 +1402,7 @@ lmv_enqueue_slaves(struct obd_export *exp, int locktype, for (i = 0; i < mea->mea_count; i++) { memset(op_data2, 0, sizeof(*op_data2)); op_data2->op_fid1 = mea->mea_ids[i]; + op_data2->op_cksplit = 0; tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1); if (IS_ERR(tgt_exp)) @@ -1593,8 +1600,10 @@ repeat: rid = obj->lo_inodes[mea_idx].li_fid; tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds); lmv_obj_put(obj); + valid &= ~OBD_MD_FLCKSPLIT; } else { tgt_exp = lmv_find_export(lmv, &rid); + valid |= OBD_MD_FLCKSPLIT; } if (IS_ERR(tgt_exp)) RETURN(PTR_ERR(tgt_exp)); diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index ee3acf0..f7298e9 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -128,6 +128,7 @@ void mdc_create_pack(struct ptlrpc_request *req, int offset, rec->cr_time = op_data->op_mod_time; rec->cr_suppgid = op_data->op_suppgids[0]; rec->cr_flags = op_data->op_flags; + rec->cr_cksplit = op_data->op_cksplit; mdc_pack_capa(req, offset + 1, op_data->op_capa1); @@ -194,6 +195,7 @@ void mdc_open_pack(struct ptlrpc_request *req, int offset, rec->cr_rdev = rdev; rec->cr_time = op_data->op_mod_time; rec->cr_suppgid = op_data->op_suppgids[0]; + rec->cr_cksplit = op_data->op_cksplit; mdc_pack_capa(req, offset + 1, op_data->op_capa1); /* the next buffer is child capa, which is used for replay, @@ -297,6 +299,7 @@ void mdc_unlink_pack(struct ptlrpc_request *req, int offset, rec->ul_fid1 = op_data->op_fid1; rec->ul_fid2 = 
op_data->op_fid2; rec->ul_time = op_data->op_mod_time; + rec->ul_cksplit = op_data->op_cksplit; mdc_pack_capa(req, offset + 1, op_data->op_capa1); @@ -322,6 +325,7 @@ void mdc_link_pack(struct ptlrpc_request *req, int offset, rec->lk_fid1 = op_data->op_fid1; rec->lk_fid2 = op_data->op_fid2; rec->lk_time = op_data->op_mod_time; + rec->lk_cksplit = op_data->op_cksplit; mdc_pack_capa(req, offset + 1, op_data->op_capa1); mdc_pack_capa(req, offset + 2, op_data->op_capa2); @@ -350,6 +354,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, int offset, rec->rn_fid2 = op_data->op_fid2; rec->rn_time = op_data->op_mod_time; rec->rn_mode = op_data->op_mode; + rec->rn_cksplit = op_data->op_cksplit; mdc_pack_capa(req, offset + 1, op_data->op_capa1); mdc_pack_capa(req, offset + 2, op_data->op_capa2); diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 99f5c0b..d14640a 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -218,9 +218,9 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data, ENTRY; size[REQ_REC_OFF + 1] = op_data->op_capa1 ? - sizeof(struct lustre_capa) : 0; + sizeof(struct lustre_capa) : 0; size[REQ_REC_OFF + 2] = op_data->op_capa2 ? - sizeof(struct lustre_capa) : 0; + sizeof(struct lustre_capa) : 0; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_REINT, 5, size, NULL); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 49d956c..fd6f504 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -235,7 +235,7 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; /* - * XXX do we need to make another request here? We just did a getattr + * XXX: Do we need to make another request here? We just did a getattr * to do the lookup in the first place. 
*/ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, @@ -248,7 +248,8 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, if (valid & OBD_MD_FLRMTPERM) acl_size = sizeof(struct mdt_remote_perm); - /* currently only root inode will call us with FLACL */ + + /* Currently only root inode will call us with FLACL */ else if (valid & OBD_MD_FLACL) acl_size = LUSTRE_POSIX_ACL_MAX_SIZE; diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 2a4772b..265d311 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -69,7 +69,7 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj, static int mdd_lookup(const struct lu_env *env, struct md_object *pobj, const char *name, - struct lu_fid* fid) + struct lu_fid* fid, struct md_op_spec *spec) { int rc; ENTRY; @@ -826,7 +826,7 @@ static int mdd_cd_sanity_check(const struct lu_env *env, static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, struct md_object *cobj, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { struct mdd_device *mdd = mdo2mdd(cobj); @@ -1043,7 +1043,7 @@ static int mdd_create_sanity_check(const struct lu_env *env, static int mdd_create(const struct lu_env *env, struct md_object *pobj, const char *name, struct md_object *child, - struct md_create_spec *spec, + struct md_op_spec *spec, struct md_attr* ma) { struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix; diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index dafe88c..fa97b18 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -126,7 +126,7 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj, int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *parent, struct mdd_object *child, struct lov_mds_md **lmm, int *lmm_size, - const struct md_create_spec *spec, struct lu_attr *la); + const struct md_op_spec *spec, struct lu_attr *la); void 
mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd, int rc); int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index d5f6bf5..c1cac97 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -387,7 +387,7 @@ void mdd_lov_create_finish(const struct lu_env *env, int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *parent, struct mdd_object *child, struct lov_mds_md **lmm, int *lmm_size, - const struct md_create_spec *spec, struct lu_attr *la) + const struct md_op_spec *spec, struct lu_attr *la) { struct obd_device *obd = mdd2obd_dev(mdd); struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 35db61d..46f50fc 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -969,7 +969,7 @@ static int mdd_oc_sanity_check(const struct lu_env *env, static int mdd_object_create(const struct lu_env *env, struct md_object *obj, - const struct md_create_spec *spec, + const struct md_op_spec *spec, struct md_attr *ma) { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 07e9630..c0e5b0f 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -702,7 +702,8 @@ static int mdt_raw_lookup(struct mdt_thread_info *info, RETURN(0); /* Only got the fid of this obj by name */ - rc = mdo_lookup(info->mti_env, next, name, child_fid); + rc = mdo_lookup(info->mti_env, next, name, child_fid, + &info->mti_spec); if (rc != 0) { if (rc == -ENOENT) mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); @@ -825,7 +826,8 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, RETURN(rc); /* step 2: lookup child's fid by name */ - rc = mdo_lookup(info->mti_env, next, name, child_fid); + rc = mdo_lookup(info->mti_env, next, name, child_fid, + &info->mti_spec); if (rc != 0) { if (rc == -ENOENT) mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); @@ 
-914,9 +916,11 @@ static int mdt_getattr_name(struct mdt_thread_info *info) ENTRY; reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY); - LASSERT(reqbody); + LASSERT(reqbody != NULL); repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody); + LASSERT(repbody != NULL); + + info->mti_spec.sp_ck_split = (reqbody->valid & OBD_MD_FLCKSPLIT); repbody->eadatasize = 0; repbody->aclsize = 0; @@ -2045,6 +2049,9 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_has_trans = 0; info->mti_no_need_trans = 0; info->mti_opdata = 0; + + /* To not check for split by default. */ + info->mti_spec.sp_ck_split = 0; } static void mdt_thread_info_fini(struct mdt_thread_info *info) diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index eb7f37bd..3581ff0 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -341,11 +341,11 @@ struct mdt_thread_info { * reint record. contains information for reint operations. */ struct mdt_reint_record mti_rr; + /* * Create specification */ - struct md_create_spec mti_spec; - + struct md_op_spec mti_spec; /* * XXX: Part Four: diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index aa49766..ef1837d 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -779,7 +779,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info) struct lu_attr *attr = &info->mti_attr.ma_attr; struct mdt_reint_record *rr = &info->mti_rr; struct req_capsule *pill = &info->mti_pill; - struct md_create_spec *sp = &info->mti_spec; + struct md_op_spec *sp = &info->mti_spec; ENTRY; rec = req_capsule_client_get(pill, &RMF_REC_CREATE); @@ -803,8 +803,9 @@ static int mdt_create_unpack(struct mdt_thread_info *info) attr->la_atime = rec->cr_time; attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID | LA_CTIME | LA_MTIME | LA_ATIME; - memset(&sp->u, 0, sizeof sp->u); + memset(&sp->u, 0, sizeof(sp->u)); sp->sp_cr_flags = rec->cr_flags; + sp->sp_ck_split = rec->cr_cksplit; if 
(req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) mdt_set_capainfo(info, 0, rr->rr_fid1, @@ -823,7 +824,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info) RCL_CLIENT)); sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT); + RCL_CLIENT); RETURN(0); } #endif @@ -896,6 +897,7 @@ static int mdt_link_unpack(struct mdt_thread_info *info) if (rr->rr_name == NULL) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); + info->mti_spec.sp_ck_split = rec->lk_cksplit; RETURN(0); } @@ -936,6 +938,7 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) if (rr->rr_name == NULL) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); + info->mti_spec.sp_ck_split = rec->ul_cksplit; RETURN(0); } @@ -982,6 +985,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT); + info->mti_spec.sp_ck_split = rec->rn_cksplit; RETURN(0); } @@ -1021,6 +1025,8 @@ static int mdt_open_unpack(struct mdt_thread_info *info) info->mti_spec.sp_cr_flags = rec->cr_flags; info->mti_replayepoch = rec->cr_ioepoch; + info->mti_spec.sp_ck_split = rec->cr_cksplit; + if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT)) mdt_set_capainfo(info, 0, rr->rr_fid1, req_capsule_client_get(pill, &RMF_CAPA1)); @@ -1035,7 +1041,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info) rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT); if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { - struct md_create_spec *sp = &info->mti_spec; + struct md_op_spec *sp = &info->mti_spec; sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 
91e3e31..585c140 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -75,7 +75,7 @@ void mdt_mfd_free(struct mdt_file_data *mfd) static int mdt_create_data(struct mdt_thread_info *info, struct mdt_object *p, struct mdt_object *o) { - struct md_create_spec *spec = &info->mti_spec; + struct md_op_spec *spec = &info->mti_spec; struct md_attr *ma = &info->mti_attr; int rc; ENTRY; @@ -801,7 +801,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) GOTO(out, result = PTR_ERR(parent)); result = mdo_lookup(info->mti_env, mdt_object_child(parent), - rr->rr_name, child_fid); + rr->rr_name, child_fid, &info->mti_spec); if (result != 0 && result != -ENOENT && result != -ESTALE) GOTO(out_parent, result); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index e03dfcf..785d69b 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -410,7 +410,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, /* step 2: find & lock the child */ rc = mdo_lookup(info->mti_env, mdt_object_child(mp), - rr->rr_name, child_fid); + rr->rr_name, child_fid, &info->mti_spec); if (rc != 0) GOTO(out_unlock_parent, rc); @@ -545,7 +545,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID, rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1)); - /* step 1: lookup & lock the tgt dir */ + /* step 1: lookup & lock the tgt dir. */ lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt, rr->rr_tgtlen); @@ -554,10 +554,10 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) if (IS_ERR(mtgtdir)) GOTO(out, rc = PTR_ERR(mtgtdir)); - /*step 2: find & lock the target object if exists*/ + /* step 2: find & lock the target object if exists. 
*/ mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA); rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), - rr->rr_tgt, tgt_fid); + rr->rr_tgt, tgt_fid, &info->mti_spec); if (rc != 0 && rc != -ENOENT) { GOTO(out_unlock_tgtdir, rc); } else if (rc == 0) { @@ -760,7 +760,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /* step 3: find & lock the old object. */ rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir), - rr->rr_name, old_fid); + rr->rr_name, old_fid, &info->mti_spec); if (rc != 0) GOTO(out_unlock_target, rc); @@ -783,7 +783,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /* step 4: find & lock the new object. */ /* new target object may not exist now */ rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), - rr->rr_tgt, new_fid); + rr->rr_tgt, new_fid, &info->mti_spec); if (rc == 0) { /* the new_fid should have been filled at this moment */ if (lu_fid_eq(old_fid, new_fid)) diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 47faddc..cd008b1 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1827,7 +1827,7 @@ void lustre_swab_mdt_rec_create (struct mdt_rec_create *cr) __swab64s (&cr->cr_rdev); __swab64s (&cr->cr_ioepoch); __swab32s (&cr->cr_suppgid); - CLASSERT(offsetof(typeof(*cr), cr_padding_1) != 0); + __swab32s (&cr->cr_cksplit); CLASSERT(offsetof(typeof(*cr), cr_padding_2) != 0); CLASSERT(offsetof(typeof(*cr), cr_padding_3) != 0); } @@ -1860,7 +1860,7 @@ void lustre_swab_mdt_rec_link (struct mdt_rec_link *lk) lustre_swab_lu_fid (&lk->lk_fid1); lustre_swab_lu_fid (&lk->lk_fid2); __swab64s (&lk->lk_time); - CLASSERT(offsetof(typeof(*lk), lk_padding_1) != 0); + __swab32s (&lk->lk_cksplit); CLASSERT(offsetof(typeof(*lk), lk_padding_2) != 0); CLASSERT(offsetof(typeof(*lk), lk_padding_3) != 0); CLASSERT(offsetof(typeof(*lk), lk_padding_4) != 0); @@ -1894,7 +1894,7 @@ void lustre_swab_mdt_rec_unlink (struct mdt_rec_unlink *ul) lustre_swab_lu_fid 
(&ul->ul_fid1); lustre_swab_lu_fid (&ul->ul_fid2); __swab64s (&ul->ul_time); - CLASSERT(offsetof(typeof(*ul), ul_padding_1) != 0); + __swab32s (&ul->ul_cksplit); CLASSERT(offsetof(typeof(*ul), ul_padding_2) != 0); CLASSERT(offsetof(typeof(*ul), ul_padding_3) != 0); CLASSERT(offsetof(typeof(*ul), ul_padding_4) != 0); @@ -1929,7 +1929,7 @@ void lustre_swab_mdt_rec_rename (struct mdt_rec_rename *rn) lustre_swab_lu_fid (&rn->rn_fid2); __swab64s (&rn->rn_time); __swab32s (&rn->rn_mode); - CLASSERT(offsetof(typeof(*rn), rn_padding_2) != 0); + __swab32s (&rn->rn_cksplit); CLASSERT(offsetof(typeof(*rn), rn_padding_3) != 0); CLASSERT(offsetof(typeof(*rn), rn_padding_4) != 0); } -- 1.8.3.1