From 060d5266b005504133b381c8e46f59e181d3d7dc Mon Sep 17 00:00:00 2001 From: wangdi Date: Sat, 9 Sep 2006 08:34:15 +0000 Subject: [PATCH] Branch: b_new_cmd some fixes about split dir --- lustre/cmm/cmm_device.c | 4 +- lustre/cmm/cmm_internal.h | 4 ++ lustre/cmm/cmm_split.c | 115 ++++++++++++++++++------------------- lustre/cmm/mdc_internal.h | 2 +- lustre/cmm/mdc_object.c | 23 ++++++-- lustre/include/lu_object.h | 2 - lustre/include/lustre_mdc.h | 2 +- lustre/include/lustre_req_layout.h | 1 + lustre/mdc/mdc_request.c | 6 +- lustre/mdt/mdt_handler.c | 2 - lustre/osd/osd_handler.c | 12 +--- lustre/ptlrpc/layout.c | 6 ++ 12 files changed, 94 insertions(+), 85 deletions(-) diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c index f724c3f..ca18e79 100644 --- a/lustre/cmm/cmm_device.c +++ b/lustre/cmm/cmm_device.c @@ -51,8 +51,8 @@ static inline int lu_device_is_cmm(struct lu_device *d) return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &cmm_lu_ops); } -static int cmm_root_get(const struct lu_context *ctx, struct md_device *md, - struct lu_fid *fid) +int cmm_root_get(const struct lu_context *ctx, struct md_device *md, + struct lu_fid *fid) { struct cmm_device *cmm_dev = md2cmm_dev(md); /* valid only on master MDS */ diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index debf1b6..3274ed2 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -118,10 +118,14 @@ struct lu_object *cmm_object_alloc(const struct lu_context *ctx, const struct lu_object_header *hdr, struct lu_device *); +int cmm_root_get(const struct lu_context *ctx, struct md_device *md, + struct lu_fid *fid); + #ifdef HAVE_SPLIT_SUPPORT /* cmm_split.c */ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo); #endif + #endif /* __KERNEL__ */ #endif /* _CMM_INTERNAL_H */ diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 537ef80..53a7aac 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -39,47 +39,46 @@ #include "cmm_internal.h" #include "mdc_internal.h" -struct cmm_thread_info { - struct md_attr cti_ma; -}; - -struct lu_context_key cmm_thread_key; -struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx) -{ - struct cmm_thread_info *info; - - info = lu_context_key_get(ctx, &cmm_thread_key); - LASSERT(info != NULL); - return info; -} - #define CMM_NO_SPLIT_EXPECTED 0 #define CMM_EXPECT_SPLIT 1 #define CMM_NO_SPLITTABLE 2 -#define SPLIT_SIZE 64*1024 +#define SPLIT_SIZE 8*1024 + +static inline struct lu_fid* cmm2_fid(struct cmm_object *obj) +{ + return &(obj->cmo_obj.mo_lu.lo_header->loh_fid); +} static int cmm_expect_splitting(const struct lu_context *ctx, struct md_object *mo, struct md_attr *ma) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); + struct lu_fid *fid = NULL; + int rc = CMM_EXPECT_SPLIT; ENTRY; - if (cmm->cmm_tgt_count == 1) - RETURN(CMM_NO_SPLIT_EXPECTED); + if (cmm->cmm_tgt_count == 0) + GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); if (ma->ma_attr.la_size < SPLIT_SIZE) - RETURN(CMM_NO_SPLIT_EXPECTED); + GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); if (ma->ma_lmv_size) - RETURN(CMM_NO_SPLIT_EXPECTED); - - RETURN(CMM_EXPECT_SPLIT); -} + GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + + OBD_ALLOC_PTR(fid); + rc = cmm_root_get(ctx, &cmm->cmm_md_dev, fid); + if (rc) + GOTO(cleanup, rc); + + if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo)))) + GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); -static inline struct lu_fid* cmm2_fid(struct cmm_object *obj) -{ - return &(obj->cmo_obj.mo_lu.lo_header->loh_fid); +cleanup: + if (fid) + OBD_FREE_PTR(fid); + RETURN(rc); } #define cmm_md_size(stripes) \ @@ -97,9 +96,8 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm, spin_lock(&cmm->cmm_tgt_guard); list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) { - if (cmm->cmm_local_num == mc->mc_num) - continue; - + LASSERT(cmm->cmm_local_num != mc->mc_num); + rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL); if (rc < 0) { spin_unlock(&cmm->cmm_tgt_guard); @@ -107,7 +105,7 @@ static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm, } } spin_unlock(&cmm->cmm_tgt_guard); - LASSERT(i + 1 == count); + LASSERT(i == count); if (rc == 1) rc = 0; RETURN(rc); @@ -168,7 +166,7 @@ static int cmm_create_slave_objects(const struct lu_context *ctx, struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo)); ENTRY; - lmv_size = cmm_md_size(cmm->cmm_tgt_count); + lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); /* This lmv will be free after finish splitting. */ OBD_ALLOC(lmv, lmv_size); @@ -202,7 +200,7 @@ cleanup: static int cmm_send_split_pages(const struct lu_context *ctx, struct md_object *mo, struct lu_rdpg *rdpg, - struct lu_fid *fid) + struct lu_fid *fid, __u32 hash_end) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct cmm_object *obj; @@ -215,49 +213,48 @@ static int cmm_send_split_pages(const struct lu_context *ctx, for (i = 0; i < rdpg->rp_npages; i++) { rc = mdc_send_page(ctx, md_object_next(&obj->cmo_obj), - rdpg->rp_pages[i]); + rdpg->rp_pages[i], hash_end); if (rc) - GOTO(cleanup, rc); + break; } -cleanup: cmm_object_put(ctx, obj); RETURN(rc); } static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo, - struct lu_rdpg *rdpg, struct lu_fid *lf) + struct lu_rdpg *rdpg, struct lu_fid *lf, + __u32 end) { - struct lu_dirpage *dp; - __u32 hash_end; int rc, i; ENTRY; - /* init page with '0' */ - for (i = 0; i < rdpg->rp_npages; i++) { - memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE); - kunmap(rdpg->rp_pages[i]); - } - /* Read splitted page and send them to the slave master */ do { + /* init page with '0' */ + for (i = 0; i < rdpg->rp_npages; i++) { + memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE); + kunmap(rdpg->rp_pages[i]); + } + rc = mo_readpage(ctx, md_object_next(mo), rdpg); + + /* -E2BIG means it already reach the end of the dir */ + if (rc == -E2BIG) + RETURN(0); if (rc) RETURN(rc); - rc = cmm_send_split_pages(ctx, mo, rdpg, lf); - if (rc) - RETURN(rc); + rc = cmm_send_split_pages(ctx, mo, rdpg, lf, end); - dp = kmap(rdpg->rp_pages[0]); - hash_end = dp->ldp_hash_end; - kunmap(rdpg->rp_pages[0]); - if (hash_end == ~0ul) - break; - } while (hash_end < rdpg->rp_hash_end); - + } while (rc == 0); + + /* it means already finish splitting this segment */ + if (rc == -E2BIG) + rc = 0; RETURN(rc); } +#if 0 static int cmm_remove_entries(const struct lu_context *ctx, struct md_object *mo, struct lu_rdpg *rdpg) { @@ -282,7 +279,7 @@ static int cmm_remove_entries(const struct lu_context *ctx, } RETURN(rc); } - +#endif #define MAX_HASH_SIZE 0x3fffffff #define SPLIT_PAGE_COUNT 1 static int cmm_scan_and_split(const struct lu_context *ctx, @@ -313,14 +310,12 @@ static int cmm_scan_and_split(const struct lu_context *ctx, hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count; for (i = 1; i < cmm->cmm_tgt_count; i++) { struct lu_fid *lf = &ma->ma_lmv->mea_ids[i]; + __u32 hash_end; rdpg->rp_hash = i * hash_segement; - rdpg->rp_hash_end = rdpg->rp_hash + hash_segement; - rc = cmm_remove_entries(ctx, mo, rdpg); - if (rc) - GOTO(cleanup, rc); - - rc = cmm_split_entries(ctx, mo, rdpg, lf); + hash_end = rdpg->rp_hash + hash_segement; + + rc = cmm_split_entries(ctx, mo, rdpg, lf, hash_end); if (rc) GOTO(cleanup, rc); } diff --git a/lustre/cmm/mdc_internal.h b/lustre/cmm/mdc_internal.h index 0141529..f5a5d30 100644 --- a/lustre/cmm/mdc_internal.h +++ b/lustre/cmm/mdc_internal.h @@ -97,7 +97,7 @@ struct lu_object *mdc_object_alloc(const struct lu_context *, struct lu_device *); #ifdef HAVE_SPLIT_SUPPORT int mdc_send_page(const struct lu_context *ctx, struct md_object *mo, - struct page *page); + struct page *page, __u32 end); #endif #endif /* __KERNEL__ */ diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index bc7b991..ee96208 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -246,18 +246,24 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo, #ifdef HAVE_SPLIT_SUPPORT int mdc_send_page(const struct lu_context *ctx, struct md_object *mo, - struct page *page) + struct page *page, __u32 end) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); struct lu_dirpage *dp; struct lu_dirent *ent; - int rc; + int rc, offset = 0, rc1 = 0; ENTRY; kmap(page); dp = page_address(page); for (ent = lu_dirent_start(dp); ent != NULL; ent = lu_dirent_next(ent)) { + if (ent->lde_hash < end) { + offset = (int)((__u32)ent - (__u32)dp); + rc1 = -E2BIG; + goto send_page; + } + /* allocate new fid for each obj */ rc = obd_fid_alloc(mc->mc_desc.cl_exp, &ent->lde_fid, NULL); if (rc) { @@ -266,8 +272,17 @@ int mdc_send_page(const struct lu_context *ctx, struct md_object *mo, } } kunmap(page); - rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), - page); + offset = CFS_PAGE_SIZE; +send_page: + if (offset > 0) { + rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), + page, offset); + CDEBUG(D_INFO, "send page %p offset %d fid "DFID" rc %d \n", + page, offset, PFID(lu_object_fid(&mo->mo_lu)), + rc); + } + if (rc == 0) + rc = rc1; RETURN(rc); } #endif diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 2529ac1..f27f456 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -807,8 +807,6 @@ static inline const __u32 lu_object_attr(const struct lu_object *o) struct lu_rdpg { /* input params, should be filled out by mdt */ __u32 rp_hash; /* hash */ - __u32 rp_hash_end; /* hash end, means reading the - entry until this hash*/ int rp_count; /* count in bytes */ int rp_npages; /* number of pages */ struct page **rp_pages; /* pointers to pages */ diff --git a/lustre/include/lustre_mdc.h b/lustre/include/lustre_mdc.h index 8273421..3694e1c 100644 --- a/lustre/include/lustre_mdc.h +++ b/lustre/include/lustre_mdc.h @@ -37,6 +37,6 @@ void it_set_disposition(struct lookup_intent *it, int flag); int it_open_error(int phase, struct lookup_intent *it); #ifdef HAVE_SPLIT_SUPPORT int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, - const struct page *page); + const struct page *page, int offset); #endif #endif diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 6ea8165..2b65149 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -109,6 +109,7 @@ extern const struct req_format RQF_MDS_PIN; extern const struct req_format RQF_MDS_CONNECT; extern const struct req_format RQF_MDS_DISCONNECT; extern const struct req_format RQF_MDS_READPAGE; +extern const struct req_format RQF_MDS_WRITEPAGE; extern const struct req_format RQF_MDS_DONE_WRITING; #ifdef HAVE_SPLIT_SUPPORT extern const struct req_format RQF_MDS_WRITEPAGE; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 3ed3ccf..8ed7055 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -730,7 +730,7 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data) #ifdef HAVE_SPLIT_SUPPORT int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, - const struct page *page) + const struct page *page, int offset) { struct obd_import *imp = class_exp2cliimp(exp); struct ptlrpc_request *req = NULL; @@ -752,9 +752,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, if (desc == NULL) GOTO(out, rc = -ENOMEM); /* NB req now owns desc and will free it when it gets freed */ - ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE); + ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, offset); - mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid); + mdc_readdir_pack(req, REQ_REC_OFF, 0, offset, fid); ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index e90bc79..02207d2 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -596,7 +596,6 @@ static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page) } } kunmap(page); - RETURN(rc); } @@ -704,7 +703,6 @@ static int mdt_readpage(struct mdt_thread_info *info) * reqbody->nlink contains number bytes to read. */ rdpg->rp_hash = reqbody->size; - rdpg->rp_hash_end = ~0ul; if ((__u64)rdpg->rp_hash != reqbody->size) { CERROR("Invalid hash: %#llx != %#llx\n", (__u64)rdpg->rp_hash, reqbody->size); diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 542eab5..30f517b 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -1077,8 +1077,7 @@ static int osd_xattr_del(const struct lu_context *ctxt, struct dt_object *dt, static int osd_dir_page_build(const struct lu_context *ctx, int first, void *area, int nob, struct dt_it_ops *iops, struct dt_it *it, - __u32 *start, __u32 *end, __u32 hash_end, - struct lu_dirent **last) + __u32 *start, __u32 *end, struct lu_dirent **last) { int result; struct osd_thread_info *info = lu_context_key_get(ctx, &osd_key); @@ -1109,12 +1108,6 @@ static int osd_dir_page_build(const struct lu_context *ctx, int first, recsize = (sizeof *ent + len + 3) & ~3; hash = iops->store(ctx, it); - if (hash > hash_end) { - *end = hash_end; - if (first && ent == area) - *start = hash_end; - break; - } *end = hash; CDEBUG(D_INODE, "%p %p %d "DFID": %#8.8x (%d)\"%*.*s\"\n", area, ent, nob, PFID(fid), hash, len, len, len, name); @@ -1194,8 +1187,7 @@ static int osd_readpage(const struct lu_context *ctxt, rc = osd_dir_page_build(ctxt, !i, kmap(pg), min_t(int, nob, CFS_PAGE_SIZE), iops, it, - &hash_start, &hash_end, - rdpg->rp_hash_end, &last); + &hash_start, &hash_end, &last); if (rc != 0 || i == rdpg->rp_npages - 1) last->lde_reclen = 0; kunmap(pg); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index cf149d4..97686c2 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -292,6 +292,7 @@ static const struct req_format *req_formats[] = { &RQF_MDS_PIN, &RQF_MDS_READPAGE, &RQF_MDS_DONE_WRITING, + &RQF_MDS_WRITEPAGE, }; struct req_msg_field { @@ -612,6 +613,11 @@ const struct req_format RQF_MDS_READPAGE = mdt_body_only, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_READPAGE); +const struct req_format RQF_MDS_WRITEPAGE = + DEFINE_REQ_FMT0("MDS_WRITEPAGE", + mdt_body_only, mdt_body_only); +EXPORT_SYMBOL(RQF_MDS_WRITEPAGE); + #if !defined(__REQ_LAYOUT_USER__) int req_layout_init(void) -- 1.8.3.1