Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Sat, 16 Sep 2006 18:15:48 +0000 (18:15 +0000)
committerwangdi <wangdi>
Sat, 16 Sep 2006 18:15:48 +0000 (18:15 +0000)
some fixes about readdir and split dir

lustre/cmm/cmm_split.c
lustre/cmm/mdc_object.c
lustre/llite/dir.c
lustre/lmv/lmv_obd.c

index 34f2a5c..dc9acfc 100644 (file)
@@ -226,96 +226,117 @@ cleanup:
 
 static int cmm_send_split_pages(const struct lu_context *ctx,
                                 struct md_object *mo, struct lu_rdpg *rdpg,
-                                struct lu_fid *fid, __u32 hash_end)
+                                struct lu_fid *fid, int len)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct cmm_object *obj;
-        int rc = 0, i;
+        int rc = 0;
         ENTRY;
 
         obj = cmm_object_find(ctx, cmm, fid);
         if (IS_ERR(obj))
                 RETURN(PTR_ERR(obj));
 
-        for (i = 0; i < rdpg->rp_npages; i++) {
-                rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
-                                   rdpg->rp_pages[i], hash_end);
-                if (rc)
-                        break;
-        }
+        rc = mdc_send_page(cmm, ctx, md_object_next(&obj->cmo_obj),
+                           rdpg->rp_pages[0], len);
         cmm_object_put(ctx, obj);
         RETURN(rc);
 }
 
+static int cmm_remove_entries(const struct lu_context *ctx,
+                              struct md_object *mo, struct lu_rdpg *rdpg,
+                              __u32 hash_end, __u32 *len)
+{
+        struct lu_dirpage *dp;
+        struct lu_dirent  *ent;
+        int rc = 0, i;
+        ENTRY;
+
+        kmap(rdpg->rp_pages[0]);
+        dp = page_address(rdpg->rp_pages[0]);
+        for (ent = lu_dirent_start(dp); ent != NULL;
+                          ent = lu_dirent_next(ent)) {
+                if (ent->lde_hash < hash_end) {
+                        char *name;
+                        /* FIXME: Here we allocate name for each name,
+                         * maybe stupid, but can not find better way.
+                         * will find better way */
+                        OBD_ALLOC(name, ent->lde_namelen + 1);
+                        memcpy(name, ent->lde_name, ent->lde_namelen);
+                        rc = mdo_name_remove(ctx, md_object_next(mo), name);
+                        CDEBUG(D_INFO, "remove name %s hash %lu \n",
+                                        name, ent->lde_hash); 
+                        OBD_FREE(name, ent->lde_namelen + 1);
+                        if (rc) {
+                                /* FIXME: Do not know why it return -ENOENT
+                                 * in some case 
+                                 * */
+                                if (rc != -ENOENT)
+                                        GOTO(unmap, rc);
+                        }
+                } else {
+                        if (ent != lu_dirent_start(dp))
+                                *len = (int)((__u32)ent - (__u32)dp);
+                        else
+                                *len = 0;
+                        GOTO(unmap, rc);
+                }
+        }
+        *len = CFS_PAGE_SIZE;
+unmap:
+        kunmap(rdpg->rp_pages[i]);
+        RETURN(rc);
+}
+
 static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
                              struct lu_rdpg *rdpg, struct lu_fid *lf,
                              __u32 end)
 {
-        int rc, i;
+        int rc, done = 0;
         ENTRY;
 
+        LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
+                        "npages %d \n", rdpg->rp_npages);
         /* Read splitted page and send them to the slave master */
         do {
                 struct lu_dirpage *ldp;
+                __u32  len = 0;
 
                 /* init page with '0' */
-                for (i = 0; i < rdpg->rp_npages; i++) {
-                        memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
-                        kunmap(rdpg->rp_pages[i]);
-                }
+                memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
+                kunmap(rdpg->rp_pages[0]);
 
                 rc = mo_readpage(ctx, md_object_next(mo), rdpg);
-
                 /* -E2BIG means it already reach the end of the dir */
-                if (rc == -E2BIG)
-                        RETURN(0);
+                if (rc) { 
+                        if (rc == -E2BIG)
+                                rc = 0;
+                        RETURN(rc);
+                }
+                
+                /* Remove the old entries */
+                rc = cmm_remove_entries(ctx, mo, rdpg, end, &len);
                 if (rc)
                         RETURN(rc);
 
-                rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
-                if (rc) 
-                        RETURN(rc);
+                /* Send page to slave object */ 
+                if (len > 0) {
+                        rc = cmm_send_split_pages(ctx, mo, rdpg, lf, len);
+                        if (rc) 
+                                RETURN(rc);
+                }
                 
                 kmap(rdpg->rp_pages[0]);
                 ldp = page_address(rdpg->rp_pages[0]);
-                if (ldp->ldp_hash_end == ~0ul)
-                        rc = -E2BIG;
+                if (ldp->ldp_hash_end >= end) {
+                        done = 1;
+                }
                 rdpg->rp_hash = ldp->ldp_hash_end;
                 kunmap(rdpg->rp_pages[0]); 
-        } while (rc == 0);
+        } while (!done);
 
-        /* it means already finish splitting this segment */
-        if (rc == -E2BIG)
-                rc = 0;
         RETURN(rc);
 }
-
-#if 0
-static int cmm_remove_entries(const struct lu_context *ctx,
-                              struct md_object *mo, struct lu_rdpg *rdpg)
-{
-        struct lu_dirpage *dp;
-        struct lu_dirent  *ent;
-        int rc = 0, i;
-        ENTRY;
-
-        for (i = 0; i < rdpg->rp_npages; i++) {
-                kmap(rdpg->rp_pages[i]);
-                dp = page_address(rdpg->rp_pages[i]);
-                for (ent = lu_dirent_start(dp); ent != NULL;
-                                  ent = lu_dirent_next(ent)) {
-                        rc = mdo_name_remove(ctx, md_object_next(mo),
-                                             ent->lde_name);
-                        if (rc) {
-                                kunmap(rdpg->rp_pages[i]);
-                                RETURN(rc);
-                        }
-                }
-                kunmap(rdpg->rp_pages[i]);
-        }
-        RETURN(rc);
-}
-#endif
 #define SPLIT_PAGE_COUNT 1
 static int cmm_scan_and_split(const struct lu_context *ctx,
                               struct md_object *mo, struct md_attr *ma)
@@ -411,6 +432,5 @@ cleanup:
                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
 
         OBD_FREE_PTR(ma);
-
         RETURN(rc);
 }
index cc87e7d..ff5ee14 100644 (file)
@@ -283,57 +283,16 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo,
 
 #ifdef HAVE_SPLIT_SUPPORT
 int mdc_send_page(struct cmm_device *cm, const struct lu_context *ctx,
-                  struct md_object *mo, struct page *page, __u32 end)
+                  struct md_object *mo, struct page *page, __u32 offset)
 {
         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
-        struct lu_dirpage *dp;
-        struct lu_dirent  *ent;
-        int rc, offset = 0, rc1 = 0;
+        int rc;
         ENTRY;
 
-        kmap(page);
-        dp = page_address(page);
-
-        ent = lu_dirent_start(dp);
-        if (ent->lde_hash > end)
-                RETURN(-E2BIG);
-
-        for (ent = lu_dirent_start(dp); ent != NULL;
-             ent = lu_dirent_next(ent)) {
-                if (ent->lde_hash > end) {
-                        offset = (int)((__u32)ent - (__u32)dp);
-                        rc1 = -E2BIG;
-                        goto send_page;
-                }
-
-                /* allocate new fid for each obj */
-                rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
-                if (rc > 0) {
-                        struct lu_site *ls;
-
-                        ls = cm->cmm_md_dev.md_lu_dev.ld_site;
-                        rc = fld_client_create(ls->ls_client_fld,
-                                               fid_seq(&ent->lde_fid),
-                                               mc->mc_num, ctx);
-                }
-
-                if (rc < 0) {
-                        kunmap(page);
-                        RETURN(rc);
-                }
-        }
-        kunmap(page);
-        offset = CFS_PAGE_SIZE;
-send_page:
-        if (offset > 0) {
-                rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+        rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
                                   page, offset);
-                CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
-                                page, offset, PFID(lu_object_fid(&mo->mo_lu)),
-                                rc);
-        }
-        if (rc == 0)
-                rc = rc1;
+        CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
+                        page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc);
         RETURN(rc);
 }
 #endif
index bf68c18..01af337 100644 (file)
@@ -142,7 +142,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
         struct inode *inode = page->mapping->host;
         struct ptlrpc_request *request;
         struct mdt_body *body;
-        __u32 hash;
+        __u64 hash;
         int rc;
         ENTRY;
 
@@ -236,6 +236,9 @@ static struct page *ll_dir_page_locate(struct inode *dir, unsigned long hash,
                         LASSERT(*start <= hash);
                         if (hash > *end || (*end != *start && hash == *end)) {
                                 kunmap(page);
+                                lock_page(page); 
+                                ll_truncate_complete_page(page);
+                                unlock_page(page); 
                                 page_cache_release(page);
                                 page = NULL;
                         }
index 918c6a1..7b98c8c 100644 (file)
@@ -1927,7 +1927,6 @@ static int lmv_readpage(struct obd_export *exp,
                 LASSERT(obj->lo_objcount > 0);
                 do_div(seg, obj->lo_objcount);
                 do_div(index, seg);
-                offset -= index * seg;  
                 i = (int)index;
                 rid = obj->lo_inodes[i].li_fid;