Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Thu, 31 Aug 2006 09:59:44 +0000 (09:59 +0000)
committerwangdi <wangdi>
Thu, 31 Aug 2006 09:59:44 +0000 (09:59 +0000)
some fixes and updates for splitting dir

lustre/cmm/cmm_split.c
lustre/mdt/mdt_handler.c
lustre/osd/osd_handler.c

index 921f883..7ea4174 100644 (file)
@@ -93,8 +93,8 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
         
         LASSERT(count == cmm->cmm_tgt_count);
         
-        /*FIXME: this spin_lock maybe not proper, 
-         * because fid_alloc may need RPC*/
+        /* FIXME: this spin_lock maybe not proper, 
+         * because fid_alloc may need RPC */
         spin_lock(&cmm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                  mc_linkage) {
@@ -166,7 +166,7 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         lmv->mea_count = cmm->cmm_tgt_count + 1;
 
         lmv->mea_ids[0] = *lf;
-        /*create object*/
+        
         rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
         if (rc)
                 GOTO(cleanup, rc);
@@ -184,10 +184,96 @@ cleanup:
         RETURN(rc);
 }
 
+static int cmm_send_split_pages(const struct lu_context *ctx, 
+                                struct md_object *mo, struct lu_rdpg *rdpg)
+{
+        RETURN(0);
+}
+
+static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
+                             struct lu_rdpg *rdpg)
+{
+        struct lu_dirpage *dp;
+        __u32 hash_end;
+        int rc;
+        ENTRY;
+
+        do {
+                rc = mo_readpage(ctx, md_object_next(mo), rdpg);
+                if (rc)
+                        RETURN(rc);
+
+                rc = cmm_send_split_pages(ctx, mo, rdpg);
+                if (rc)
+                        RETURN(rc);
+
+                dp = kmap(rdpg->rp_pages[0]);
+                hash_end = dp->ldp_hash_end;
+                kunmap(rdpg->rp_pages[0]);
+                if (hash_end == ~0ul)
+                        break;
+        } while (hash_end < rdpg->rp_hash_end);
+        
+        RETURN(rc);
+}
+
+static int cmm_remove_entries(const struct lu_context *ctx, 
+                              struct md_object *mo, __u32 start, __u32 end)
+{
+        RETURN(0);
+}
+#define MAX_HASH_SIZE 0x3fffffff
+#define SPLIT_PAGE_COUNT 1
+
 static int cmm_scan_and_split(const struct lu_context *ctx,
                               struct md_object *mo, struct md_attr *ma)
 {
-        RETURN(0);
+        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        __u32 hash_segement;
+        struct lu_rdpg   *rdpg = NULL;
+        int rc = 0, i;
+
+        OBD_ALLOC_PTR(rdpg);
+        if (!rdpg)
+                RETURN(-ENOMEM);
+
+        rdpg->rp_npages = SPLIT_PAGE_COUNT;
+        rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
+
+        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
+        if (rdpg->rp_pages == NULL)
+                GOTO(free_rdpg, rc = -ENOMEM);
+
+        for (i = 0; i < rdpg->rp_npages; i++) {
+                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
+                if (rdpg->rp_pages[i] == NULL)
+                        GOTO(cleanup, rc = -ENOMEM);
+        }
+        
+        hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
+        for (i = 1; i < cmm->cmm_tgt_count; i++) {
+                rdpg->rp_hash = i * hash_segement;
+                rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
+                rc = cmm_split_entries(ctx, mo, rdpg);
+                if (rc)
+                        GOTO(cleanup, rc);
+                rc = cmm_remove_entries(ctx, mo, rdpg->rp_hash, 
+                                        rdpg->rp_hash_end);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+cleanup:
+        for (i = 0; i < rdpg->rp_npages; i++)
+                if (rdpg->rp_pages[i] != NULL)
+                        __free_pages(rdpg->rp_pages[i], 0);
+        if (rdpg->rp_pages)
+                OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
+                                         sizeof rdpg->rp_pages[0]);
+free_rdpg:
+        if (rdpg)
+                OBD_FREE_PTR(rdpg);
+
+        RETURN(rc);
 }
 
 int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
@@ -207,21 +293,20 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
         if (rc)
                 GOTO(cleanup, ma);
 
-        /*step1: checking whether the dir need to be splitted*/
+        /* step1: checking whether the dir need to be splitted */
         rc = cmm_expect_splitting(ctx, mo, ma);
         if (rc != CMM_EXPECT_SPLIT)
                 GOTO(cleanup, rc = 0);
 
-        /*step2: create slave objects*/
+        /* step2: create slave objects */
         rc = cmm_create_slave_objects(ctx, mo, ma);
         if (rc)
                 GOTO(cleanup, ma);
 
-        /*step3: scan and split the object*/
+        /* step3: scan and split the object */
         rc = cmm_scan_and_split(ctx, mo, ma);
 cleanup:
         OBD_FREE_PTR(ma);
 
         RETURN(rc);
 }
-
index 1b7ebbd..137e594 100644 (file)
@@ -585,7 +585,7 @@ static int mdt_readpage(struct mdt_thread_info *info)
          * reqbody->nlink contains number bytes to read.
          */
         rdpg->rp_hash = reqbody->size;
-        rdpg->rp_hash_end = -1;
+        rdpg->rp_hash_end = ~0ul;
         if ((__u64)rdpg->rp_hash != reqbody->size) {
                 CERROR("Invalid hash: %#llx != %#llx\n",
                        (__u64)rdpg->rp_hash, reqbody->size);
index 96b1fee..3866147 100644 (file)
@@ -1103,6 +1103,12 @@ static int osd_dir_page_build(const struct lu_context *ctx, int first,
 
                 recsize = (sizeof *ent + len + 3) & ~3;
                 hash = iops->store(ctx, it);
+                if (hash > hash_end) {
+                        *end = hash_end;
+                        if (first && ent == area)
+                                *start = hash_end;
+                        break;
+                }
                 *end = hash;
                 CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n",
                        area, ent, nob, PFID(fid), hash, len, len, len, name);
@@ -1117,8 +1123,6 @@ static int osd_dir_page_build(const struct lu_context *ctx, int first,
                         *last = ent;
                         ent = (void *)ent + recsize;
                         nob -= recsize;
-                        if (hash >= hash_end)
-                                break;
                         result = iops->next(ctx, it);
                 } else {
                         /*