Whamcloud - gitweb
Branch: b_new_cmd
authorwangdi <wangdi>
Sat, 9 Sep 2006 08:34:15 +0000 (08:34 +0000)
committerwangdi <wangdi>
Sat, 9 Sep 2006 08:34:15 +0000 (08:34 +0000)
some fixes about split dir

12 files changed:
lustre/cmm/cmm_device.c
lustre/cmm/cmm_internal.h
lustre/cmm/cmm_split.c
lustre/cmm/mdc_internal.h
lustre/cmm/mdc_object.c
lustre/include/lu_object.h
lustre/include/lustre_mdc.h
lustre/include/lustre_req_layout.h
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/osd/osd_handler.c
lustre/ptlrpc/layout.c

index f724c3f..ca18e79 100644 (file)
@@ -51,8 +51,8 @@ static inline int lu_device_is_cmm(struct lu_device *d)
        return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops);
 }
 
-static int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
-                        struct lu_fid *fid)
+int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+                 struct lu_fid *fid)
 {
         struct cmm_device *cmm_dev = md2cmm_dev(md);
         /* valid only on master MDS */
index debf1b6..3274ed2 100644 (file)
@@ -118,10 +118,14 @@ struct lu_object *cmm_object_alloc(const struct lu_context *ctx,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *);
 
+int cmm_root_get(const struct lu_context *ctx, struct md_device *md,
+                 struct lu_fid *fid);
+
 #ifdef HAVE_SPLIT_SUPPORT
 /* cmm_split.c */
 int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo);
 #endif
+
 #endif /* __KERNEL__ */
 #endif /* _CMM_INTERNAL_H */
 
index 537ef80..53a7aac 100644 (file)
 #include "cmm_internal.h"
 #include "mdc_internal.h"
 
-struct cmm_thread_info {
-        struct md_attr   cti_ma;
-};
-
-struct lu_context_key cmm_thread_key;
-struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx)
-{
-        struct cmm_thread_info *info;
-
-        info = lu_context_key_get(ctx, &cmm_thread_key);
-        LASSERT(info != NULL);
-        return info;
-}
-
 #define CMM_NO_SPLIT_EXPECTED   0
 #define CMM_EXPECT_SPLIT        1
 #define CMM_NO_SPLITTABLE       2
 
-#define SPLIT_SIZE 64*1024
+#define SPLIT_SIZE 8*1024
+
+static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
+{
+       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+}
 
 static int cmm_expect_splitting(const struct lu_context *ctx,
                                 struct md_object *mo, struct md_attr *ma)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
+        struct lu_fid *fid = NULL;
+        int rc = CMM_EXPECT_SPLIT;
         ENTRY;
 
-        if (cmm->cmm_tgt_count == 1)
-                RETURN(CMM_NO_SPLIT_EXPECTED);
+        if (cmm->cmm_tgt_count == 0)
+                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_attr.la_size < SPLIT_SIZE)
-                RETURN(CMM_NO_SPLIT_EXPECTED);
+                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
 
         if (ma->ma_lmv_size)
-                RETURN(CMM_NO_SPLIT_EXPECTED);
-                       
-        RETURN(CMM_EXPECT_SPLIT);
-}
+                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
+                   
+        OBD_ALLOC_PTR(fid);
+        rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid);
+        if (rc)
+                GOTO(cleanup, rc);
+        
+        if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
+                GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); 
 
-static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
-{
-       return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
+cleanup:
+        if (fid)
+                OBD_FREE_PTR(fid);
+        RETURN(rc);
 }
 
 #define cmm_md_size(stripes)                            \
@@ -97,9 +96,8 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
         spin_lock(&cmm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                                  mc_linkage) {
-                if (cmm->cmm_local_num == mc->mc_num)
-                        continue;
-
+                LASSERT(cmm->cmm_local_num != mc->mc_num);
+                
                 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
                 if (rc < 0) {
                         spin_unlock(&cmm->cmm_tgt_guard);
@@ -107,7 +105,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
                 }
         }
         spin_unlock(&cmm->cmm_tgt_guard);
-        LASSERT(i + 1 == count);
+        LASSERT(i == count);
         if (rc == 1)
                 rc = 0;
         RETURN(rc);
@@ -168,7 +166,7 @@ static int cmm_create_slave_objects(const struct lu_context *ctx,
         struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
         ENTRY;
 
-        lmv_size = cmm_md_size(cmm->cmm_tgt_count);
+        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
 
         /* This lmv will be free after finish splitting. */
         OBD_ALLOC(lmv, lmv_size);
@@ -202,7 +200,7 @@ cleanup:
 
 static int cmm_send_split_pages(const struct lu_context *ctx, 
                                 struct md_object *mo, struct lu_rdpg *rdpg, 
-                                struct lu_fid *fid)
+                                struct lu_fid *fid, __u32 hash_end)
 {
         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
         struct cmm_object *obj;
@@ -215,49 +213,48 @@ static int cmm_send_split_pages(const struct lu_context *ctx,
 
         for (i = 0; i < rdpg->rp_npages; i++) {
                 rc = mdc_send_page(ctx, md_object_next(&obj->cmo_obj),
-                                   rdpg->rp_pages[i]);
+                                   rdpg->rp_pages[i], hash_end);
                 if (rc)
-                        GOTO(cleanup, rc);
+                        break;
         }
-cleanup:
         cmm_object_put(ctx, obj);
         RETURN(rc);
 }
 
 static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
-                             struct lu_rdpg *rdpg, struct lu_fid *lf)
+                             struct lu_rdpg *rdpg, struct lu_fid *lf, 
+                             __u32 end)
 {
-        struct lu_dirpage *dp;
-        __u32 hash_end;
         int rc, i;
         ENTRY;
 
-        /* init page with '0' */
-        for (i = 0; i < rdpg->rp_npages; i++) {
-                memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
-                kunmap(rdpg->rp_pages[i]);
-        }
-
         /* Read splitted page and send them to the slave master */
         do {
+                /* init page with '0' */
+                for (i = 0; i < rdpg->rp_npages; i++) {
+                        memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
+                        kunmap(rdpg->rp_pages[i]);
+                }
+
                 rc = mo_readpage(ctx, md_object_next(mo), rdpg);
+                
+                /* -E2BIG means it already reach the end of the dir */
+                if (rc == -E2BIG)                         
+                        RETURN(0);
                 if (rc)
                         RETURN(rc);
 
-                rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
-                if (rc)
-                        RETURN(rc);
+                rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end);
 
-                dp = kmap(rdpg->rp_pages[0]);
-                hash_end = dp->ldp_hash_end;
-                kunmap(rdpg->rp_pages[0]);
-                if (hash_end == ~0ul)
-                        break;
-        } while (hash_end < rdpg->rp_hash_end);
-        
+        } while (rc == 0);
+
+        /* it means already finish splitting this segment */
+        if (rc == -E2BIG)
+                rc = 0;
         RETURN(rc);
 }
 
+#if 0
 static int cmm_remove_entries(const struct lu_context *ctx, 
                               struct md_object *mo, struct lu_rdpg *rdpg)
 {
@@ -282,7 +279,7 @@ static int cmm_remove_entries(const struct lu_context *ctx,
         }
         RETURN(rc);
 }
-
+#endif
 #define MAX_HASH_SIZE 0x3fffffff
 #define SPLIT_PAGE_COUNT 1
 static int cmm_scan_and_split(const struct lu_context *ctx,
@@ -313,14 +310,12 @@ static int cmm_scan_and_split(const struct lu_context *ctx,
         hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
         for (i = 1; i < cmm->cmm_tgt_count; i++) {
                 struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
+                __u32 hash_end;
                 
                 rdpg->rp_hash = i * hash_segement;
-                rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
-                rc = cmm_remove_entries(ctx, mo, rdpg);
-                if (rc)
-                        GOTO(cleanup, rc);
-
-                rc = cmm_split_entries(ctx, mo, rdpg, lf);
+                hash_end = rdpg->rp_hash + hash_segement;
+                
+                rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end);
                 if (rc)
                         GOTO(cleanup, rc);
         }
index 0141529..f5a5d30 100644 (file)
@@ -97,7 +97,7 @@ struct lu_object *mdc_object_alloc(const struct lu_context *,
                                    struct lu_device *);
 #ifdef HAVE_SPLIT_SUPPORT
 int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
-                  struct page *page);
+                  struct page *page, __u32 end);
 #endif
 
 #endif /* __KERNEL__ */
index bc7b991..ee96208 100644 (file)
@@ -246,18 +246,24 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo,
 
 #ifdef HAVE_SPLIT_SUPPORT
 int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
-                  struct page *page)
+                  struct page *page, __u32 end)
 {
         struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo));
         struct lu_dirpage *dp;
         struct lu_dirent  *ent;
-        int rc;
+        int rc, offset = 0, rc1 = 0;
         ENTRY;
 
         kmap(page);
         dp = page_address(page);
         for (ent = lu_dirent_start(dp); ent != NULL;
                           ent = lu_dirent_next(ent)) {
+                if (ent->lde_hash < end) {
+                        offset = (int)((__u32)ent - (__u32)dp);
+                        rc1 = -E2BIG;
+                        goto send_page;
+                }
+                        
                 /* allocate new fid for each obj */
                 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL);
                 if (rc) {
@@ -266,8 +272,17 @@ int mdc_send_page(const struct lu_context *ctx, struct md_object *mo,
                 }
         }
         kunmap(page);
-        rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
-                          page);
+        offset = CFS_PAGE_SIZE;
+send_page:
+        if (offset > 0) {
+                rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu),
+                                  page, offset);
+                CDEBUG(D_INFO, "send page %p  offset %d fid "DFID" rc %d \n",
+                                page, offset, PFID(lu_object_fid(&mo->mo_lu)),
+                                rc);
+        }
+        if (rc == 0)
+                rc = rc1;
         RETURN(rc);
 }
 #endif
index 2529ac1..f27f456 100644 (file)
@@ -807,8 +807,6 @@ static inline const __u32 lu_object_attr(const struct lu_object *o)
 struct lu_rdpg {
         /* input params, should be filled out by mdt */
         __u32                   rp_hash;        /* hash */
-        __u32                   rp_hash_end;    /* hash end, means reading the
-                                                   entry until this hash*/
         int                     rp_count;       /* count in bytes       */
         int                     rp_npages;      /* number of pages      */
         struct page           **rp_pages;       /* pointers to pages    */
index 8273421..3694e1c 100644 (file)
@@ -37,6 +37,6 @@ void it_set_disposition(struct lookup_intent *it, int flag);
 int it_open_error(int phase, struct lookup_intent *it);
 #ifdef HAVE_SPLIT_SUPPORT
 int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
-                 const struct page *page);
+                 const struct page *page, int offset);
 #endif
 #endif
index 6ea8165..2b65149 100644 (file)
@@ -109,6 +109,7 @@ extern const struct req_format RQF_MDS_PIN;
 extern const struct req_format RQF_MDS_CONNECT;
 extern const struct req_format RQF_MDS_DISCONNECT;
 extern const struct req_format RQF_MDS_READPAGE;
+extern const struct req_format RQF_MDS_WRITEPAGE;
 extern const struct req_format RQF_MDS_DONE_WRITING;
 #ifdef HAVE_SPLIT_SUPPORT
 extern const struct req_format RQF_MDS_WRITEPAGE;
index 3ed3ccf..8ed7055 100644 (file)
@@ -730,7 +730,7 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data)
 
 #ifdef HAVE_SPLIT_SUPPORT
 int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
-                 const struct page *page)
+                 const struct page *page, int offset)
 {
         struct obd_import *imp = class_exp2cliimp(exp);
         struct ptlrpc_request *req = NULL;
@@ -752,9 +752,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
         if (desc == NULL)
                 GOTO(out, rc = -ENOMEM);
         /* NB req now owns desc and will free it when it gets freed */
-        ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE);
+        ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, offset);
 
-        mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid);
+        mdc_readdir_pack(req, REQ_REC_OFF, 0, offset, fid);
 
         ptlrpc_req_set_repsize(req, 2, size);
         rc = ptlrpc_queue_wait(req);
index e90bc79..02207d2 100644 (file)
@@ -596,7 +596,6 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
                 }
         }
         kunmap(page);
-
         RETURN(rc);
 }
 
@@ -704,7 +703,6 @@ static int mdt_readpage(struct mdt_thread_info *info)
          * reqbody->nlink contains number bytes to read.
          */
         rdpg->rp_hash = reqbody->size;
-        rdpg->rp_hash_end = ~0ul;
         if ((__u64)rdpg->rp_hash != reqbody->size) {
                 CERROR("Invalid hash: %#llx != %#llx\n",
                        (__u64)rdpg->rp_hash, reqbody->size);
index 542eab5..30f517b 100644 (file)
@@ -1077,8 +1077,7 @@ static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt,
 static int osd_dir_page_build(const struct lu_context *ctx, int first,
                               void *area, int nob,
                               struct dt_it_ops  *iops, struct dt_it *it,
-                              __u32 *start, __u32 *end, __u32 hash_end,
-                              struct lu_dirent **last)
+                              __u32 *start, __u32 *end, struct lu_dirent **last)
 {
         int result;
         struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key);
@@ -1109,12 +1108,6 @@ static int osd_dir_page_build(const struct lu_context *ctx, int first,
 
                 recsize = (sizeof *ent + len + 3) & ~3;
                 hash = iops->store(ctx, it);
-                if (hash > hash_end) {
-                        *end = hash_end;
-                        if (first && ent == area)
-                                *start = hash_end;
-                        break;
-                }
                 *end = hash;
                 CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n",
                        area, ent, nob, PFID(fid), hash, len, len, len, name);
@@ -1194,8 +1187,7 @@ static int osd_readpage(const struct lu_context *ctxt,
                         rc = osd_dir_page_build(ctxt, !i, kmap(pg),
                                                 min_t(int, nob, CFS_PAGE_SIZE),
                                                 iops, it,
-                                                &hash_start, &hash_end,
-                                                rdpg->rp_hash_end, &last);
+                                                &hash_start, &hash_end, &last);
                         if (rc != 0 || i == rdpg->rp_npages - 1)
                                 last->lde_reclen = 0;
                         kunmap(pg);
index cf149d4..97686c2 100644 (file)
@@ -292,6 +292,7 @@ static const struct req_format *req_formats[] = {
         &RQF_MDS_PIN,
         &RQF_MDS_READPAGE,
         &RQF_MDS_DONE_WRITING,
+        &RQF_MDS_WRITEPAGE,
 };
 
 struct req_msg_field {
@@ -612,6 +613,11 @@ const struct req_format RQF_MDS_READPAGE =
                         mdt_body_only, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_READPAGE);
 
+const struct req_format RQF_MDS_WRITEPAGE =
+        DEFINE_REQ_FMT0("MDS_WRITEPAGE",
+                        mdt_body_only, mdt_body_only);
+EXPORT_SYMBOL(RQF_MDS_WRITEPAGE);
+
 #if !defined(__REQ_LAYOUT_USER__)
 
 int req_layout_init(void)