static int cmm_send_split_pages(const struct lu_context *ctx,
struct md_object *mo, struct lu_rdpg *rdpg,
- struct lu_fid *fid, __u32 hash_end)
+ struct lu_fid *fid, int len)
{
struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
struct cmm_object *obj;
- int rc = 0, i;
+ int rc = 0;
ENTRY;
obj = cmm_object_find(ctx, cmm, fid);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- for (i = 0; i < rdpg->rp_npages; i++) {
- rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
- rdpg->rp_pages[i], hash_end);
- if (rc)
- break;
- }
+ rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
+ rdpg->rp_pages[0], len);
cmm_object_put(ctx, obj);
RETURN(rc);
}
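+/*
+ * Remove the entries whose hash is below @hash_end from the directory
+ * page in @rdpg. On success *@len holds the number of bytes at the
+ * start of the page that the removed entries occupied: 0 if nothing
+ * was removed, CFS_PAGE_SIZE if every entry on the page was removed.
+ */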
+static int cmm_remove_entries(const struct lu_context *ctx,
+ struct md_object *mo, struct lu_rdpg *rdpg,
+ __u32 hash_end, __u32 *len)
+{
+ struct lu_dirpage *dp;
+ struct lu_dirent *ent;
+ int rc = 0;
+ ENTRY;
+
+ kmap(rdpg->rp_pages[0]);
+ dp = page_address(rdpg->rp_pages[0]);
+ for (ent = lu_dirent_start(dp); ent != NULL;
+ ent = lu_dirent_next(ent)) {
+ if (ent->lde_hash < hash_end) {
+ char *name;
+ /* FIXME: a NUL-terminated name buffer is allocated
+ * and freed for each entry; inefficient, but no
+ * better way has been found yet. */
+ OBD_ALLOC(name, ent->lde_namelen + 1);
+ if (name == NULL)
+ GOTO(unmap, rc = -ENOMEM);
+ memcpy(name, ent->lde_name, ent->lde_namelen);
+ rc = mdo_name_remove(ctx, md_object_next(mo), name);
+ CDEBUG(D_INFO, "remove name %s hash %lu\n",
+ name, (unsigned long)ent->lde_hash);
+ OBD_FREE(name, ent->lde_namelen + 1);
+ if (rc) {
+ /* FIXME: mdo_name_remove() sometimes returns
+ * -ENOENT; the reason is not yet understood,
+ * so ignore it for now. */
+ if (rc != -ENOENT)
+ GOTO(unmap, rc);
+ }
+ } else {
+ if (ent != lu_dirent_start(dp))
+ *len = (__u32)((char *)ent - (char *)dp);
+ else
+ *len = 0;
+ GOTO(unmap, rc);
+ }
+ }
+ *len = CFS_PAGE_SIZE;
+unmap:
+ kunmap(rdpg->rp_pages[0]);
+ RETURN(rc);
+}
+
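+/*
+ * Split one directory segment: read the directory page by page, remove
+ * the entries whose hash is below @end from the master object and send
+ * the removed part of each page to the slave object @lf, until a page
+ * whose hash range reaches @end has been processed.
+ */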
static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
struct lu_rdpg *rdpg, struct lu_fid *lf,
__u32 end)
{
- int rc, i;
+ int rc, done = 0;
ENTRY;
+ LASSERTF(rdpg->rp_npages == 1,
+ "only one page can be split at a time, npages %d\n",
+ rdpg->rp_npages);
/* Read split pages and send the removed entries to the slave object */
do {
struct lu_dirpage *ldp;
+ __u32 len = 0;
/* init page with '0' */
- for (i = 0; i < rdpg->rp_npages; i++) {
- memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
- kunmap(rdpg->rp_pages[i]);
- }
+ memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
+ kunmap(rdpg->rp_pages[0]);
rc = mo_readpage(ctx, md_object_next(mo), rdpg);
-
/* -E2BIG means readpage has already reached the end of the directory */
- if (rc == -E2BIG)
- RETURN(0);
+ if (rc) {
+ if (rc == -E2BIG)
+ rc = 0;
+ RETURN(rc);
+ }
+
+ /* Remove the old entries */
+ rc = cmm_remove_entries(ctx, mo, rdpg, end, &len);
if (rc)
RETURN(rc);
- rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
- if (rc)
- RETURN(rc);
+ /* Send page to slave object */
+ if (len > 0) {
+ rc = cmm_send_split_pages(ctx, mo, rdpg, lf, len);
+ if (rc)
+ RETURN(rc);
+ }
kmap(rdpg->rp_pages[0]);
ldp = page_address(rdpg->rp_pages[0]);
- if (ldp->ldp_hash_end == ~0ul)
- rc = -E2BIG;
+ if (ldp->ldp_hash_end >= end)
+ done = 1;
rdpg->rp_hash = ldp->ldp_hash_end;
kunmap(rdpg->rp_pages[0]);
- } while (rc == 0);
+ } while (!done);
- /* it means already finish splitting this segment */
- if (rc == -E2BIG)
- rc = 0;
RETURN(rc);
}
-
-#if 0
-static int cmm_remove_entries(const struct lu_context *ctx,
- struct md_object *mo, struct lu_rdpg *rdpg)
-{
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc = 0, i;
- ENTRY;
-
- for (i = 0; i < rdpg->rp_npages; i++) {
- kmap(rdpg->rp_pages[i]);
- dp = page_address(rdpg->rp_pages[i]);
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- rc = mdo_name_remove(ctx, md_object_next(mo),
- ent->lde_name);
- if (rc) {
- kunmap(rdpg->rp_pages[i]);
- RETURN(rc);
- }
- }
- kunmap(rdpg->rp_pages[i]);
- }
- RETURN(rc);
-}
-#endif
#define SPLIT_PAGE_COUNT 1
static int cmm_scan_and_split(const struct lu_context *ctx,
struct md_object *mo, struct md_attr *ma)
OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
OBD_FREE_PTR(ma);
-
RETURN(rc);
}
#ifdef HAVE_SPLIT_SUPPORT
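+/*
+ * Send the first @offset bytes of @page to the slave through
+ * mdc_sendpage().
+ */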
int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx,
- struct md_object *mo, struct page *page, __u32 end)
+ struct md_object *mo, struct page *page, __u32 offset)
{
struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
- struct lu_dirpage *dp;
- struct lu_dirent *ent;
- int rc, offset = 0, rc1 = 0;
+ int rc;
ENTRY;
- kmap(page);
- dp = page_address(page);
-
- ent = lu_dirent_start(dp);
- if (ent->lde_hash > end)
- RETURN(-E2BIG);
-
- for (ent = lu_dirent_start(dp); ent != NULL;
- ent = lu_dirent_next(ent)) {
- if (ent->lde_hash > end) {
- offset = (int)((__u32)ent - (__u32)dp);
- rc1 = -E2BIG;
- goto send_page;
- }
-
- /* allocate new fid for each obj */
- rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
- if (rc > 0) {
- struct lu_site *ls;
-
- ls = cm->cmm_md_dev.md_lu_dev.ld_site;
- rc = fld_client_create(ls->ls_client_fld,
- fid_seq(&ent->lde_fid),
- mc->mc_num, ctx);
- }
-
- if (rc < 0) {
- kunmap(page);
- RETURN(rc);
- }
- }
- kunmap(page);
- offset = CFS_PAGE_SIZE;
-send_page:
- if (offset > 0) {
- rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+ rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
page, offset);
- CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d \n",
- page, offset, PFID(lu_object_fid(&mo->mo_lu)),
- rc);
- }
- if (rc == 0)
- rc = rc1;
+ CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d \n",
+ page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
RETURN(rc);
}
#endif