struct lu_rdpg cmi_rdpg;
/* pointers to pages for readpage. */
struct page *cmi_pages[CMM_SPLIT_PAGE_COUNT];
- struct md_create_spec cmi_spec;
+ struct md_op_spec cmi_spec;
struct lmv_stripe_md cmi_lmv;
char cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
};
int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
struct md_attr *ma, int *split);
-int cmm_split_try(const struct lu_env *env, struct md_object *mo);
+int cmm_split_dir(const struct lu_env *env, struct md_object *mo);
int cmm_split_access(const struct lu_env *env, struct md_object *mo,
mdl_mode_t lm);
enum {
LPROC_CMM_SPLIT_CHECK = 0,
- LPROC_CMM_SPLIT_EXEC,
+ LPROC_CMM_SPLIT,
+ LPROC_CMM_LOOKUP,
LPROC_CMM_LAST
};
cmm->cmm_stats = stats;
+ lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_LOOKUP,
+ LPROCFS_CNTR_AVGMINMAX, "lookup", "time");
+ lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT,
+ LPROCFS_CNTR_AVGMINMAX, "split", "time");
lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_CHECK,
LPROCFS_CNTR_AVGMINMAX, "split_check", "time");
- lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT_EXEC,
- LPROCFS_CNTR_AVGMINMAX, "split_exec", "time");
EXIT;
cleanup:
if (rc) {
/* CMM local md_object operations */
static int cml_object_create(const struct lu_env *env,
struct md_object *mo,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *attr)
{
int rc;
/* md_dir operations */
static int cml_lookup(const struct lu_env *env, struct md_object *mo_p,
- const char *name, struct lu_fid *lf)
+ const char *name, struct lu_fid *lf,
+ struct md_op_spec *spec)
{
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p));
+ struct timeval start;
int rc;
ENTRY;
+ cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_LOOKUP);
+
#ifdef HAVE_SPLIT_SUPPORT
- rc = cmm_split_check(env, mo_p, name);
- if (rc)
- RETURN(rc);
+ if (spec != NULL && spec->sp_ck_split) {
+ rc = cmm_split_check(env, mo_p, name);
+ if (rc) {
+ cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
+ RETURN(rc);
+ }
+ }
#endif
- rc = mdo_lookup(env, md_object_next(mo_p), name, lf);
+ rc = mdo_lookup(env, md_object_next(mo_p), name, lf, spec);
+ cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_LOOKUP);
RETURN(rc);
}
static int cml_create(const struct lu_env *env, struct md_object *mo_p,
const char *name, struct md_object *mo_c,
- struct md_create_spec *spec, struct md_attr *ma)
+ struct md_op_spec *spec, struct md_attr *ma)
{
int rc;
ENTRY;
* -ERESTART to client to let it know that correct MDT should be
* choosen.
*/
- rc = cmm_split_try(env, mo_p);
- if (rc) {
+ rc = cmm_split_dir(env, mo_p);
+ if (rc)
/*
* -ERESTART or some split error is returned, we can't
* proceed with create.
*/
RETURN(rc);
- }
}
-
- /*
- * Check for possible split directory and let caller know that it should
- * tell client that directory is split and operation should repeat to
- * correct MDT.
- */
- rc = cmm_split_check(env, mo_p, name);
- if (rc)
- RETURN(rc);
+
+ if (spec != NULL && spec->sp_ck_split) {
+ /*
+ * Check for possible split directory and let caller know that
+ * it should tell client that directory is split and operation
+ * should repeat to correct MDT.
+ */
+ rc = cmm_split_check(env, mo_p, name);
+ if (rc)
+ RETURN(rc);
+ }
#endif
rc = mdo_create(env, md_object_next(mo_p), name, md_object_next(mo_c),
static int cml_create_data(const struct lu_env *env, struct md_object *p,
struct md_object *o,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
int rc;
/* CMM remote md_object operations. All are invalid */
static int cmr_object_create(const struct lu_env *env,
struct md_object *mo,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
return -EFAULT;
/* remote part of md_dir operations */
static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p,
- const char *name, struct lu_fid *lf)
+ const char *name, struct lu_fid *lf,
+ struct md_op_spec *spec)
{
/*
* This can happens while rename() If new parent is remote dir, lookup
*/
static int cmr_create(const struct lu_env *env, struct md_object *mo_p,
const char *child_name, struct md_object *mo_c,
- struct md_create_spec *spec,
+ struct md_op_spec *spec,
struct md_attr *ma)
{
struct cmm_thread_info *cmi;
/* Make sure that name isn't exist before doing remote call. */
rc = mdo_lookup(env, md_object_next(mo_p), name,
- &cmm_env_info(env)->cmi_fid);
+ &cmm_env_info(env)->cmi_fid, NULL);
if (rc == 0) {
rc = -EEXIST;
} else if (rc == -ENOENT) {
struct lmv_stripe_md *lmv,
int lmv_size)
{
- struct md_create_spec *spec = &cmm_env_info(env)->cmi_spec;
+ struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
struct cmm_object *obj;
int rc;
ENTRY;
return rc;
}
-int cmm_split_try(const struct lu_env *env, struct md_object *mo)
+int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
struct timeval start;
ENTRY;
- cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT_EXEC);
+ cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT);
LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
memset(ma, 0, sizeof(*ma));
/*
* Disable transacrions for split, since there will be so many trans in
- * this one ops, confilct with current recovery design.
+ * this one ops, conflict with current recovery design.
*/
rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
if (rc) {
cleanup:
OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
out:
- cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT_EXEC);
+ cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT);
return rc;
}
static int mdc_object_create(const struct lu_env *env,
struct md_object *mo,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
/* Capa fields */
struct obd_capa *op_capa1;
struct obd_capa *op_capa2;
+
+ /* Should server check split in lookups or not. */
+ int op_cksplit;
};
#define MDS_MODE_DONT_LOCK (1 << 30)
#define OBD_MD_FLRMTPERM (0x0000010000000000ULL) /* remote permission */
#define OBD_MD_FLMDSCAPA (0x0000020000000000ULL) /* MDS capability */
#define OBD_MD_FLOSSCAPA (0x0000040000000000ULL) /* OSS capability */
+#define OBD_MD_FLCKSPLIT (0x0000080000000000ULL) /* Check split on server */
#define OBD_MD_FLGETATTR (OBD_MD_FLID | OBD_MD_FLATIME | OBD_MD_FLMTIME | \
OBD_MD_FLCTIME | OBD_MD_FLSIZE | OBD_MD_FLBLKSZ | \
__u64 cr_rdev;
__u64 cr_ioepoch;
__u32 cr_suppgid;
- __u32 cr_padding_1; /* also fix lustre_swab_mds_rec_create */
+ __u32 cr_cksplit;
__u32 cr_padding_2; /* also fix lustre_swab_mds_rec_create */
__u32 cr_padding_3; /* also fix lustre_swab_mds_rec_create */
};
struct lu_fid lk_fid1;
struct lu_fid lk_fid2;
__u64 lk_time;
- __u32 lk_padding_1; /* also fix lustre_swab_mds_rec_link */
+ __u32 lk_cksplit;
__u32 lk_padding_2; /* also fix lustre_swab_mds_rec_link */
__u32 lk_padding_3; /* also fix lustre_swab_mds_rec_link */
__u32 lk_padding_4; /* also fix lustre_swab_mds_rec_link */
struct lu_fid ul_fid1;
struct lu_fid ul_fid2;
__u64 ul_time;
- __u32 ul_padding_1; /* also fix lustre_swab_mds_rec_unlink */
+ __u32 ul_cksplit;
__u32 ul_padding_2; /* also fix lustre_swab_mds_rec_unlink */
__u32 ul_padding_3; /* also fix lustre_swab_mds_rec_unlink */
__u32 ul_padding_4; /* also fix lustre_swab_mds_rec_unlink */
struct lu_fid rn_fid2;
__u64 rn_time;
__u32 rn_mode; /* cross-ref rename has mode */
- __u32 rn_padding_2; /* also fix lustre_swab_mdt_rec_rename */
+ __u32 rn_cksplit; /* check for split or not */
__u32 rn_padding_3; /* also fix lustre_swab_mdt_rec_rename */
__u32 rn_padding_4; /* also fix lustre_swab_mdt_rec_rename */
};
int ma_cookie_size;
};
-/* additional parameters for create */
-struct md_create_spec {
+/* Additional parameters for create */
+struct md_op_spec {
union {
/* symlink target */
const char *sp_symname;
/* Current lock mode for parent dir where create is performing. */
mdl_mode_t sp_cr_mode;
+
+ /* Check for split */
+ int sp_ck_split;
};
/*
/* part of cross-ref operation */
int (*moo_object_create)(const struct lu_env *env,
struct md_object *obj,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma);
int (*moo_ref_add)(const struct lu_env *env, struct md_object *obj);
const struct lu_fid *fid, struct lu_fid *sfid);
int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj,
- const char *name, struct lu_fid *fid);
+ const char *name, struct lu_fid *fid,
+ struct md_op_spec *spec);
mdl_mode_t (*mdo_lock_mode)(const struct lu_env *env, struct md_object *obj,
mdl_mode_t mode);
int (*mdo_create)(const struct lu_env *env, struct md_object *pobj,
const char *name, struct md_object *child,
- struct md_create_spec *spec,
+ struct md_op_spec *spec,
struct md_attr *ma);
/* This method is used for creating data object for this meta object*/
int (*mdo_create_data)(const struct lu_env *env, struct md_object *p,
struct md_object *o,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma);
int (*mdo_rename)(const struct lu_env *env, struct md_object *spobj,
static inline int mo_object_create(const struct lu_env *env,
struct md_object *m,
- const struct md_create_spec *spc,
+ const struct md_op_spec *spc,
struct md_attr *at)
{
LASSERT(m->mo_ops->moo_object_create);
static inline int mdo_lookup(const struct lu_env *env,
struct md_object *p,
const char *name,
- struct lu_fid *f)
+ struct lu_fid *f,
+ struct md_op_spec *spec)
{
LASSERT(p->mo_dir_ops->mdo_lookup);
- return p->mo_dir_ops->mdo_lookup(env, p, name, f);
+ return p->mo_dir_ops->mdo_lookup(env, p, name, f, spec);
}
static inline mdl_mode_t mdo_lock_mode(const struct lu_env *env,
struct md_object *p,
const char *child_name,
struct md_object *c,
- struct md_create_spec *spc,
+ struct md_op_spec *spc,
struct md_attr *at)
{
LASSERT(c->mo_dir_ops->mdo_create);
static inline int mdo_create_data(const struct lu_env *env,
struct md_object *p,
struct md_object *c,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
LASSERT(c->mo_dir_ops->mdo_create_data);
GOTO(out, rc = -ENOMEM);
op_data->op_fid1 = body->fid1;
+ op_data->op_cksplit = 0;
rc = md_intent_lock(tgt_exp, op_data, lmm, lmmsize, it, flags,
&req, cb_blocking, extra_lock_flags);
rpid = obj->lo_inodes[mea_idx].li_fid;
tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
+ sop_data->op_cksplit = 0;
lmv_obj_put(obj);
CDEBUG(D_OTHER, "Choose slave dir ("DFID")\n", PFID(&rpid));
} else {
tgt_exp = lmv_find_export(lmv, &rpid);
+ sop_data->op_cksplit = 1;
}
if (IS_ERR(tgt_exp))
GOTO(out_free_sop_data, rc = PTR_ERR(tgt_exp));
op_data->op_namelen);
rpid = obj->lo_inodes[mea_idx].li_fid;
mds = obj->lo_inodes[mea_idx].li_mds;
+ sop_data->op_cksplit = 0;
lmv_obj_put(obj);
CDEBUG(D_OTHER, "forward to MDS #"LPU64" (slave "DFID")\n",
rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
if (rc)
GOTO(out_free_sop_data, rc);
+ sop_data->op_cksplit = 1;
}
}
memset(op_data, 0, sizeof(*op_data));
op_data->op_fid1 = fid;
op_data->op_fid2 = fid;
+ op_data->op_cksplit = 0;
tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
if (IS_ERR(tgt_exp))
op_data->op_namelen);
rpid = obj->lo_inodes[mea_idx].li_fid;
mds = obj->lo_inodes[mea_idx].li_mds;
+ sop_data->op_cksplit = 0;
lmv_obj_put(obj);
} else {
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc)
GOTO(out_free_sop_data, rc);
+ sop_data->op_cksplit = 1;
}
CDEBUG(D_OTHER, "revalidate lookup for "DFID" to #"LPU64" MDS\n",
rpid = obj->lo_inodes[mea_idx].li_fid;
mds = obj->lo_inodes[mea_idx].li_mds;
}
+ sop_data->op_cksplit = 0;
lmv_obj_put(obj);
} else {
rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
if (rc)
GOTO(out_free_sop_data, rc);
+ sop_data->op_cksplit = 1;
}
fid_zero(&sop_data->op_fid2);
}
op_data->op_fid1 = fid;
op_data->op_fid2 = fid;
+ op_data->op_cksplit = 0;
/* is obj valid? */
tgt_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
obj = lmv_obj_grab(obd, fid);
- CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n",
- PFID(fid), obj ? "(split)" : "");
+ CDEBUG(D_OTHER, "GETATTR for "DFID" %s\n", PFID(fid),
+ obj ? "(split)" : "");
- /* if object is split, then we loop over all the slaves and gather size
+ /*
+ * If object is split, then we loop over all the slaves and gather size
* attribute. In ideal world we would have to gather also mds field from
* all slaves, as object is spread over the cluster and this is
* definitely interesting information and it is not good to loss it,
- * but... */
+ * but...
+ */
if (obj) {
struct mdt_body *body;
CDEBUG(D_OTHER, "CBDATA for "DFID"\n", PFID(fid));
- /* with CMD every object can have two locks in different namespaces:
+ /*
+ * With CMD every object can have two locks in different namespaces:
* lookup lock in space of mds storing direntry and update/open lock in
- * space of mds storing inode */
+ * space of mds storing inode.
+ */
for (i = 0; i < lmv->desc.ld_tgt_count; i++)
md_change_cbdata(lmv->tgts[i].ltd_exp, fid, it, data);
mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
op_data->op_name, op_data->op_namelen);
op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
+ op_data->op_cksplit = 0;
tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
lmv_obj_put(obj);
} else {
tgt_exp = lmv_find_export(lmv, &op_data->op_fid1);
+ op_data->op_cksplit = 1;
}
if (IS_ERR(tgt_exp))
for (i = 0; i < mea->mea_count; i++) {
memset(op_data2, 0, sizeof(*op_data2));
op_data2->op_fid1 = mea->mea_ids[i];
+ op_data2->op_cksplit = 0;
tgt_exp = lmv_find_export(lmv, &op_data2->op_fid1);
if (IS_ERR(tgt_exp))
rid = obj->lo_inodes[mea_idx].li_fid;
tgt_exp = lmv_get_export(lmv, obj->lo_inodes[mea_idx].li_mds);
lmv_obj_put(obj);
+ valid &= ~OBD_MD_FLCKSPLIT;
} else {
tgt_exp = lmv_find_export(lmv, &rid);
+ valid |= OBD_MD_FLCKSPLIT;
}
if (IS_ERR(tgt_exp))
RETURN(PTR_ERR(tgt_exp));
rec->cr_time = op_data->op_mod_time;
rec->cr_suppgid = op_data->op_suppgids[0];
rec->cr_flags = op_data->op_flags;
+ rec->cr_cksplit = op_data->op_cksplit;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
rec->cr_rdev = rdev;
rec->cr_time = op_data->op_mod_time;
rec->cr_suppgid = op_data->op_suppgids[0];
+ rec->cr_cksplit = op_data->op_cksplit;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
/* the next buffer is child capa, which is used for replay,
rec->ul_fid1 = op_data->op_fid1;
rec->ul_fid2 = op_data->op_fid2;
rec->ul_time = op_data->op_mod_time;
+ rec->ul_cksplit = op_data->op_cksplit;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
rec->lk_fid1 = op_data->op_fid1;
rec->lk_fid2 = op_data->op_fid2;
rec->lk_time = op_data->op_mod_time;
+ rec->lk_cksplit = op_data->op_cksplit;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
mdc_pack_capa(req, offset + 2, op_data->op_capa2);
rec->rn_fid2 = op_data->op_fid2;
rec->rn_time = op_data->op_mod_time;
rec->rn_mode = op_data->op_mode;
+ rec->rn_cksplit = op_data->op_cksplit;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
mdc_pack_capa(req, offset + 2, op_data->op_capa2);
ENTRY;
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
- sizeof(struct lustre_capa) : 0;
+ sizeof(struct lustre_capa) : 0;
size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
- sizeof(struct lustre_capa) : 0;
+ sizeof(struct lustre_capa) : 0;
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
MDS_REINT, 5, size, NULL);
size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0;
/*
- * XXX do we need to make another request here? We just did a getattr
+ * XXX: Do we need to make another request here? We just did a getattr
* to do the lookup in the first place.
*/
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
if (valid & OBD_MD_FLRMTPERM)
acl_size = sizeof(struct mdt_remote_perm);
- /* currently only root inode will call us with FLACL */
+
+ /* Currently only root inode will call us with FLACL */
else if (valid & OBD_MD_FLACL)
acl_size = LUSTRE_POSIX_ACL_MAX_SIZE;
static int mdd_lookup(const struct lu_env *env,
struct md_object *pobj, const char *name,
- struct lu_fid* fid)
+ struct lu_fid* fid, struct md_op_spec *spec)
{
int rc;
ENTRY;
static int mdd_create_data(const struct lu_env *env,
struct md_object *pobj, struct md_object *cobj,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
struct mdd_device *mdd = mdo2mdd(cobj);
static int mdd_create(const struct lu_env *env,
struct md_object *pobj, const char *name,
struct md_object *child,
- struct md_create_spec *spec,
+ struct md_op_spec *spec,
struct md_attr* ma)
{
struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *parent, struct mdd_object *child,
struct lov_mds_md **lmm, int *lmm_size,
- const struct md_create_spec *spec, struct lu_attr *la);
+ const struct md_op_spec *spec, struct lu_attr *la);
void mdd_lov_create_finish(const struct lu_env *env,
struct mdd_device *mdd, int rc);
int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
struct mdd_object *parent, struct mdd_object *child,
struct lov_mds_md **lmm, int *lmm_size,
- const struct md_create_spec *spec, struct lu_attr *la)
+ const struct md_op_spec *spec, struct lu_attr *la)
{
struct obd_device *obd = mdd2obd_dev(mdd);
struct obd_export *lov_exp = obd->u.mds.mds_osc_exp;
static int mdd_object_create(const struct lu_env *env,
struct md_object *obj,
- const struct md_create_spec *spec,
+ const struct md_op_spec *spec,
struct md_attr *ma)
{
RETURN(0);
/* Only got the fid of this obj by name */
- rc = mdo_lookup(info->mti_env, next, name, child_fid);
+ rc = mdo_lookup(info->mti_env, next, name, child_fid,
+ &info->mti_spec);
if (rc != 0) {
if (rc == -ENOENT)
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
RETURN(rc);
/* step 2: lookup child's fid by name */
- rc = mdo_lookup(info->mti_env, next, name, child_fid);
+ rc = mdo_lookup(info->mti_env, next, name, child_fid,
+ &info->mti_spec);
if (rc != 0) {
if (rc == -ENOENT)
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
ENTRY;
reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
- LASSERT(reqbody);
+ LASSERT(reqbody != NULL);
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
- LASSERT(repbody);
+ LASSERT(repbody != NULL);
+
+ info->mti_spec.sp_ck_split = (reqbody->valid & OBD_MD_FLCKSPLIT);
repbody->eadatasize = 0;
repbody->aclsize = 0;
info->mti_has_trans = 0;
info->mti_no_need_trans = 0;
info->mti_opdata = 0;
+
+ /* To not check for split by default. */
+ info->mti_spec.sp_ck_split = 0;
}
static void mdt_thread_info_fini(struct mdt_thread_info *info)
* reint record. contains information for reint operations.
*/
struct mdt_reint_record mti_rr;
+
/*
* Create specification
*/
- struct md_create_spec mti_spec;
-
+ struct md_op_spec mti_spec;
/*
* XXX: Part Four:
struct lu_attr *attr = &info->mti_attr.ma_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
- struct md_create_spec *sp = &info->mti_spec;
+ struct md_op_spec *sp = &info->mti_spec;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
attr->la_atime = rec->cr_time;
attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID |
LA_CTIME | LA_MTIME | LA_ATIME;
- memset(&sp->u, 0, sizeof sp->u);
+ memset(&sp->u, 0, sizeof(sp->u));
sp->sp_cr_flags = rec->cr_flags;
+ sp->sp_ck_split = rec->cr_cksplit;
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
mdt_set_capainfo(info, 0, rr->rr_fid1,
RCL_CLIENT));
sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT);
+ RCL_CLIENT);
RETURN(0);
}
#endif
if (rr->rr_name == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+ info->mti_spec.sp_ck_split = rec->lk_cksplit;
RETURN(0);
}
if (rr->rr_name == NULL)
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
+ info->mti_spec.sp_ck_split = rec->ul_cksplit;
RETURN(0);
}
RETURN(-EFAULT);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
rr->rr_tgtlen = req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT);
+ info->mti_spec.sp_ck_split = rec->rn_cksplit;
RETURN(0);
}
info->mti_spec.sp_cr_flags = rec->cr_flags;
info->mti_replayepoch = rec->cr_ioepoch;
+ info->mti_spec.sp_ck_split = rec->cr_cksplit;
+
if (req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
mdt_set_capainfo(info, 0, rr->rr_fid1,
req_capsule_client_get(pill, &RMF_CAPA1));
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
- struct md_create_spec *sp = &info->mti_spec;
+ struct md_op_spec *sp = &info->mti_spec;
sp->u.sp_ea.eadata = req_capsule_client_get(pill,
&RMF_EADATA);
sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
static int mdt_create_data(struct mdt_thread_info *info,
struct mdt_object *p, struct mdt_object *o)
{
- struct md_create_spec *spec = &info->mti_spec;
+ struct md_op_spec *spec = &info->mti_spec;
struct md_attr *ma = &info->mti_attr;
int rc;
ENTRY;
GOTO(out, result = PTR_ERR(parent));
result = mdo_lookup(info->mti_env, mdt_object_child(parent),
- rr->rr_name, child_fid);
+ rr->rr_name, child_fid, &info->mti_spec);
if (result != 0 && result != -ENOENT && result != -ESTALE)
GOTO(out_parent, result);
/* step 2: find & lock the child */
rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
- rr->rr_name, child_fid);
+ rr->rr_name, child_fid, &info->mti_spec);
if (rc != 0)
GOTO(out_unlock_parent, rc);
DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
- /* step 1: lookup & lock the tgt dir */
+ /* step 1: lookup & lock the tgt dir. */
lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
rr->rr_tgtlen);
if (IS_ERR(mtgtdir))
GOTO(out, rc = PTR_ERR(mtgtdir));
- /*step 2: find & lock the target object if exists*/
+ /* step 2: find & lock the target object if exists. */
mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
- rr->rr_tgt, tgt_fid);
+ rr->rr_tgt, tgt_fid, &info->mti_spec);
if (rc != 0 && rc != -ENOENT) {
GOTO(out_unlock_tgtdir, rc);
} else if (rc == 0) {
/* step 3: find & lock the old object. */
rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
- rr->rr_name, old_fid);
+ rr->rr_name, old_fid, &info->mti_spec);
if (rc != 0)
GOTO(out_unlock_target, rc);
/* step 4: find & lock the new object. */
/* new target object may not exist now */
rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
- rr->rr_tgt, new_fid);
+ rr->rr_tgt, new_fid, &info->mti_spec);
if (rc == 0) {
/* the new_fid should have been filled at this moment */
if (lu_fid_eq(old_fid, new_fid))
__swab64s (&cr->cr_rdev);
__swab64s (&cr->cr_ioepoch);
__swab32s (&cr->cr_suppgid);
- CLASSERT(offsetof(typeof(*cr), cr_padding_1) != 0);
+ __swab32s (&cr->cr_cksplit);
CLASSERT(offsetof(typeof(*cr), cr_padding_2) != 0);
CLASSERT(offsetof(typeof(*cr), cr_padding_3) != 0);
}
lustre_swab_lu_fid (&lk->lk_fid1);
lustre_swab_lu_fid (&lk->lk_fid2);
__swab64s (&lk->lk_time);
- CLASSERT(offsetof(typeof(*lk), lk_padding_1) != 0);
+ __swab32s (&lk->lk_cksplit);
CLASSERT(offsetof(typeof(*lk), lk_padding_2) != 0);
CLASSERT(offsetof(typeof(*lk), lk_padding_3) != 0);
CLASSERT(offsetof(typeof(*lk), lk_padding_4) != 0);
lustre_swab_lu_fid (&ul->ul_fid1);
lustre_swab_lu_fid (&ul->ul_fid2);
__swab64s (&ul->ul_time);
- CLASSERT(offsetof(typeof(*ul), ul_padding_1) != 0);
+ __swab32s (&ul->ul_cksplit);
CLASSERT(offsetof(typeof(*ul), ul_padding_2) != 0);
CLASSERT(offsetof(typeof(*ul), ul_padding_3) != 0);
CLASSERT(offsetof(typeof(*ul), ul_padding_4) != 0);
lustre_swab_lu_fid (&rn->rn_fid2);
__swab64s (&rn->rn_time);
__swab32s (&rn->rn_mode);
- CLASSERT(offsetof(typeof(*rn), rn_padding_2) != 0);
+ __swab32s (&rn->rn_cksplit);
CLASSERT(offsetof(typeof(*rn), rn_padding_3) != 0);
CLASSERT(offsetof(typeof(*rn), rn_padding_4) != 0);
}