From 6f10e43848928a3ff4c0a1591542afc139163284 Mon Sep 17 00:00:00 2001 From: yury Date: Sun, 23 Jul 2006 10:02:00 +0000 Subject: [PATCH] - added basic readdir stuff; - fixed bug in mdt_handle_common() --- lustre/include/dt_object.h | 3 ++ lustre/include/lu_object.h | 11 ++++ lustre/include/md_object.h | 10 ++++ lustre/llite/dir.c | 1 + lustre/mdd/mdd_handler.c | 15 +++++- lustre/mdt/mdt_handler.c | 132 +++++++++++++++++++++++++++++++++++++++++++-- lustre/osd/osd_handler.c | 35 +++++++++++- 7 files changed, 202 insertions(+), 5 deletions(-) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index db80aea..98d3589 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -212,6 +212,9 @@ struct dt_object_operations { */ void (*do_ref_del)(const struct lu_context *ctxt, struct dt_object *dt, struct thandle *th); + + int (*do_readpage)(const struct lu_context *ctxt, + struct dt_object *dt, struct lu_rdpg *rdpg); }; /* diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 5779c8e..f6c0e2d 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -771,6 +771,17 @@ static inline int lu_object_assert_not_exists(const struct lu_context *ctx, return lu_object_exists(ctx, o) <= 0; } +struct lu_rdpg { + /* input params, should be filled out by mdt */ + loff_t rp_offset; + int rp_count; + int rp_npages; + struct page **rp_pages; + + /* output params, filled by osd */ + __u64 rp_size; +}; + /* * lu_context. Execution context for lu_object methods. Currently associated * with thread. diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 1bbfc87..4a1eacc 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -83,6 +83,9 @@ struct md_object_operations { struct md_attr *); int (*moo_open)(const struct lu_context *, struct md_object *); int (*moo_close)(const struct lu_context *, struct md_object *); + + int (*moo_readpage)(const struct lu_context *, struct md_object *, + struct lu_rdpg *); }; /* @@ -247,6 +250,13 @@ static inline int mo_close(const struct lu_context *cx, struct md_object *m) return m->mo_ops->moo_close(cx, m); } +static inline int mo_readpage(const struct lu_context *cx, struct md_object *m, + struct lu_rdpg *rdpg) +{ + LASSERT(m->mo_ops->moo_readpage); + return m->mo_ops->moo_readpage(cx, m, rdpg); +} + static inline int mo_object_create(const struct lu_context *cx, struct md_object *m, struct md_attr *at) { diff --git a/lustre/llite/dir.c b/lustre/llite/dir.c index 1da3a5d..7e38503 100644 --- a/lustre/llite/dir.c +++ b/lustre/llite/dir.c @@ -75,6 +75,7 @@ static int ll_dir_readpage(struct file *file, struct page *page) LASSERT (body != NULL); /* checked by md_readpage() */ LASSERT_REPSWABBED (request, 0); /* swabbed by md_readpage() */ + LASSERT((body->valid & OBD_MD_FLSIZE) != 0); inode->i_size = body->size; SetPageUptodate(page); } diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index d588915..8c807f5 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -1159,6 +1159,18 @@ static int mdd_close(const struct lu_context *ctxt, struct md_object *obj) return 0; } +static int mdd_readpage(const struct lu_context *ctxt, struct md_object *obj, + struct md_rdpg *rdpg) +{ + struct dt_object *next; + int rc; + + LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj))); + next = mdd_object_child(obj); + rc = next->do_ops->do_readpage(ctxt, next, rdpg); + return rc; +} + struct md_device_operations mdd_ops = { .mdo_root_get = mdd_root_get, .mdo_statfs = mdd_statfs, @@ -1186,7 +1198,8 @@ static struct md_object_operations mdd_obj_ops = { .moo_ref_add = mdd_ref_add, .moo_ref_del = mdd_ref_del, .moo_open = mdd_open, - .moo_close = mdd_close + .moo_close = mdd_close, + .moo_readpage = mdd_readpage }; static struct obd_ops mdd_obd_device_ops = { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 46208ef..5871426 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -517,9 +517,133 @@ static int mdt_disconnect(struct mdt_thread_info *info) return target_handle_disconnect(mdt_info_req(info)); } +static int mdt_sendpage(struct mdt_thread_info *info, + struct mdt_object *object, + struct lu_rdpg *rdpg) +{ + int rc = 0, npages, i, tmpcount, tmpsize = 0; + struct ptlrpc_bulk_desc *desc; + struct ptlrpc_request *req; + struct l_wait_info lwi; + ENTRY; + + LASSERT((rdpg->rp_offset & (PAGE_SIZE - 1)) == 0); + + req = info->mti_pill.rc_req; + npages = (rdpg->rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; + desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, + MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(free_desc, rc = -ENOMEM); + + for (i = 0, tmpcount = rdpg->rp_count; i < npages; i++, tmpcount -= tmpsize) { + tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; + ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize); + } + + LASSERT(desc->bd_nob == rdpg->rp_count); + rc = ptlrpc_start_bulk_transfer(desc); + if (rc) + GOTO(free_desc, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { + CERROR("obd_fail_loc=%x, fail operation rc=%d\n", + OBD_FAIL_MDS_SENDPAGE, rc); + GOTO(abort_bulk, rc); + } + + lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL); + rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); + LASSERT (rc == 0 || rc == -ETIMEDOUT); + + if (rc == 0) { + if (desc->bd_success && + desc->bd_nob_transferred == rdpg->rp_count) + GOTO(free_desc, rc); + + rc = -ETIMEDOUT; /* XXX should this be a different errno? */ + } + + DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", + (rc == -ETIMEDOUT) ? "timeout" : "network error", + desc->bd_nob_transferred, rdpg->rp_count, + req->rq_export->exp_client_uuid.uuid, + req->rq_export->exp_connection->c_remote_uuid.uuid); + + class_fail_export(req->rq_export); + + EXIT; +abort_bulk: + ptlrpc_abort_bulk(desc); +free_desc: + ptlrpc_free_bulk(desc); + return rc; +} + static int mdt_readpage(struct mdt_thread_info *info) { - return -EOPNOTSUPP; + struct mdt_body *reqbody, *repbody; + int rc, i, tmpcount, tmpsize = 0; + struct lu_rdpg rdpg = { 0 }; + struct mdt_object *child; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) + RETURN(-ENOMEM); + + req_capsule_set(&info->mti_pill, &RQF_MDS_READPAGE); + rc = req_capsule_pack(&info->mti_pill); + if (rc) + RETURN(rc); + + reqbody = req_capsule_client_get(&info->mti_pill, + &RMF_MDT_BODY); + if (reqbody == NULL) + RETURN(-EFAULT); + + child = mdt_object_find(info->mti_ctxt, + info->mti_mdt, &reqbody->fid1); + if (IS_ERR(child)) + RETURN(PTR_ERR(child)); + + repbody = req_capsule_server_get(&info->mti_pill, + &RMF_MDT_BODY); + + if (repbody == NULL) + GOTO(out_child, -EFAULT); + + /* prepare @rdpg before calling lower layers and transfer itself. */ + rdpg.rp_offset = reqbody->size; + rdpg.rp_count = reqbody->nlink; + rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >> PAGE_SHIFT; + + for (i = 0, tmpcount = rdpg.rp_count; + i < rdpg.rp_npages; i++, tmpcount -= tmpsize) { + rdpg.rp_pages[i] = alloc_pages(GFP_KERNEL, 0); + if (rdpg.rp_pages[i] == NULL) + GOTO(free_rdpg, rc = -ENOMEM); + } + + /* call lower layers to fill allocated pages with directory data */ + rc = mo_readpage(info->mti_ctxt, + mdt_object_child(child), &rdpg); + if (rc) + GOTO(free_rdpg, rc); + + repbody->size = rdpg.rp_size; + repbody->valid = OBD_MD_FLSIZE; + + /* send pages to client */ + rc = mdt_sendpage(info, child, &rdpg); + + EXIT; +free_rdpg: + for (i = 0; i < rdpg.rp_npages; i++) + if (rdpg.rp_pages[i]) + __free_pages(rdpg.rp_pages[i], 0); +out_child: + mdt_object_put(info->mti_ctxt, child); + return rc; } static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op) @@ -1059,7 +1183,6 @@ static int mdt_req_handle(struct mdt_thread_info *info, result = mdt_unpack_req_pack_rep(info, flags); } - if (result == 0 && flags & MUTABOR && req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY) result = -EROFS; @@ -1309,7 +1432,7 @@ static int mdt_handle_common(struct ptlrpc_request *req, mdt_thread_info_init(req, info); - result = mdt_handle0(req, info, mdt_handlers); + result = mdt_handle0(req, info, supported); mdt_thread_info_fini(info); RETURN(result); @@ -2795,6 +2918,9 @@ static struct mdt_opc_slice mdt_readpage_handlers[] = { .mos_opc_start = MDS_READPAGE, .mos_opc_end = MDS_READPAGE + 1, .mos_hs = mdt_mds_readpage_ops + }, + { + .mos_hs = NULL } }; diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index 3bec91d..ae5404b 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ -820,6 +820,38 @@ int osd_xattr_set(const struct lu_context *ctxt, struct dt_object *dt, return 0; } +int osd_readpage(const struct lu_context *ctxt, + struct dt_object *dt, struct lu_rdpg *rdpg) +{ + struct osd_object *obj = osd_dt_obj(dt); + LASSERT(lu_object_exists(ctxt, &dt->do_lu)); + LASSERT(osd_invariant(obj)); + + LASSERT(rdpg->rp_pages != NULL); + + /* check input params */ + if ((rdpg->rp_offset & (obj->oo_inode->i_blksize - 1)) != 0) { + CERROR("offset "LPU64" not on a block boundary of %lu\n", + rdpg->rp_offset, obj->oo_inode->i_blksize); + return -EFAULT; + } + + if (rdpg->rp_count & (obj->oo_inode->i_blksize - 1)) { + CERROR("size %u is not multiple of blocksize %lu\n", + rdpg->rp_count, obj->oo_inode->i_blksize); + return -EFAULT; + } + + /* prepare output */ + rdpg->rp_size = obj->oo_inode->i_size; + + /* + * XXX: here should be filling pages by dir data in ext3 format and + * ->rp_pages already should be allocated. + */ + return 0; +} + static struct dt_object_operations osd_obj_ops = { .do_lock = osd_object_lock, .do_unlock = osd_object_unlock, @@ -830,7 +862,8 @@ static struct dt_object_operations osd_obj_ops = { .do_ref_add = osd_object_ref_add, .do_ref_del = osd_object_ref_del, .do_xattr_get = osd_xattr_get, - .do_xattr_set = osd_xattr_set + .do_xattr_set = osd_xattr_set, + .do_readpage = osd_readpage }; static struct dt_body_operations osd_body_ops = { -- 1.8.3.1