From: wangdi Date: Fri, 1 Sep 2006 13:01:50 +0000 (+0000) Subject: Branch: b_new_cmd X-Git-Tag: v1_8_0_110~486^2~1041 X-Git-Url: https://git.whamcloud.com/gitweb?a=commitdiff_plain;h=b77685b6ef2b965fff8a60ccbfc211d725942982;p=fs%2Flustre-release.git Branch: b_new_cmd some updates and fixes for splitting dir --- diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index 7ea4174..00ba740 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -125,6 +125,12 @@ struct cmm_object *cmm_object_find(const struct lu_context *ctxt, RETURN(m); } +static inline void cmm_object_put(const struct lu_context *ctxt, + struct cmm_object *o) +{ + lu_object_put(ctxt, &o->cmo_obj.mo_lu); +} + static int cmm_creat_remote_obj(const struct lu_context *ctx, struct cmm_device *cmm, struct lu_fid *fid, struct md_attr *ma) @@ -143,7 +149,8 @@ static int cmm_creat_remote_obj(const struct lu_context *ctx, spec, ma); OBD_FREE_PTR(spec); - RETURN(0); + cmm_object_put(ctx, obj); + RETURN(rc); } static int cmm_create_slave_objects(const struct lu_context *ctx, @@ -157,16 +164,17 @@ static int cmm_create_slave_objects(const struct lu_context *ctx, lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1); + /* This lmv will be free after finish splitting. */ OBD_ALLOC(lmv, lmv_size); if (!lmv) RETURN(-ENOMEM); - lmv->mea_master = -1; + lmv->mea_master = -1; lmv->mea_magic = MEA_MAGIC_ALL_CHARS; lmv->mea_count = cmm->cmm_tgt_count + 1; lmv->mea_ids[0] = *lf; - + rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count); if (rc) GOTO(cleanup, rc); @@ -177,33 +185,60 @@ static int cmm_create_slave_objects(const struct lu_context *ctx, GOTO(cleanup, rc); } - rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size, + rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size, MDS_LMV_MD_NAME, 0); + + ma->ma_lmv_size = lmv_size; + ma->ma_lmv = lmv; cleanup: - OBD_FREE(lmv, lmv_size); RETURN(rc); } static int cmm_send_split_pages(const struct lu_context *ctx, - struct md_object *mo, struct lu_rdpg *rdpg) + struct md_object *mo, struct lu_rdpg *rdpg, + struct lu_fid *fid) { - RETURN(0); + struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); + struct cmm_object *obj; + int rc = 0, i; + ENTRY; + + obj = cmm_object_find(ctx, cmm, fid); + if (IS_ERR(obj)) + RETURN(PTR_ERR(obj)); + + for (i = 0; i < rdpg->rp_npages; i++) { + rc = mo_sendpage(ctx, md_object_next(&obj->cmo_obj), + rdpg->rp_pages[i]); + if (rc) + GOTO(cleanup, rc); + } +cleanup: + cmm_object_put(ctx, obj); + RETURN(rc); } static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo, - struct lu_rdpg *rdpg) + struct lu_rdpg *rdpg, struct lu_fid *lf) { struct lu_dirpage *dp; __u32 hash_end; - int rc; + int rc, i; ENTRY; + /* init page with '0' */ + for (i = 0; i < rdpg->rp_npages; i++) { + memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE); + kunmap(rdpg->rp_pages[i]); + } + + /* Read splitted page and send them to the slave master */ do { rc = mo_readpage(ctx, md_object_next(mo), rdpg); if (rc) RETURN(rc); - rc = cmm_send_split_pages(ctx, mo, rdpg); + rc = cmm_send_split_pages(ctx, mo, rdpg, lf); if (rc) RETURN(rc); @@ -218,13 +253,32 @@ static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo, } static int cmm_remove_entries(const struct lu_context *ctx, - struct md_object *mo, __u32 start, __u32 end) + struct md_object *mo, struct lu_rdpg *rdpg) { - RETURN(0); + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc = 0, i; + ENTRY; + + for (i = 0; i < rdpg->rp_npages; i++) { + kmap(rdpg->rp_pages[i]); + dp = page_address(rdpg->rp_pages[i]); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + rc = mdo_name_remove(ctx, md_object_next(mo), + ent->lde_name); + if (rc) { + kunmap(rdpg->rp_pages[i]); + RETURN(rc); + } + } + kunmap(rdpg->rp_pages[i]); + } + RETURN(rc); } + #define MAX_HASH_SIZE 0x3fffffff #define SPLIT_PAGE_COUNT 1 - static int cmm_scan_and_split(const struct lu_context *ctx, struct md_object *mo, struct md_attr *ma) { @@ -249,16 +303,17 @@ static int cmm_scan_and_split(const struct lu_context *ctx, if (rdpg->rp_pages[i] == NULL) GOTO(cleanup, rc = -ENOMEM); } - + hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count; for (i = 1; i < cmm->cmm_tgt_count; i++) { + struct lu_fid *lf = &ma->ma_lmv->mea_ids[i]; + rdpg->rp_hash = i * hash_segement; rdpg->rp_hash_end = rdpg->rp_hash + hash_segement; - rc = cmm_split_entries(ctx, mo, rdpg); + rc = cmm_split_entries(ctx, mo, rdpg, lf); if (rc) GOTO(cleanup, rc); - rc = cmm_remove_entries(ctx, mo, rdpg->rp_hash, - rdpg->rp_hash_end); + rc = cmm_remove_entries(ctx, mo, rdpg); if (rc) GOTO(cleanup, rc); } @@ -305,7 +360,11 @@ int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo) /* step3: scan and split the object */ rc = cmm_scan_and_split(ctx, mo, ma); + cleanup: + if (ma->ma_lmv_size && ma->ma_lmv) + OBD_FREE(ma->ma_lmv, ma->ma_lmv_size); + OBD_FREE_PTR(ma); RETURN(rc); diff --git a/lustre/cmm/mdc_object.c b/lustre/cmm/mdc_object.c index 316f6f5..491b31e 100644 --- a/lustre/cmm/mdc_object.c +++ b/lustre/cmm/mdc_object.c @@ -236,10 +236,28 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo, RETURN(rc); } +#ifdef HAVE_SPLIT_SUPPORT +static int mdc_sendpage(const struct lu_context *ctx, struct md_object *mo, + const struct page *page) +{ + struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); + int rc; + ENTRY; + + rc = md_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), + page); + + RETURN(rc); +} +#endif + static struct md_object_operations mdc_mo_ops = { .moo_object_create = mdc_object_create, .moo_ref_add = mdc_ref_add, .moo_ref_del = mdc_ref_del, +#ifdef HAVE_SPLIT_SUPPORT + .moo_sendpage = mdc_sendpage, +#endif }; /* md_dir_operations */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 325811b..b6d1ddf 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -294,6 +294,22 @@ struct lu_dirpage { struct lu_dirent ldp_entries[0]; }; +static inline struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp) +{ + return dp->ldp_entries; +} + +static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent) +{ + struct lu_dirent *next; + + if (ent->lde_reclen != 0) + next = ((void *)ent) + le16_to_cpu(ent->lde_reclen); + else + next = NULL; + return next; +} + #define MEA_MAGIC_LAST_CHAR 0xb2221ca1 #define MEA_MAGIC_ALL_CHARS 0xb222a11c @@ -813,6 +829,7 @@ typedef enum { MDS_QUOTACTL = 48, MDS_GETXATTR = 49, MDS_SETXATTR = 50, + MDS_WRITEPAGE = 51, MDS_LAST_OPC } mds_cmd_t; diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 44b876f..e9c7fdd 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -103,6 +103,9 @@ extern const struct req_format RQF_MDS_CONNECT; extern const struct req_format RQF_MDS_DISCONNECT; extern const struct req_format RQF_MDS_READPAGE; extern const struct req_format RQF_MDS_DONE_WRITING; +#ifdef HAVE_SPLIT_SUPPORT +extern const struct req_format RQF_MDS_WRITEPAGE; +#endif /* * This is format of direct (non-intent) MDS_GETATTR_NAME request. diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 3cd8723e..048919c 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -62,7 +62,7 @@ struct md_attr { struct lu_attr ma_attr; struct lov_mds_md *ma_lmm; int ma_lmm_size; - struct lmv_mds_md *ma_lmv; + struct lmv_stripe_md *ma_lmv; int ma_lmv_size; struct llog_cookie *ma_cookie; int ma_cookie_size; @@ -111,6 +111,12 @@ struct md_object_operations { int (*moo_readpage)(const struct lu_context *, struct md_object *, const struct lu_rdpg *); +#ifdef HAVE_SPLIT_SUPPORT + int (*moo_writepage)(const struct lu_context *, struct md_object *, + const struct page *page); + int (*moo_sendpage)(const struct lu_context *, struct md_object *, + const struct page*); +#endif int (*moo_readlink)(const struct lu_context *ctxt, struct md_object *obj, void *buf, int buf_len); @@ -318,6 +324,22 @@ static inline int mo_readpage(const struct lu_context *cx, struct md_object *m, return m->mo_ops->moo_readpage(cx, m, rdpg); } +#ifdef HAVE_SPLIT_SUPPORT +static inline int mo_writepage(const struct lu_context *cx, struct md_object *m, + const struct page *pg) +{ + LASSERT(m->mo_ops->moo_writepage); + return m->mo_ops->moo_writepage(cx, m, pg); +} + +static inline int mo_sendpage(const struct lu_context *cx, struct md_object *m, + const struct page *pg) +{ + LASSERT(m->mo_ops->moo_sendpage); + return m->mo_ops->moo_sendpage(cx, m, pg); +} +#endif + static inline int mo_object_create(const struct lu_context *cx, struct md_object *m, const struct md_create_spec *spc, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 6b029ed..5abc211 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1126,6 +1126,10 @@ struct md_ops { struct ptlrpc_request **); int (*m_readpage)(struct obd_export *, const struct lu_fid *, __u64, struct page *, struct ptlrpc_request **); +#ifdef HAVE_SPLIT_SUPPORT + int (*m_sendpage)(struct obd_export *, const struct lu_fid *, + const struct page *); +#endif int (*m_unlink)(struct obd_export *, struct md_op_data *, struct ptlrpc_request **); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 7e36cbb..216a927 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1751,6 +1751,20 @@ static inline int md_readpage(struct obd_export *exp, RETURN(rc); } +#ifdef HAVE_SPLIT_SUPPORT +static inline int md_sendpage(struct obd_export *exp, + const struct lu_fid *fid, + const struct page *page) +{ + int rc; + ENTRY; + EXP_CHECK_MD_OP(exp, sendpage); + MD_COUNTER_INCREMENT(exp->exp_obd, sendpage); + rc = MDP(exp->exp_obd, sendpage)(exp, fid, page); + RETURN(rc); +} +#endif + static inline int md_unlink(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 40e57cd..e38e2b1 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -92,6 +92,8 @@ extern int obd_race_state; #define OBD_FAIL_MDS_SETXATTR_NET 0x132 #define OBD_FAIL_MDS_SETXATTR 0x133 #define OBD_FAIL_MDS_SETXATTR_WRITE 0x134 +#define OBD_FAIL_MDS_WRITEPAGE_NET 0x135 +#define OBD_FAIL_MDS_WRITEPAGE_PACK 0x136 #define OBD_FAIL_OST 0x200 #define OBD_FAIL_OST_CONNECT_NET 0x201 diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 983db99..9b361d2 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -393,22 +393,6 @@ static loff_t ll_llseek(struct file *filp, loff_t off, int whence) return default_llseek(filp, off, whence); } -static struct lu_dirent *lu_dirent_start(struct lu_dirpage *dp) -{ - return dp->ldp_entries; -} - -static struct lu_dirent *lu_dirent_next(struct lu_dirent *ent) -{ - struct lu_dirent *next; - - if (ent->lde_reclen != 0) - next = ((void *)ent) + le16_to_cpu(ent->lde_reclen); - else - next = NULL; - return next; -} - int ll_readdir(struct file *filp, void *cookie, filldir_t filldir) { struct inode *inode = filp->f_dentry->d_inode; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 7eeb499..d754d82 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -728,6 +728,42 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data) RETURN(rc); } +#ifdef HAVE_SPLIT_SUPPORT +int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, + const struct page *page) +{ + struct obd_import *imp = class_exp2cliimp(exp); + struct ptlrpc_request *req = NULL; + struct ptlrpc_bulk_desc *desc = NULL; + struct mdt_body *body; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + ENTRY; + + CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid)); + + req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_WRITEPAGE, 2, size, + NULL); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req->rq_request_portal = MDS_READPAGE_PORTAL; + + desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + /* NB req now owns desc and will free it when it gets freed */ + ptlrpc_prep_bulk_page(desc, (struct page*)page, 0, PAGE_CACHE_SIZE); + + mdc_readdir_pack(req, REQ_REC_OFF, 0, PAGE_CACHE_SIZE, fid); + + ptlrpc_req_set_repsize(req, 2, size); + rc = ptlrpc_queue_wait(req); +out: + ptlrpc_req_finished(req); + RETURN(rc); +} +#endif + int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, __u64 offset, struct page *page, struct ptlrpc_request **request) @@ -1374,6 +1410,9 @@ struct md_ops mdc_md_ops = { .m_getxattr = mdc_getxattr, .m_sync = mdc_sync, .m_readpage = mdc_readpage, +#ifdef HAVE_SPLIT_SUPPORT + .m_sendpage = mdc_sendpage, +#endif .m_unlink = mdc_unlink, .m_cancel_unused = mdc_cancel_unused, .m_init_ea_size = mdc_init_ea_size, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c4069a3..4be567e 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -561,6 +561,122 @@ out: return rc; } +#ifdef HAVE_SPLIT_SUPPORT +/* + * Retrieve dir entry from the page and insert it to the + * slave object, actually, this should be in osd layer, + * but since it will not in the final product, so just do + * it here and do not define more moo api anymore for + * this. + */ +static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page) +{ + struct mdt_object *object = info->mti_object; + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc = 0; + + kmap(page); + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + struct lu_fid *lf = &ent->lde_fid; + + /* FIXME: check isdir */ + rc = mdo_name_insert(info->mti_ctxt, + md_object_next(&object->mot_obj), + ent->lde_name, lf, 0); + /*FIXME: add cross_flags*/ + if (rc) { + kunmap(page); + RETURN(rc); + } + } + kunmap(page); + + RETURN(rc); +} + +static int mdt_bulk_timeout(void *data) +{ + ENTRY; + /* We don't fail the connection here, because having the export + * killed makes the (vital) call to commitrw very sad. + */ + RETURN(1); +} + +static int mdt_writepage(struct mdt_thread_info *info) +{ + struct ptlrpc_request *req = mdt_info_req(info); + struct l_wait_info *lwi; + struct ptlrpc_bulk_desc *desc; + struct page *page; + int rc; + ENTRY; + + desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL); + if (desc) + RETURN(-ENOMEM); + + /* allocate the page for the desc */ + page = alloc_pages(GFP_KERNEL, 0); + if (!page) + GOTO(desc_cleanup, rc = -ENOMEM); + + ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE); + + /* FIXME: following parts are copied from ost_brw_write */ + + /* Check if client was evicted while we were doing i/o before touching + network */ + OBD_ALLOC_PTR(lwi); + if (!lwi) + GOTO(cleanup_page, rc = -ENOMEM); + + if (desc->bd_export->exp_failed) + rc = -ENOTCONN; + else + rc = ptlrpc_start_bulk_transfer (desc); + if (rc == 0) { + *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ, + mdt_bulk_timeout, desc); + rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) || + desc->bd_export->exp_failed, lwi); + LASSERT(rc == 0 || rc == -ETIMEDOUT); + if (rc == -ETIMEDOUT) { + DEBUG_REQ(D_ERROR, req, "timeout on bulk GET"); + ptlrpc_abort_bulk(desc); + } else if (desc->bd_export->exp_failed) { + DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET"); + rc = -ENOTCONN; + ptlrpc_abort_bulk(desc); + } else if (!desc->bd_success || + desc->bd_nob_transferred != desc->bd_nob) { + DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)", + desc->bd_success ? + "truncated" : "network error on", + desc->bd_nob_transferred, desc->bd_nob); + /* XXX should this be a different errno? */ + rc = -ETIMEDOUT; + } + } else { + DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc); + } + if (rc) + GOTO(cleanup_lwi, rc); + rc = mdt_write_dir_page(info, page); + +cleanup_lwi: + OBD_FREE_PTR(lwi); +cleanup_page: + __free_pages(page, 0); +desc_cleanup: + ptlrpc_free_bulk(desc); + RETURN(rc); +} +#endif + static int mdt_readpage(struct mdt_thread_info *info) { struct mdt_object *object = info->mti_object; @@ -3015,6 +3131,9 @@ static struct mdt_opc_slice mdt_regular_handlers[] = { static struct mdt_handler mdt_readpage_ops[] = { DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage), +#ifdef HAVE_SPLIT_SUPPORT + DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage), +#endif /* * XXX: this is ugly and should be fixed one day, see mdc_close() for diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 6c872ed..5226cba 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -611,6 +611,13 @@ const struct req_format RQF_MDS_READPAGE = mdt_body_only, mdt_body_only); EXPORT_SYMBOL(RQF_MDS_READPAGE); +#ifdef HAVE_SPLIT_SUPPORT +const struct req_format RQF_MDS_WRITEPAGE = + DEFINE_REQ_FMT0("MDS_WRITEPAGE", + mdt_body_only, mdt_body_only); +EXPORT_SYMBOL(RQF_MDS_WRITEPAGE); +#endif + #if !defined(__REQ_LAYOUT_USER__) int req_layout_init(void)