if (ma->ma_lmv_size)
GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
-
OBD_ALLOC_PTR(fid);
rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid);
if (rc)
static int cmm_creat_remote_obj(const struct lu_context *ctx,
struct cmm_device *cmm,
- struct lu_fid *fid, struct md_attr *ma)
+ struct lu_fid *fid, struct md_attr *ma,
+ const struct lmv_stripe_md *lmv,
+ int lmv_size)
{
struct cmm_object *obj;
struct md_create_spec *spec;
RETURN(PTR_ERR(obj));
OBD_ALLOC_PTR(spec);
- spec->u.sp_pfid = fid;
+
+ spec->u.sp_ea.fid = fid;
+ spec->u.sp_ea.eadata = lmv;
+ spec->u.sp_ea.eadatalen = lmv_size;
+ spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
spec, ma);
OBD_FREE_PTR(spec);
struct md_object *mo, struct md_attr *ma)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
- struct lmv_stripe_md *lmv = NULL;
+ struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
int lmv_size, i, rc;
struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
ENTRY;
if (rc)
GOTO(cleanup, rc);
+ /* Template EA handed to every slave object: it records the master's
+ * index and the hash-segment magic, with a stripe count of zero. */
+ OBD_ALLOC_PTR(slave_lmv);
+ if (!slave_lmv)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ slave_lmv->mea_master = cmm->cmm_local_num;
+ slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
+ slave_lmv->mea_count = 0;
for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
- rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
+ /* pass the size of the structure, not of the pointer, so the
+ * whole EA header is sent along with the create */
+ rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma,
+ slave_lmv, sizeof(*slave_lmv));
if (rc)
GOTO(cleanup, rc);
}
ma->ma_lmv_size = lmv_size;
ma->ma_lmv = lmv;
cleanup:
+ if (slave_lmv)
+ OBD_FREE_PTR(slave_lmv);
RETURN(rc);
}
/* Read splitted page and send them to the slave master */
do {
+ struct lu_dirpage *ldp;
+
/* init page with '0' */
for (i = 0; i < rdpg->rp_npages; i++) {
memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
RETURN(rc);
rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
-
+ if (rc)
+ RETURN(rc);
+
+ /* Inspect the first page of this batch to decide whether to go on
+ * and where the next read has to start. */
+ kmap(rdpg->rp_pages[0]);
+ ldp = page_address(rdpg->rp_pages[0]);
+ /* ldp_hash_end is a 32-bit field, so compare it with a 32-bit
+ * all-ones value: ~0ul is 64 bits wide on LP64 and would never match.
+ * NOTE(review): other readers of this field go through
+ * le32_to_cpu() - confirm whether a byte swap is needed here too. */
+ if (ldp->ldp_hash_end == ~0u)
+ rc = -E2BIG;
+ rdpg->rp_hash = ldp->ldp_hash_end;
+ kunmap(rdpg->rp_pages[0]);
} while (rc == 0);
/* it means already finish splitting this segment */
rdpg->rp_hash = i * hash_segement;
hash_end = rdpg->rp_hash + hash_segement;
-
rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
if (rc)
GOTO(cleanup, rc);
struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
struct lu_attr *la = &ma->ma_attr;
struct mdc_thread_info *mci;
- const char *symname;
+ const void *symname;
int rc, symlen;
ENTRY;
+ LASSERT(spec->u.sp_pfid != NULL);
mci = mdc_info_init(ctx);
mci->mci_opdata.fid2 = *lu_object_fid(&mo->mo_lu);
/* parent fid is needed to create dotdot on the remote node */
mci->mci_opdata.mod_time = la->la_mtime;
/* get data from spec */
- symname = spec->u.sp_symname;
- symlen = symname ? strlen(symname) + 1 : 0;
+ if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+ symname = spec->u.sp_ea.eadata;
+ symlen = spec->u.sp_ea.eadatalen;
+ mci->mci_opdata.fid1 = *(spec->u.sp_ea.fid);
+ mci->mci_opdata.flags |= MDS_CREATE_SLAVE_OBJ;
+ } else {
+ symname = spec->u.sp_symname;
+ symlen = symname ? strlen(symname) + 1 : 0;
+ }
rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata,
symname, symlen,
#define MEA_MAGIC_LAST_CHAR 0xb2221ca1
#define MEA_MAGIC_ALL_CHARS 0xb222a11c
#define MEA_MAGIC_HASH_SEGMENT 0xb222a11b
-#define MAX_HASH_SIZE 0x3fffffff
+#define MAX_HASH_SIZE 0x7fffffff
struct lmv_stripe_md {
__u32 mea_magic;
#define MDS_OPEN_DELAY_CREATE 0100000000 /* delay initial object create */
#define MDS_OPEN_OWNEROVERRIDE 0200000000 /* NFSD rw-reopen ro file for owner */
#define MDS_OPEN_JOIN_FILE 0400000000 /* open for join file*/
+#define MDS_CREATE_SLAVE_OBJ 02000000000 /* create a slave object; this is
+ * a create flag, placed here so it does
+ * not conflict with the open flags */
#define MDS_OPEN_LOCK 04000000000 /* This open requires open lock */
#define MDS_OPEN_HAS_EA 010000000000 /* specify object create pattern */
#define MDS_OPEN_HAS_OBJS 020000000000 /* Just set the EA the obj exist */
extern const struct req_format RQF_MDS_GETATTR_NAME;
extern const struct req_format RQF_MDS_REINT;
extern const struct req_format RQF_MDS_REINT_CREATE;
+extern const struct req_format RQF_MDS_REINT_CREATE_SLAVE;
+extern const struct req_format RQF_MDS_REINT_CREATE_SYM;
extern const struct req_format RQF_MDS_REINT_OPEN;
extern const struct req_format RQF_MDS_REINT_UNLINK;
extern const struct req_format RQF_MDS_REINT_LINK;
/* eadata for regular files */
struct md_spec_reg {
/* lov objs exist already */
+ const struct lu_fid *fid;
int no_lov_create;
const void *eadata;
int eadatalen;
return rc;
}
+/*
+ * Allocate a new FID for an object that is being created under @pid after
+ * @pid turned out to be a split directory.
+ *
+ * The stripe is picked by hashing @op->name with the directory's hash type
+ * and stripe count; the FID is then allocated on the target that the FLD
+ * lookup reports for that stripe's FID.
+ *
+ * @obd: LMV device
+ * @pid: FID of the parent directory
+ * @op: operation data; only the name and its length are read
+ * @fid: receives the newly allocated FID
+ *
+ * Returns 0 on success, and also 0 (leaving @fid untouched) when no lmv
+ * object exists for @pid. Otherwise returns the error reported by
+ * lmv_fld_lookup(), obd_fid_alloc() or fld_client_create().
+ */
+int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
+ struct md_op_data *op, struct lu_fid *fid)
+{
+ struct lmv_obj *obj;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lu_fid *rpid;
+ mdsno_t mds;
+ int rc;
+ ENTRY;
+
+ obj = lmv_obj_grab(obd, pid);
+ if (!obj)
+ RETURN(0);
+
+ /* stripe index the name hashes to ... */
+ mds = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
+ (char *)op->name, op->namelen);
+ rpid = &obj->lo_inodes[mds].li_fid;
+ /* ... then @mds is overwritten with the result of the FLD lookup
+ * for that stripe's FID */
+ rc = lmv_fld_lookup(lmv, rpid, &mds);
+ if (rc)
+ GOTO(cleanup, rc);
+
+ rc = obd_fid_alloc(lmv->tgts[mds].ltd_exp, fid, NULL);
+ if (rc < 0)
+ GOTO(cleanup, rc);
+ if (rc > 0) {
+ /* NOTE(review): a positive return presumably means a new
+ * sequence was started - confirm; its sequence is registered
+ * in the FLD for this target */
+ LASSERT(fid_is_sane(fid));
+ rc = fld_client_create(&lmv->lmv_fld,
+ fid_seq(fid), mds, NULL);
+ if (rc) {
+ CERROR("can't create fld, rc %d\n", rc);
+ GOTO(cleanup, rc);
+ }
+ }
+ CDEBUG(D_INFO, "Allocate new fid "DFID" for split obj\n", PFID(fid));
+cleanup:
+ /* drop the reference taken by lmv_obj_grab() */
+ lmv_obj_put(obj);
+ RETURN(rc);
+}
+
/*
* IT_OPEN is intended to open (and create, possible) an object. Parent (pid)
* may be split dir.
rc = lmv_fld_lookup(lmv, &rpid, &mds);
if (rc)
GOTO(out_free_sop_data, rc);
+
obj = lmv_obj_grab(obd, &rpid);
if (obj) {
/*
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, sop_data,
lmm, lmmsize, it, flags, reqp,
cb_blocking, extra_lock_flags);
+
if (rc == -ERESTART) {
/*
* Directory got split. Time to update local object and repeat
rc = lmv_handle_split(exp, &rpid);
if (rc == 0) {
ptlrpc_req_finished(*reqp);
+ /* We should reallocate the FID for the object */
+ rc = lmv_alloc_fid_for_split(obd, &rpid, op_data,
+ &sop_data->fid2);
+ if (rc)
+ GOTO(out_free_sop_data, rc);
+ /* client switches to new sequence, setup fld */
goto repeat;
}
}
void *, int);
int lmv_fld_lookup(struct lmv_obd *lmv, const struct lu_fid *fid,
mdsno_t *mds);
+int lmv_alloc_fid_for_split(struct obd_device *obd, struct lu_fid *pid,
+ struct md_op_data *op, struct lu_fid *fid);
static inline struct lmv_stripe_md *
lmv_get_mea(struct ptlrpc_request *req, int offset)
obd_free_memmd(exp, (struct lov_stripe_md **)&md.mea);
- EXIT;
cleanup:
if (req)
ptlrpc_req_finished(req);
- return rc;
+ RETURN(rc);
}
int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
rc = lmv_handle_split(exp, &op_data->fid1);
if (rc == 0) {
ptlrpc_req_finished(*request);
+ rc = lmv_alloc_fid_for_split(obd, &op_data->fid1,
+ op_data, &op_data->fid2);
+ if (rc)
+ RETURN(rc);
goto repeat;
}
}
obj = lmv_obj_grab(obd, fid);
if (obj) {
+ __u64 index = offset;
+ __u32 seg = MAX_HASH_SIZE;
lmv_obj_lock(obj);
-
- /* find dirobj containing page with requested offset. */
- for (i = 0; i < obj->lo_objcount; i++) {
- if (offset < obj->lo_inodes[i].li_size)
- break;
- offset -= obj->lo_inodes[i].li_size;
- }
+
+ /* The hash space is cut into lo_objcount equal segments; the
+ * stripe holding @offset is the segment it falls into, and the
+ * offset is rebased to the start of that segment. */
+ LASSERT(obj->lo_objcount > 0);
+ /* seg is 32 bits wide and do_div() requires a 64-bit dividend,
+ * so use a plain division here */
+ seg /= obj->lo_objcount;
+ do_div(index, seg);
+ offset -= index * seg;
+ /* NOTE(review): for offsets at or above seg * lo_objcount the
+ * quotient equals lo_objcount - confirm such offsets cannot reach
+ * here, or clamp index before it is used on lo_inodes[] */
+ i = (int)index;
rid = obj->lo_inodes[i].li_fid;
lmv_obj_unlock(obj);
#ifdef __KERNEL__
if (obj && i < obj->lo_objcount - 1) {
struct lu_dirpage *dp;
- __u32 end;
+ __u32 end, max_hash = MAX_HASH_SIZE;
/*
* This dirobj has been split, so we check whether reach the end
* of one hash_segment and reset ldp->ldp_hash_end.
dp = page_address(page);
end = le32_to_cpu(dp->ldp_hash_end);
-if (end == ~0ul) {
+/* end is 32 bits wide: compare it with a 32-bit all-ones value, since
+ * ~0ul is 64 bits wide on LP64 and could never match */
+if (end == ~0u) {
- __u32 hash_segment_end = (i + 1) *
- MAX_HASH_SIZE/obj->lo_objcount;
- dp->ldp_hash_end = cpu_to_le32(hash_segment_end);
- CDEBUG(D_INFO,"reset hash end %x for split obj "DFID"",
- le32_to_cpu(dp->ldp_hash_end), PFID(&rid));
+ __u32 seg_end;
+
+ /* max_hash is 32 bits wide and do_div() requires a 64-bit
+ * dividend, so use a plain division; the page's hash end becomes
+ * the end of segment i */
+ max_hash /= obj->lo_objcount;
+ seg_end = max_hash * (i + 1);
+
+ dp->ldp_hash_end = cpu_to_le32(seg_end);
+ CDEBUG(D_INFO,"reset hash end %x for split obj "DFID" "
+ "obj count %d \n",
+ le32_to_cpu(dp->ldp_hash_end), PFID(&rid),
+ obj->lo_objcount);
}
kunmap(page);
-
}
#endif
/*
rec->cr_rdev = rdev;
rec->cr_time = op_data->mod_time;
rec->cr_suppgid = op_data->suppgids[0];
-
+ rec->cr_flags = op_data->flags;
+
tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1, op_data->namelen + 1);
LOGL0(op_data->name, op_data->namelen, tmp);
if (buf && buf_len > 0) {
rc = next->do_ops->do_xattr_set(ctxt, next, buf, buf_len, name,
0, handle);
-#ifdef HAVE_SPLIT_SUPPORT
- if (rc == 0) {
- /* very ugly hack, if setting lmv, it means splitting
- * sucess, we should return -ERESTART to notify the
- * client, so transno for this splitting should be
- * zero according to the replay rules. so return -ERESTART
- * here let mdt trans stop callback know this.
- */
- if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
- rc = -ERESTART;
- }
-#endif
}else if (buf == NULL && buf_len == 0) {
rc = next->do_ops->do_xattr_del(ctxt, next, name, handle);
}
rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
fl, handle);
-
+#ifdef HAVE_SPLIT_SUPPORT
+ if (rc == 0) {
+ /* Very ugly hack: setting the LMV EA means the split
+ * succeeded, so -ERESTART is returned to notify the
+ * client. The transno for this split must be zero
+ * according to the replay rules, and returning -ERESTART
+ * here lets the mdt trans stop callback know this.
+ */
+ /* exact match only: comparing just strlen(name) bytes would
+ * also accept any prefix of the LMV EA name */
+ if (strcmp(name, MDS_LMV_MD_NAME) == 0)
+ rc = -ERESTART;
+ }
+#endif
mdd_trans_stop(ctxt, mdd, rc, handle);
RETURN(rc);
struct mdd_device *mdd = mdo2mdd(obj);
struct mdd_object *mdd_obj = md2mdd_obj(obj);
struct thandle *handle;
+ const struct lu_fid *pfid = spec->u.sp_pfid;
int rc;
ENTRY;
mdd_write_lock(ctxt, mdd_obj);
rc = __mdd_object_create(ctxt, mdd_obj, ma, handle);
+ if (rc == 0 && spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+ /* if creating the slave object, set slave EA here */
+ rc = __mdd_xattr_set(ctxt, mdd_obj, spec->u.sp_ea.eadata,
+ spec->u.sp_ea.eadatalen, MDS_LMV_MD_NAME,
+ 0, handle);
+ pfid = spec->u.sp_ea.fid;
+ CWARN("set slave ea "DFID" eadatalen %d rc %d \n",
+ PFID(mdo2fid(mdd_obj)), spec->u.sp_ea.eadatalen, rc);
+ }
+
if (rc == 0)
- rc = __mdd_object_initialize(ctxt, spec->u.sp_pfid, mdd_obj,
- ma, handle);
+ rc = __mdd_object_initialize(ctxt, pfid, mdd_obj, ma, handle);
mdd_write_unlock(ctxt, mdd_obj);
if (rc == 0)
rc = mdd_attr_get_internal_locked(ctxt, mdd_obj, ma);
-
mdd_trans_stop(ctxt, mdd, rc, handle);
RETURN(rc);
}
ent = lu_dirent_next(ent)) {
struct lu_fid *lf = &ent->lde_fid;
- /* FIXME: check isdir */
+ /* FIXME: multi-trans for this name insert */
rc = mdo_name_insert(info->mti_ctxt,
md_object_next(&object->mot_obj),
ent->lde_name, lf, 0);
- CDEBUG(D_INFO, "insert name %s rc %d \n", ent->lde_name, rc);
if (rc) {
kunmap(page);
RETURN(rc);
rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
if (S_ISDIR(attr->la_mode)) {
+ struct md_create_spec *sp = &info->mti_spec;
/* pass parent fid for cross-ref cases */
- info->mti_spec.u.sp_pfid = rr->rr_fid1;
+ sp->u.sp_pfid = rr->rr_fid1;
+ if (info->mti_spec.sp_cr_flags & MDS_CREATE_SLAVE_OBJ) {
+ /* create slave object request: need to
+ * unpack the split EA here
+ */
+ req_capsule_extend(pill,
+ &RQF_MDS_REINT_CREATE_SLAVE);
+ LASSERT(req_capsule_field_present(pill,
+ &RMF_EADATA, RCL_CLIENT));
+ sp->u.sp_ea.eadata = req_capsule_client_get(pill,
+ &RMF_EADATA);
+ sp->u.sp_ea.eadatalen =req_capsule_get_size(pill,
+ &RMF_EADATA, RCL_CLIENT);
+ sp->u.sp_ea.fid = rr->rr_fid1;
+ }
} else if (S_ISLNK(attr->la_mode)) {
const char *tgt = NULL;
+ req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM);
if (req_capsule_field_present(pill, &RMF_SYMTGT,
RCL_CLIENT)) {
tgt = req_capsule_client_get(pill,
&RMF_PTLRPC_BODY,
&RMF_REC_CREATE,
&RMF_NAME,
+};
+
+/*
+ * Client-side field list of RQF_MDS_REINT_CREATE_SYM: the create request
+ * fields followed by RMF_SYMTGT, which the S_ISLNK create-unpack path reads
+ * as the symlink target.
+ */
+static const struct req_msg_field *mds_reint_create_sym_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_REC_CREATE,
+ &RMF_NAME,
&RMF_SYMTGT
};
+/*
+ * Client-side field list of RQF_MDS_REINT_CREATE_SLAVE: the create request
+ * fields followed by RMF_EADATA, which the create-unpack path reads as the
+ * split EA when MDS_CREATE_SLAVE_OBJ is set.
+ */
+static const struct req_msg_field *mds_reint_create_slave_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_REC_CREATE,
+ &RMF_NAME,
+ &RMF_EADATA
+};
+
static const struct req_msg_field *mds_reint_open_client[] = {
&RMF_PTLRPC_BODY,
&RMF_REC_CREATE,
&RQF_MDS_GETATTR_NAME,
&RQF_MDS_REINT,
&RQF_MDS_REINT_CREATE,
+ &RQF_MDS_REINT_CREATE_SYM,
+ &RQF_MDS_REINT_CREATE_SLAVE,
&RQF_MDS_REINT_OPEN,
&RQF_MDS_REINT_UNLINK,
&RQF_MDS_REINT_LINK,
mds_reint_create_client, mdt_body_only);
EXPORT_SYMBOL(RQF_MDS_REINT_CREATE);
+/*
+ * Request format for creating a slave object: the capsule is extended to it
+ * when MDS_CREATE_SLAVE_OBJ is set, so that the EA buffer can be unpacked.
+ */
+const struct req_format RQF_MDS_REINT_CREATE_SLAVE =
+ DEFINE_REQ_FMT0("MDS_REINT_CREATE_SLAVE",
+ mds_reint_create_slave_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SLAVE);
+
+/*
+ * Request format for symlink create: the capsule is extended to it for
+ * S_ISLNK modes, so that the symlink target can be unpacked.
+ */
+const struct req_format RQF_MDS_REINT_CREATE_SYM =
+ DEFINE_REQ_FMT0("MDS_REINT_CREATE_SYM",
+ mds_reint_create_sym_client, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_REINT_CREATE_SYM);
+
const struct req_format RQF_MDS_REINT_OPEN =
DEFINE_REQ_FMT0("MDS_REINT_OPEN",
mds_reint_open_client, mds_reint_open_server);