};
/* --- lu_device_type operations --- */
-static int cmm_upcall(const struct lu_context *ctxt, struct md_device *md,
- enum md_upcall_event ev)
+int cmm_upcall(const struct lu_context *ctxt, struct md_device *md,
+ enum md_upcall_event ev)
{
struct md_device *upcall_dev;
int rc;
const struct lu_object_header *hdr,
struct lu_device *);
+
+int cmm_upcall(const struct lu_context *ctxt, struct md_device *md,
+ enum md_upcall_event ev);
#ifdef HAVE_SPLIT_SUPPORT
/* cmm_split.c */
int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo);
lmv->mea_ids[0] = *lf;
- rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
+ rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1],
+ cmm->cmm_tgt_count);
if (rc)
GOTO(cleanup, rc);
slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
slave_lmv->mea_count = 0;
for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
- rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma,
+ rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma,
slave_lmv, sizeof(slave_lmv));
if (rc)
GOTO(cleanup, rc);
rc = mo_readpage(ctx, md_object_next(mo), rdpg);
/* -E2BIG means it has already reached the end of the dir */
- if (rc) {
- if (rc == -E2BIG || rc == -ERANGE)
- rc = 0;
- RETURN(rc);
+ if (rc) {
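+ /* -ERANGE is not fatal here: the split reads from a computed hash
+ * offset, so the iterator may not hit an entry exactly (see the
+ * osd readpage change below); the page is still valid, keep going */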
+ if (rc != -ERANGE) {
+ if (rc == -E2BIG)
+ rc = 0;
+ RETURN(rc);
+ }
}
/* Remove the old entries */
/* Send page to slave object */
if (len > 0) {
rc = cmm_send_split_pages(ctx, mo, rdpg, lf, len);
- if (rc)
+ if (rc)
RETURN(rc);
}
int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
{
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct md_attr *ma;
int rc = 0;
ENTRY;
if (rc != CMM_EXPECT_SPLIT)
GOTO(cleanup, rc = 0);
+ /* Disable transactions for the split: this single operation
+ * would otherwise generate many transactions, which conflicts
+ * with the current recovery design */
+ rc = cmm_upcall(ctx, &cmm->cmm_md_dev, MD_NO_TRANS);
+ if (rc)
+ GOTO(cleanup, rc = 0);
+
/* step2: create slave objects */
rc = cmm_create_slave_objects(ctx, mo, ma);
if (rc)
enum md_upcall_event {
/*sync the md layer*/
- MD_LOV_SYNC
+ MD_LOV_SYNC = (1 << 0),
+ MD_NO_TRANS = (1 << 1), /* split only: no transaction needed, for replay reasons */
};
struct md_upcall {
RETURN(0);
}
+static int lmv_reset_hash_seg_end(struct lmv_obd *lmv, struct lmv_obj *obj,
+ const struct lu_fid *fid, int index,
+ struct lu_dirpage *dp)
+{
+ struct ptlrpc_request *tmp_req = NULL;
+ struct page *page = NULL;
+ struct lu_dirpage *next_dp;
+ struct obd_export *tgt_exp;
+ struct lu_fid rid = *fid;
+ __u32 seg_end, max_hash = MAX_HASH_SIZE;
+ int rc;
+
+ /*
+ * We have reached the end of this hash segment. The real end is
+ * the start offset of the next segment, so fetch it from the next
+ * stripe and store it as the end of this segment.
+ */
+
+ do_div(max_hash, obj->lo_objcount);
+ seg_end = max_hash * index;
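+ /* seg_end is the hash boundary of segment "index": e.g. with
+ * 4 stripes and index 2 it sits at half of the hash space */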
+
+ /* Get start offset from next segment */
+ rid = obj->lo_inodes[index].li_fid;
+ tgt_exp = lmv_get_export(lmv, &rid);
+ if (IS_ERR(tgt_exp))
+ GOTO(cleanup, rc = PTR_ERR(tgt_exp));
+
+ /* Allocate a page to read the next segment's hash start.
+ * FIXME: should we try to get the page from the cache first? */
+ page = alloc_pages(GFP_KERNEL, 0);
+ if (!page)
+ GOTO(cleanup, rc = -ENOMEM);
+
+ rc = md_readpage(tgt_exp, &rid, seg_end, page, &tmp_req);
+ if (rc) {
+ /* -E2BIG means we have already reached the end of the dir,
+ * so there is no need to reset the hash segment end */
+ if (rc == -E2BIG)
+ GOTO(cleanup, rc = 0);
+ if (rc != -ERANGE)
+ GOTO(cleanup, rc);
+ /* rc == -ERANGE: the page is still valid, carry on */
+ rc = 0;
+ }
+ kmap(page);
+ next_dp = page_address(page);
+ LASSERT(le32_to_cpu(next_dp->ldp_hash_start) >= seg_end);
+ dp->ldp_hash_end = next_dp->ldp_hash_start;
+ kunmap(page);
+ CDEBUG(D_WARNING, "reset h_end %x for split obj "DFID" o_count %d index %d\n",
+ le32_to_cpu(dp->ldp_hash_end), PFID(&rid), obj->lo_objcount,
+ index);
+cleanup:
+ if (tmp_req)
+ ptlrpc_req_finished(tmp_req);
+ if (page)
+ __free_pages(page, 0);
+ RETURN(rc);
+}
+
static int lmv_readpage(struct obd_export *exp,
const struct lu_fid *fid,
__u64 offset, struct page *page,
#ifdef __KERNEL__
if (obj && i < obj->lo_objcount - 1) {
struct lu_dirpage *dp;
- __u32 end, max_hash = MAX_HASH_SIZE;
- /*
- * This dirobj has been split, so we check whether reach the end
- * of one hash_segment and reset ldp->ldp_hash_end.
- */
+ __u32 end;
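+ /* The dir object has been split; if this page claims to end at
+ * ~0, fetch the real segment end from the next stripe. */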
kmap(page);
dp = page_address(page);
end = le32_to_cpu(dp->ldp_hash_end);
- if (end == ~0ul) {
- __u32 seg_end;
-
- do_div(max_hash, obj->lo_objcount);
- seg_end = max_hash * (i + 1);
-
- dp->ldp_hash_end = cpu_to_le32(seg_end);
- CDEBUG(D_INFO,"reset hash end %x for split obj "DFID" "
- "obj count %d \n",
- le32_to_cpu(dp->ldp_hash_end), PFID(&rid),
- obj->lo_objcount);
- }
+ if (end == ~0ul)
+ rc = lmv_reset_hash_seg_end(lmv, obj, fid,
+ i + 1, dp);
kunmap(page);
- }
+ } else if (rc == -ERANGE)
+ rc = -EIO;
#endif
/*
* Here we could remove "." and ".." from all pages which at not from
ptlrpc_req_set_repsize(req, 2, size);
rc = ptlrpc_queue_wait(req);
- if (rc == 0) {
+ if (rc == 0 || rc == -ERANGE) {
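+ /* an -ERANGE reply still carries a valid body (split readpage),
+ * so unpack it as usual and let the caller deal with the rc */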
body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
lustre_swab_mdt_body);
if (body == NULL) {
fl, handle);
#ifdef HAVE_SPLIT_SUPPORT
if (rc == 0) {
- /* very ugly hack, if setting lmv, it means splitting
- * sucess, we should return -ERESTART to notify the
+ /* very ugly hack: if we are setting the lmv EA, the splitting
+ * succeeded, and we should return -ERESTART to notify the
* client, so transno for this splitting should be
* zero according to the replay rules. so return -ERESTART
* here let mdt trans stop callback know this.
*/
- if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
+ if (strncmp(name, MDS_LMV_MD_NAME, strlen(name)) == 0)
rc = -ERESTART;
}
#endif
return (rep->lock_policy_res1 & flag);
}
+void mdt_clear_disposition(struct mdt_thread_info *info,
+ struct ldlm_reply *rep, int flag)
+{
+ if (info)
+ info->mti_opdata &= ~flag;
+ if (rep)
+ rep->lock_policy_res1 &= ~flag;
+}
+
void mdt_set_disposition(struct mdt_thread_info *info,
struct ldlm_reply *rep, int flag)
{
struct lu_dirent *ent;
int rc = 0;
+
+ /* Skip transno assignment for this name insert: the split as a
+ * whole already involves many transactions */
+ info->mti_no_need_trans = 1;
kmap(page);
dp = page_address(page);
for (ent = lu_dirent_start(dp); ent != NULL;
struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
struct mdt_body *reqbody;
struct mdt_body *repbody;
- int rc;
+ int rc, rc1 = 0;
int i;
ENTRY;
rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
if (rc) {
if (rc == -ERANGE)
- rc = -EIO;
- GOTO(free_rdpg, rc);
+ rc1 = rc;
+ else
+ GOTO(free_rdpg, rc);
}
/* send pages to client */
EXIT;
free_rdpg:
+
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
__free_pages(rdpg->rp_pages[i], 0);
MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
- return rc;
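+ /* rc1 may hold a non-fatal -ERANGE from mo_readpage: the pages
+ * were still sent, so report it only when nothing else failed */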
+ return rc ? rc : rc1;
}
static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
{
struct mdt_device *m = mdt_dev(&md->md_lu_dev);
struct md_device *next = m->mdt_child;
+ struct mdt_thread_info *mti;
int rc = 0;
ENTRY;
CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
m->mdt_max_mdsize, m->mdt_max_cookiesize);
break;
+ case MD_NO_TRANS:
+ mti = lu_context_key_get(ctx, &mdt_thread_key);
+ mti->mti_no_need_trans = 1;
+ CDEBUG(D_INFO, "disable mdt trans for this thread\n");
+ break;
default:
CERROR("invalid event\n");
rc = -EINVAL;
/* transaction number of current request */
__u64 mti_transno;
- __u32 mti_has_trans:1; /* has txn already? */
+ __u32 mti_has_trans:1, /* has txn already? */
+ mti_no_need_trans:1; /* skip transno assignment (split) */
/* opdata for mdt_open(), has the same as ldlm_reply:lock_policy_res1.
* mdt_update_last_rcvd() stores this value onto disk for recovery
int mdt_get_disposition(struct ldlm_reply *rep, int flag);
void mdt_set_disposition(struct mdt_thread_info *info,
struct ldlm_reply *rep, int flag);
+void mdt_clear_disposition(struct mdt_thread_info *info,
+ struct ldlm_reply *rep, int flag);
int mdt_object_lock(struct mdt_thread_info *,
struct mdt_object *,
mdt_object_child(child),
&info->mti_spec,
&info->mti_attr);
- if (result == -ERESTART)
+ if (result == -ERESTART) {
+ mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
GOTO(out_child, result);
+ }
else {
if (result != 0)
GOTO(out_child, result);
struct mdt_txn_info *txi;
struct mdt_thread_info *mti;
struct ptlrpc_request *req;
-
+
/* transno in two contexts - for commit_cb and for thread */
txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
mti = lu_context_key_get(ctx, &mdt_thread_key);
/* FIXME: don't handle requests from SEQ/FLD,
* should be fixed
*/
- if (mti->mti_mdt == NULL || req == NULL) {
+ if (mti->mti_mdt == NULL || req == NULL || mti->mti_no_need_trans) {
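+ /* requests flagged mti_no_need_trans (split) get transno 0 and
+ * are therefore never replayed */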
txi->txi_transno = 0;
return 0;
}
if (first) {
area += sizeof (struct lu_dirpage);
nob -= sizeof (struct lu_dirpage);
-
}
LASSERT(nob > sizeof *ent);
struct dt_it *it;
struct osd_object *obj = osd_dt_obj(dt);
struct dt_it_ops *iops;
- int i;
- int rc;
- int nob;
+ int i, rc, rc1 = 0, nob;
LASSERT(dt_object_exists(dt));
LASSERT(osd_invariant(obj));
* XXX position iterator at rdpg->rp_hash
*/
rc = iops->load(ctxt, it, rdpg->rp_hash);
- if (rc > 0) {
+
+ /* When splitting, entries are read starting from a computed hash
+ * offset rather than from an existing entry offset as in readdir,
+ * so the iterator may return 0 here.
+ */
+ if (rc == 0)
+ rc1 = -ERANGE;
+
+ if (rc >= 0) {
struct page *pg; /* no, Richard, it _is_ initialized */
struct lu_dirent *last;
__u32 hash_start;
dp->ldp_hash_end = hash_end;
kunmap(rdpg->rp_pages[0]);
}
- } else if (rc == 0)
- rc = -EIO;
+ }
iops->put(ctxt, it);
iops->fini(ctxt, it);
-
- return rc;
+
+ return rc ? rc : rc1;
}
static struct dt_object_operations osd_obj_ops = {