From b339c5a672ed7d20747bb3653ee72865835b4d08 Mon Sep 17 00:00:00 2001 From: pschwan Date: Fri, 17 May 2002 16:18:11 +0000 Subject: [PATCH] * Split struct niobuf into niobuf_local and niobuf_remote - niobuf_remote is offset, length, xid, and flags - niobuf_local is all of the above, plus an address and sometimes a page - The former is sent across the network, the latter used internally * Small ldlm fixes brought over from the (now-defunct) ldlm_testing branch - SMP deadlock fix - comment fix * Bulk descriptor refactoring - You create a bulk descriptor and then n bulk pages that get hooked in - Pages are sent all at once, optional callback per page - Another optional callback when the final ack has been received, although Eric tells me that elan doesn't guarantee packet ordering, so this needs to be revisited * A few key bugfixes in the MDC/MDS/OSC/OST bulk code; these probably bit us if we sent it a signal during bulk processing * A few LOV pieces (mostly in genops.c) - A temporary gen_multi_setup/cleanup to get the LOV rolling; it won't remain in this form I've tested these fixes, but not exhaustively. 
--- lustre/include/linux/lustre_idl.h | 40 +++--- lustre/include/linux/lustre_net.h | 32 +++-- lustre/include/linux/obd.h | 12 +- lustre/include/linux/obd_class.h | 12 +- lustre/include/linux/obd_ext2.h | 2 +- lustre/include/linux/obd_ost.h | 6 +- lustre/ldlm/ldlm_lock.c | 3 +- lustre/ldlm/ldlm_request.c | 1 - lustre/lib/obd_pack.c | 12 +- lustre/llite/file.c | 13 -- lustre/llite/namei.c | 2 +- lustre/mdc/mdc_reint.c | 7 +- lustre/mdc/mdc_request.c | 45 +++---- lustre/mds/handler.c | 56 +++----- lustre/obdclass/genops.c | 132 ++++++++---------- lustre/obdecho/echo.c | 28 ++-- lustre/obdfilter/filter.c | 277 ++++++++++++++++---------------------- lustre/osc/osc_request.c | 183 ++++++++++++------------- lustre/ost/ost_handler.c | 164 ++++++++++++---------- lustre/ptlrpc/client.c | 37 +++++ lustre/ptlrpc/events.c | 25 ++-- lustre/ptlrpc/niobuf.c | 162 ++++++++++++---------- lustre/tests/llmount.sh | 4 +- 23 files changed, 624 insertions(+), 631 deletions(-) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 3e6478a..a6783e9 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -23,16 +23,16 @@ #ifndef __LUSTRE_IDL_H__ #define __LUSTRE_IDL_H__ -#ifdef __KERNEL__ -#include -#include -#include +#ifdef __KERNEL__ +# include +# include +# include #else -#define __KERNEL__ -#include -#undef __KERNEL__ -#include +# define __KERNEL__ +# include +# undef __KERNEL__ +# include #endif /* * this file contains all data structures used in Lustre interfaces: @@ -61,6 +61,21 @@ struct lustre_msg { __u32 buflens[0]; }; +struct niobuf_remote { + __u64 offset; + __u32 len; + __u32 xid; + __u32 flags; +}; + +struct niobuf_local { + __u64 addr; + __u64 offset; + __u32 len; + __u32 xid; + void *page; +}; + /* * OST requests: OBDO & OBD request records */ @@ -186,15 +201,6 @@ struct ll_fid { __u32 f_type; }; -struct niobuf { - __u64 addr; - __u64 offset; - __u32 len; - __u32 flags; - __u32 xid; - void *page; -}; - 
struct mds_body { struct ll_fid fid1; struct ll_fid fid2; diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index bd5f574..cdd4683 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -176,24 +176,33 @@ struct ptlrpc_request { struct ptlrpc_client *rq_client; }; -struct ptlrpc_bulk_desc { - int b_flags; - struct ptlrpc_connection *b_connection; - __u32 b_portal; +struct ptlrpc_bulk_page { + struct ptlrpc_bulk_desc *b_desc; + struct list_head b_link; char *b_buf; int b_buflen; - int (*b_cb)(struct ptlrpc_bulk_desc *, void *); struct page *b_page; - struct obd_conn b_conn; __u32 b_xid; - - wait_queue_head_t b_waitq; + int (*b_cb)(struct ptlrpc_bulk_page *); ptl_md_t b_md; ptl_handle_md_t b_md_h; ptl_handle_me_t b_me_h; }; +struct ptlrpc_bulk_desc { + int b_flags; + struct ptlrpc_connection *b_connection; + __u32 b_portal; + int (*b_cb)(struct ptlrpc_bulk_desc *); + struct obd_conn b_conn; + + wait_queue_head_t b_waitq; + struct list_head b_page_list; + __u32 b_page_count; + __u32 b_finished_count; +}; + struct ptlrpc_thread { struct list_head t_link; @@ -245,8 +254,7 @@ void ptlrpc_init_connection(void); void ptlrpc_cleanup_connection(void); /* rpc/niobuf.c */ -int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *); -int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *, int portal); +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *); int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk); int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req); @@ -272,10 +280,12 @@ void ptlrpc_restart_req(struct ptlrpc_request *req); struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, struct ptlrpc_connection *u, int opcode, int count, int *lengths, char **bufs); -void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk); void ptlrpc_free_req(struct ptlrpc_request *request); void ptlrpc_req_finished(struct ptlrpc_request *request); struct 
ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *); +void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk); +struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc); +void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page); int ptlrpc_check_status(struct ptlrpc_request *req, int err); /* rpc/service.c */ diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index c98e785..8640d95 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -156,6 +156,11 @@ struct osc_obd { struct ptlrpc_connection *osc_conn; }; +struct lov_obd { + int lov_count; + struct obd_conn *lov_targets; +}; + /* corresponds to one of the obd's */ #define MAX_MULTI 16 struct obd_device { @@ -186,6 +191,7 @@ struct obd_device { struct echo_obd echo; struct recovd_obd recovd; struct trace_obd trace; + struct lov_obd lov; #if 0 struct raid1_obd raid1; struct snap_obd snap; @@ -236,11 +242,11 @@ struct obd_ops { obd_id *startid, obd_gr group, void *data); int (*o_preprw)(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *nb, - struct niobuf *res); + int niocount, struct niobuf_remote *remote, + struct niobuf_local *local); int (*o_commitrw)(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *res); + int niocount, struct niobuf_local *local); int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns, struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type, struct ldlm_extent *, __u32 mode, diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index a428858..8c6b65d 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -308,28 +308,28 @@ static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa, static inline int obd_preprw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *nb, - struct niobuf *res) + 
int niocount, struct niobuf_remote *remote, + struct niobuf_local *local) { int rc; OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn, preprw); - rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, nb, - res); + rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, + remote, local); RETURN(rc); } static inline int obd_commitrw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *res) + int niocount, struct niobuf_local *local) { int rc; OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn, commitrw); rc = OBP(conn->oc_dev, commitrw)(cmd, conn, objcount, obj, niocount, - res); + local); RETURN(rc); } diff --git a/lustre/include/linux/obd_ext2.h b/lustre/include/linux/obd_ext2.h index 84e015e..73b4b0b 100644 --- a/lustre/include/linux/obd_ext2.h +++ b/lustre/include/linux/obd_ext2.h @@ -20,7 +20,7 @@ extern struct file_operations *obd_fso; /* ext2_obd.c */ extern struct obd_ops ext2_obd_ops; - +#include #include /* super.c */ diff --git a/lustre/include/linux/obd_ost.h b/lustre/include/linux/obd_ost.h index e1c8f92..8ad6e6a 100644 --- a/lustre/include/linux/obd_ost.h +++ b/lustre/include/linux/obd_ost.h @@ -33,9 +33,9 @@ #define LUSTRE_OSC_NAME "osc" /* ost/ost_pack.c */ -void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, - __u32 flags, __u32 xid); -void ost_unpack_niobuf(void **tmp, struct niobuf **nbp); +void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags, + __u32 xid); +void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp); void ost_pack_ioo(void **tmp, struct obdo *oa, int bufcnt); void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 05bd152..512836d 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -276,7 +276,8 @@ int ldlm_local_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, return rc; } -/* Must be called without the resource lock held. 
*/ +/* Must be called without the resource lock held. Returns a referenced, + * unlocked ldlm_lock. */ ldlm_error_t ldlm_local_lock_create(struct ldlm_namespace *ns, struct ldlm_handle *parent_lock_handle, __u64 *res_id, __u32 type, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index eaee614..ec32a68 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -42,7 +42,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, lock = ldlm_handle2object(lockh); - spin_unlock(&lock->l_lock); req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 2, size, bufs); if (!req) GOTO(out, rc = -ENOMEM); diff --git a/lustre/lib/obd_pack.c b/lustre/lib/obd_pack.c index 5b8ad4e..b4f0254 100644 --- a/lustre/lib/obd_pack.c +++ b/lustre/lib/obd_pack.c @@ -52,13 +52,12 @@ void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop) *tmp = c + sizeof(*ioo); } -void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, - __u32 flags, __u32 xid) +void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags, + __u32 xid) { - struct niobuf *nb = *tmp; + struct niobuf_remote *nb = *tmp; char *c = *tmp; - nb->addr = HTON__u64((__u64)(unsigned long)addr); nb->offset = HTON__u64(offset); nb->len = HTON__u32(len); nb->flags = HTON__u32(flags); @@ -66,14 +65,13 @@ void ost_pack_niobuf(void **tmp, void *addr, __u64 offset, __u32 len, *tmp = c + sizeof(*nb); } -void ost_unpack_niobuf(void **tmp, struct niobuf **nbp) +void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp) { char *c = *tmp; - struct niobuf *nb = *tmp; + struct niobuf_remote *nb = *tmp; *nbp = *tmp; - nb->addr = NTOH__u64(nb->addr); nb->offset = NTOH__u64(nb->offset); nb->len = NTOH__u32(nb->len); nb->flags = NTOH__u32(nb->flags); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 8026454..16d29e3 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -166,18 +166,15 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t 
count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; -#if 0 struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; struct ldlm_handle lockh; __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; int flags = 0; ldlm_error_t err; -#endif ssize_t retval; ENTRY; -#if 0 extent.start = *ppos; extent.end = *ppos + count; CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", @@ -189,7 +186,6 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (err != ELDLM_OK) CERROR("lock enqueue: err: %d\n", err); ldlm_lock_dump((void *)(unsigned long)lockh.addr); -#endif CDEBUG(D_INFO, "Reading inode %ld, %d bytes, offset %Ld\n", inode->i_ino, count, *ppos); @@ -201,11 +197,9 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, ll_setattr(filp->f_dentry, &attr); } -#if 0 err = obd_cancel(&sbi->ll_conn, LCK_PR, &lockh); if (err != ELDLM_OK) CERROR("lock cancel: err: %d\n", err); -#endif RETURN(retval); } @@ -217,18 +211,15 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) { struct inode *inode = file->f_dentry->d_inode; -#if 0 struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; struct ldlm_handle lockh; __u64 res_id[RES_NAME_SIZE] = {inode->i_ino}; int flags = 0; ldlm_error_t err; -#endif ssize_t retval; ENTRY; -#if 0 extent.start = *ppos; extent.end = *ppos + count; CDEBUG(D_INFO, "Locking inode %ld, start %Lu end %Lu\n", @@ -240,19 +231,15 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) if (err != ELDLM_OK) CERROR("lock enqueue: err: %d\n", err); ldlm_lock_dump((void *)(unsigned long)lockh.addr); -#endif CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n", inode->i_ino, (long)count, *ppos); retval = generic_file_write(file, buf, count, ppos); - -#if 0 err = obd_cancel(&sbi->ll_conn, LCK_PW, &lockh); if (err != ELDLM_OK) CERROR("lock cancel: err: %d\n", err); -#endif RETURN(retval); } diff --git 
a/lustre/llite/namei.c b/lustre/llite/namei.c index 27650f8..e6e1307 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -180,7 +180,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, out: ptlrpc_free_req(request); return inode; -} /* ll_new_inode */ +} int ll_mdc_unlink(struct inode *dir, struct inode *child, const char *name, int len) diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index 842f54a..59b893a 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -104,10 +104,11 @@ int mdc_create(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, level = LUSTRE_CONN_FULL; resend: rc = mdc_reint(cl, req, level); - if (rc == -ERESTARTSYS ) { - struct mds_update_record_hdr *hdr = lustre_msg_buf(req->rq_reqmsg, 0); + if (rc == -ERESTARTSYS) { + struct mds_update_record_hdr *hdr = + lustre_msg_buf(req->rq_reqmsg, 0); level = LUSTRE_CONN_RECOVD; - CERROR("Lost reply: re-create rep.\n"); + CERROR("Lost reply: re-create rep.\n"); req->rq_flags = 0; hdr->ur_opcode = NTOH__u32(REINT_RECREATE); goto resend; diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 8c9eb8c..0e906a4 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -181,36 +181,32 @@ int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, struct ptlrpc_request **request) { struct ptlrpc_request *req = NULL; - struct ptlrpc_bulk_desc *bulk = NULL; - struct niobuf niobuf; + struct ptlrpc_bulk_desc *desc = NULL; + struct ptlrpc_bulk_page *bulk = NULL; struct mds_body *body; - int rc, size[2] = {sizeof(*body), sizeof(struct niobuf)}; - char *bufs[2] = {NULL, (char *)&niobuf}; - - niobuf.addr = (__u64) (long) addr; + int rc, size = sizeof(*body); + ENTRY; CDEBUG(D_INODE, "inode: %ld\n", (long)ino); - bulk = ptlrpc_prep_bulk(conn); - if (bulk == NULL) { - CERROR("%s: cannot init bulk desc\n", __FUNCTION__); - rc = -ENOMEM; - goto out; - } + desc = ptlrpc_prep_bulk(conn); + if (desc == NULL) + 
GOTO(out, rc = -ENOMEM); - req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 2, size, bufs); + req = ptlrpc_prep_req(cl, conn, MDS_READPAGE, 1, &size, NULL); if (!req) - GOTO(out, rc = -ENOMEM); + GOTO(out2, rc = -ENOMEM); + bulk = ptlrpc_prep_bulk_page(desc); bulk->b_buflen = PAGE_SIZE; - bulk->b_buf = (void *)(long)niobuf.addr; - bulk->b_portal = MDS_BULK_PORTAL; + bulk->b_buf = addr; bulk->b_xid = req->rq_reqmsg->xid; + desc->b_portal = MDS_BULK_PORTAL; - rc = ptlrpc_register_bulk(bulk); + rc = ptlrpc_register_bulk(desc); if (rc) { CERROR("couldn't setup bulk sink: error %d.\n", rc); - GOTO(out, rc); + GOTO(out2, rc); } body = lustre_msg_buf(req->rq_reqmsg, 0); @@ -218,21 +214,20 @@ int mdc_readpage(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, body->fid1.f_type = type; body->size = offset; - req->rq_replen = lustre_msg_size(1, size); + req->rq_replen = lustre_msg_size(1, &size); req->rq_level = LUSTRE_CONN_FULL; rc = ptlrpc_queue_wait(req); if (rc) { CERROR("error in handling %d\n", rc); - ptlrpc_abort_bulk(bulk); - GOTO(out, rc); - } + ptlrpc_abort_bulk(desc); + } else + mds_unpack_rep_body(req); - mds_unpack_rep_body(req); EXIT; - + out2: + ptlrpc_free_bulk(desc); out: *request = req; - ptlrpc_free_bulk(bulk); return rc; } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d4eef8a..e22bbee 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -22,53 +22,45 @@ #include #include -int mds_sendpage(struct ptlrpc_request *req, struct file *file, - __u64 offset, struct niobuf *dst) +int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) { int rc = 0; mm_segment_t oldfs = get_fs(); - struct ptlrpc_bulk_desc *bulk; + struct ptlrpc_bulk_desc *desc; + struct ptlrpc_bulk_page *bulk; char *buf; + ENTRY; - bulk = ptlrpc_prep_bulk(req->rq_connection); - if (bulk == NULL) { - rc = -ENOMEM; - GOTO(out, rc); - } + desc = ptlrpc_prep_bulk(req->rq_connection); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); - bulk->b_xid = 
req->rq_reqmsg->xid; + bulk = ptlrpc_prep_bulk_page(desc); + if (bulk == NULL) + GOTO(cleanup_bulk, rc = -ENOMEM); OBD_ALLOC(buf, PAGE_SIZE); - if (!buf) { - rc = -ENOMEM; - GOTO(cleanup_bulk, rc); - } + if (buf == NULL) + GOTO(cleanup_bulk, rc = -ENOMEM); set_fs(KERNEL_DS); rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE, (loff_t *)&offset); set_fs(oldfs); - if (rc != PAGE_SIZE) { - rc = -EIO; - GOTO(cleanup_buf, rc); - } + if (rc != PAGE_SIZE) + GOTO(cleanup_buf, rc = -EIO); + bulk->b_xid = req->rq_reqmsg->xid; bulk->b_buf = buf; bulk->b_buflen = PAGE_SIZE; + desc->b_portal = MDS_BULK_PORTAL; - rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL); + rc = ptlrpc_send_bulk(desc); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { CERROR("obd_fail_loc=%x, fail operation rc=%d\n", OBD_FAIL_MDS_SENDPAGE, rc); - PtlMDUnlink(bulk->b_md_h); - GOTO(cleanup_buf, rc); - } - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); - - if (bulk->b_flags & PTL_RPC_FL_INTR) { - rc = -EINTR; + ptlrpc_abort_bulk(desc); GOTO(cleanup_buf, rc); } @@ -76,7 +68,7 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, cleanup_buf: OBD_FREE(buf, PAGE_SIZE); cleanup_bulk: - ptlrpc_free_bulk(bulk); + ptlrpc_free_bulk(desc); out: return rc; } @@ -462,7 +454,6 @@ int mds_readpage(struct ptlrpc_request *req) struct vfsmount *mnt; struct dentry *de; struct file *file; - struct niobuf *niobuf; struct mds_body *body; int rc, size = sizeof(*body); ENTRY; @@ -490,17 +481,10 @@ int mds_readpage(struct ptlrpc_request *req) RETURN(0); } - niobuf = lustre_msg_buf(req->rq_reqmsg, 1); - if (!niobuf) { - req->rq_status = -EINVAL; - LBUG(); - RETURN(0); - } - /* to make this asynchronous make sure that the handling function doesn't send a reply when this function completes. 
Instead a callback function would send the reply */ - rc = mds_sendpage(req, file, body->size, niobuf); + rc = mds_sendpage(req, file, body->size); filp_close(file, 0); req->rq_status = rc; diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 51336ff..8ef7600 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -10,15 +10,8 @@ * */ -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_CLASS -#include #include extern struct obd_device obd_dev[MAX_OBD_DEVICES]; @@ -33,17 +26,14 @@ int obd_init_obdo_cache(void) sizeof(struct obdo), 0, SLAB_HWCACHE_ALIGN, NULL, NULL); - if (obdo_cachep == NULL) { - EXIT; - return -ENOMEM; - } else { + if (obdo_cachep == NULL) + RETURN(-ENOMEM); + else CDEBUG(D_CACHE, "allocated cache at %p\n", obdo_cachep); - } } else { CDEBUG(D_CACHE, "using existing cache at %p\n", obdo_cachep); } - EXIT; - return 0; + RETURN(0); } void obd_cleanup_obdo_cache(void) @@ -75,16 +65,16 @@ struct obd_client *gen_client(const struct obd_conn *conn) lh = next = &obddev->obd_gen_clients; while ((lh = lh->next) != &obddev->obd_gen_clients) { cli = list_entry(lh, struct obd_client, cli_chain); - + if (cli->cli_id == conn->oc_id) return cli; } return NULL; -} /* obd_client */ +} /* gen_client */ -/* a connection defines a context in which preallocation can be managed. */ +/* a connection defines a context in which preallocation can be managed. 
*/ int gen_connect (struct obd_conn *conn) { struct obd_client * cli; @@ -105,7 +95,7 @@ int gen_connect (struct obd_conn *conn) CDEBUG(D_INFO, "connect: new ID %u\n", cli->cli_id); conn->oc_id = cli->cli_id; return 0; -} /* gen_obd_connect */ +} /* gen_connect */ int gen_disconnect(struct obd_conn *conn) @@ -116,7 +106,7 @@ int gen_disconnect(struct obd_conn *conn) if (!(cli = gen_client(conn))) { CDEBUG(D_IOCTL, "disconnect: attempting to free " "nonexistent client %u\n", conn->oc_id); - return -EINVAL; + RETURN(-EINVAL); } @@ -125,58 +115,49 @@ int gen_disconnect(struct obd_conn *conn) CDEBUG(D_INFO, "disconnect: ID %u\n", conn->oc_id); - EXIT; - return 0; + RETURN(0); } /* gen_obd_disconnect */ - -/* - * raid1 defines a number of connections to child devices, - * used to make calls to these devices. - * data holds nothing - */ +/* FIXME: Data is a space- or comma-separated list of device IDs. This will + * have to change. */ int gen_multi_setup(struct obd_device *obddev, uint32_t len, void *data) { - int i; + int count, rc; + char *p; + ENTRY; - for (i = 0 ; i < obddev->obd_multi_count ; i++ ) { - int rc; - struct obd_conn *ch_conn = &obddev->obd_multi_conn[i]; - rc = OBP(ch_conn->oc_dev, connect)(ch_conn); + for (p = data, count = 0; p < (char *)data + len; count++) { + char *end; + int tmp = simple_strtoul(p, &end, 0); - if ( rc != 0 ) { - int j; + if (p == end) { + CERROR("invalid device ID starting at: %s\n", p); + GOTO(err_disconnect, rc = -EINVAL); + } - for (j = --i; j >= 0; --j) { - ch_conn = &obddev->obd_multi_conn[i]; - OBP(ch_conn->oc_dev, disconnect)(ch_conn); - } - return -EINVAL; + obddev->obd_multi_conn[count].oc_dev = &obd_dev[tmp]; + rc = obd_connect(&obddev->obd_multi_conn[count]); + if (rc) { + CERROR("cannot connect to device %d: rc = %d\n", tmp, + rc); + GOTO(err_disconnect, rc); } - } - return 0; -} + CDEBUG(D_INFO, "target OBD %d is of type %s\n", count, + obd_dev[tmp].obd_type->typ_name); -#if 0 -int gen_multi_attach(struct obd_device 
*obddev, int len, void *data) -{ - int i; - int count; - struct obd_device *rdev = obddev->obd_multi_dev[0]; + p = end + 1; + } - count = len/sizeof(int); obddev->obd_multi_count = count; - for (i=0 ; iobd_type->typ_name); - } - return 0; -} -#endif + RETURN(0); + + err_disconnect: + for (count--; count >= 0; count--) + obd_disconnect(&obddev->obd_multi_conn[count]); + return rc; +} /* * remove all connections to this device @@ -187,18 +168,14 @@ int gen_multi_cleanup(struct obd_device *obddev) { int i; - for (i = 0 ; i < obddev->obd_multi_count ; i++ ) { - struct obd_conn *ch_conn = &obddev->obd_multi_conn[i]; - int rc; - rc = OBP(ch_conn->oc_dev, disconnect)(ch_conn); - - if ( rc != 0 ) { + for (i = 0; i < obddev->obd_multi_count; i++) { + int rc = obd_disconnect(&obddev->obd_multi_conn[i]); + if (rc) CERROR("disconnect failure %d\n", - ch_conn->oc_dev->obd_minor); - } - } + obddev->obd_multi_conn[i].oc_dev->obd_minor); + } return 0; -} /* gen_multi_cleanup_device */ +} /* @@ -254,17 +231,15 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, int err = 0; ENTRY; - CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n", + CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n", (unsigned long long)src->o_id, (unsigned long long)src->o_blocks, (unsigned long long)src->o_size, (unsigned long long)dst->o_id); page = alloc_page(GFP_USER); - if ( !page ) { - EXIT; - return -ENOMEM; - } - + if (page == NULL) + RETURN(-ENOMEM); + lck_page(page); - + /* XXX with brw vector I/O, we could batch up reads and writes here, * all we need to do is allocate multiple pages to handle the I/Os * and arrays to handle the request parameters. 
@@ -276,7 +251,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, obd_off brw_offset = (page->index) << PAGE_SHIFT; obd_flag flagr = 0; obd_flag flagw = OBD_BRW_CREATE; - + page->index = index; err = OBP(src_conn->oc_dev, brw)(READ, src_conn, num_oa, &src, &num_buf, &page, &brw_count, @@ -299,7 +274,7 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, } CDEBUG(D_INFO, "Wrote page %ld ...\n", page->index); - + index++; } dst->o_size = src->o_size; @@ -308,6 +283,5 @@ int gen_copy_data(struct obd_conn *dst_conn, struct obdo *dst, UnlockPage(page); __free_page(page); - EXIT; - return err; + RETURN(err); } diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 1b3dc02..52e5a18 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -66,10 +66,10 @@ static int echo_getattr(struct obd_conn *conn, struct obdo *oa) } int echo_preprw(int cmd, struct obd_conn *conn, int objcount, - struct obd_ioobj *obj, int niocount, struct niobuf *nb, - struct niobuf *res) + struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, + struct niobuf_local *res) { - struct niobuf *r = res; + struct niobuf_local *r = res; int rc = 0; int i; @@ -91,9 +91,7 @@ int echo_preprw(int cmd, struct obd_conn *conn, int objcount, CERROR("can't get new page %d/%d for id %Ld\n", j, obj->ioo_bufcnt, (unsigned long long)obj->ioo_id); - rc = -ENOMEM; - EXIT; - goto preprw_cleanup; + GOTO(preprw_cleanup, rc = -ENOMEM); } echo_pages++; @@ -117,8 +115,7 @@ int echo_preprw(int cmd, struct obd_conn *conn, int objcount, } CDEBUG(D_PAGE, "%ld pages allocated after prep\n", echo_pages); - EXIT; - return 0; + RETURN(0); preprw_cleanup: /* It is possible that we would rather handle errors by allow @@ -139,12 +136,11 @@ preprw_cleanup: } int echo_commitrw(int cmd, struct obd_conn *conn, int objcount, - struct obd_ioobj *obj, int niocount, struct niobuf *res) + struct obd_ioobj *obj, int niocount, struct niobuf_local *res) { - struct niobuf *r = res; + struct 
niobuf_local *r = res; int rc = 0; int i; - ENTRY; CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n", @@ -152,8 +148,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount, if (niocount && !r) { CERROR("NULL res niobuf with niocount %d\n", niocount); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } for (i = 0; i < objcount; i++, obj++) { @@ -167,9 +162,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount, CERROR("bad page %p, id %Ld (%d), buf %d/%d\n", page, (unsigned long long)obj->ioo_id, i, j, obj->ioo_bufcnt); - rc = -EFAULT; - EXIT; - goto commitrw_cleanup; + GOTO(commitrw_cleanup, rc = -EFAULT); } free_pages(addr, 0); @@ -177,8 +170,7 @@ int echo_commitrw(int cmd, struct obd_conn *conn, int objcount, } } CDEBUG(D_PAGE, "%ld pages remain after commit\n", echo_pages); - EXIT; - return 0; + RETURN(0); commitrw_cleanup: CERROR("cleaning up %ld pages (%d obdos)\n", diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 063466b..24ce273 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -12,19 +12,9 @@ */ #define EXPORT_SYMTAB - -#include -#include -#include -#include -#include -#include -#include -#include - #define DEBUG_SUBSYSTEM S_FILTER -#include +#include #include #include @@ -36,11 +26,11 @@ long filter_memory; #define S_SHIFT 12 static char * obd_type_by_mode[S_IFMT >> S_SHIFT] = { [0] "", - [S_IFREG >> S_SHIFT] "R", + [S_IFREG >> S_SHIFT] "R", [S_IFDIR >> S_SHIFT] "D", [S_IFCHR >> S_SHIFT] "C", - [S_IFBLK >> S_SHIFT] "B", - [S_IFIFO >> S_SHIFT] "F", + [S_IFBLK >> S_SHIFT] "B", + [S_IFIFO >> S_SHIFT] "F", [S_IFSOCK >> S_SHIFT] "S", [S_IFLNK >> S_SHIFT] "L" }; @@ -85,7 +75,7 @@ static int filter_prep(struct obd_device *obddev) filter_id(rootid, FILTER_ROOTINO, S_IFDIR); file = filp_open(rootid, O_RDWR | O_CREAT, 00755); if (IS_ERR(file)) { - CERROR("OBD filter: cannot make root directory"); + CERROR("OBD filter: cannot make root directory"); GOTO(out, rc = PTR_ERR(file)); } filp_close(file, 0); @@ 
-121,7 +111,7 @@ static int filter_prep(struct obd_device *obddev) } } obddev->u.filter.fo_lastino = lastino; - filp_close(file, 0); + filp_close(file, 0); rc = 0; out: @@ -136,22 +126,22 @@ static void filter_post(struct obd_device *obddev) struct obd_run_ctxt saved; long rc; struct file *file; - loff_t off = 0; + loff_t off = 0; push_ctxt(&saved, &obddev->u.filter.fo_ctxt); file = filp_open("D/status", O_RDWR | O_CREAT, 0700); - if ( !file || IS_ERR(file)) { + if ( !file || IS_ERR(file)) { CERROR("OBD filter: cannot create status file\n"); goto out; } - rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino, + rc = file->f_op->write(file, (char *)&obddev->u.filter.fo_lastino, sizeof(obddev->u.filter.fo_lastino), &off); - if (rc != sizeof(obddev->u.filter.fo_lastino) ) { + if (rc != sizeof(obddev->u.filter.fo_lastino) ) { CERROR("OBD filter: error writing lastino\n"); } - rc = filp_close(file, NULL); - if (rc) { + rc = filp_close(file, NULL); + if (rc) { CERROR("OBD filter: cannot close status file\n"); } out: @@ -159,7 +149,7 @@ static void filter_post(struct obd_device *obddev) } -static __u64 filter_next_id(struct obd_device *obddev) +static __u64 filter_next_id(struct obd_device *obddev) { __u64 id; spin_lock(&obddev->u.filter.fo_lock); @@ -170,7 +160,7 @@ static __u64 filter_next_id(struct obd_device *obddev) } /* how to get files, dentries, inodes from object id's */ -static struct file *filter_obj_open(struct obd_device *obddev, +static struct file *filter_obj_open(struct obd_device *obddev, __u64 id, __u32 type) { struct obd_run_ctxt saved; @@ -191,12 +181,12 @@ static struct file *filter_obj_open(struct obd_device *obddev, RETURN(NULL); } - if ( ! (type & S_IFMT) ) { - CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n", + if ( ! 
(type & S_IFMT) ) { + CERROR("OBD filter_obj_open, no type (%Ld), mode %o!\n", (unsigned long long)id, type); } - filter_id(name, id, type); + filter_id(name, id, type); push_ctxt(&saved, &obddev->u.filter.fo_ctxt); file = filp_open(name, O_RDONLY | O_LARGEFILE, 0); pop_ctxt(&saved); @@ -213,25 +203,25 @@ static struct file *filter_parent(obd_id id, obd_mode mode) sprintf(path, "O/%s", obd_type_by_mode[(mode & S_IFMT) >> S_SHIFT]); - file = filp_open(path, O_RDONLY, 0); + file = filp_open(path, O_RDONLY, 0); return file; } -static struct inode *filter_inode_from_obj(struct obd_device *obddev, +static struct inode *filter_inode_from_obj(struct obd_device *obddev, __u64 id, __u32 type) { struct file *file; - struct inode *inode; + struct inode *inode; file = filter_obj_open(obddev, id, type); - if ( !file ) { - CERROR("filter_inode_from_obdo failed\n"); + if ( !file ) { + CERROR("filter_inode_from_obdo failed\n"); return NULL; } - inode = iget(file->f_dentry->d_inode->i_sb, - file->f_dentry->d_inode->i_ino); + inode = iget(file->f_dentry->d_inode->i_sb, + file->f_dentry->d_inode->i_ino); filp_close(file, 0); return inode; } @@ -318,9 +308,6 @@ static int filter_cleanup(struct obd_device * obddev) ENTRY; - if ( !(obddev->obd_flags & OBD_SET_UP) ) - RETURN(0); - if ( !list_empty(&obddev->obd_gen_clients) ) { CERROR("still has clients!\n"); RETURN(-EBUSY); @@ -333,7 +320,7 @@ static int filter_cleanup(struct obd_device * obddev) filter_post(obddev); unlock_kernel(); - mntput(obddev->u.filter.fo_vfsmnt); + mntput(obddev->u.filter.fo_vfsmnt); obddev->u.filter.fo_sb = 0; kfree(obddev->u.filter.fo_fstype); @@ -388,27 +375,23 @@ static inline void filter_from_inode(struct obdo *oa, struct inode *inode) static int filter_getattr(struct obd_conn *conn, struct obdo *oa) { struct inode *inode; - ENTRY; + if ( !gen_client(conn) ) { CDEBUG(D_IOCTL, "fatal: invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } - if ( !(inode = 
filter_inode_from_obj(conn->oc_dev, - oa->o_id, oa->o_mode)) ) { - EXIT; - return -ENOENT; - } + inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode); + if (inode == NULL) + RETURN(-ENOENT); oa->o_valid &= ~OBD_MD_FLID; filter_from_inode(oa, inode); - + iput(inode); - EXIT; - return 0; -} + RETURN(0); +} static int filter_setattr(struct obd_conn *conn, struct obdo *oa) { @@ -416,31 +399,28 @@ static int filter_setattr(struct obd_conn *conn, struct obdo *oa) struct iattr iattr; int rc; struct dentry de; + ENTRY; if (!gen_client(conn)) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - return -EINVAL; + RETURN(-EINVAL); } - inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode); - if ( !inode ) { - EXIT; - return -ENOENT; - } + inode = filter_inode_from_obj(conn->oc_dev, oa->o_id, oa->o_mode); + if ( !inode ) + RETURN(-ENOENT); iattr_from_obdo(&iattr, oa); iattr.ia_mode &= ~S_IFMT; iattr.ia_mode |= S_IFREG; de.d_inode = inode; - if ( inode->i_op->setattr ) { + if ( inode->i_op->setattr ) rc = inode->i_op->setattr(&de, &iattr); - } else { + else rc = inode_setattr(inode, &iattr); - } iput(inode); - EXIT; - return rc; + RETURN(rc); } static int filter_open(struct obd_conn *conn, struct obdo *oa) @@ -491,7 +471,7 @@ static int filter_create (struct obd_conn* conn, struct obdo *oa) } oa->o_id = filter_next_id(conn->oc_dev); - if ( !(oa->o_mode && S_IFMT) ) { + if ( !(oa->o_mode && S_IFMT) ) { CERROR("filter obd: no type!\n"); return -ENOENT; } @@ -500,15 +480,15 @@ static int filter_create (struct obd_conn* conn, struct obdo *oa) push_ctxt(&saved, &obddev->u.filter.fo_ctxt); mode = oa->o_mode; mode &= ~S_IFMT; - mode |= S_IFREG; + mode |= S_IFREG; file = filp_open(name, O_RDONLY | O_CREAT, mode); pop_ctxt(&saved); - if (IS_ERR(file)) { + if (IS_ERR(file)) { CERROR("Error mknod obj %s, err %ld\n", name, PTR_ERR(file)); return -ENOENT; } filp_close(file, 0); - + /* Set flags for fields we have set in ext2_new_inode */ oa->o_valid |= 
OBD_MD_FLID | OBD_MD_FLBLKSZ | OBD_MD_FLBLOCKS | OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME | @@ -525,53 +505,48 @@ static int filter_destroy(struct obd_conn *conn, struct obdo *oa) struct file *object; int rc; struct obd_run_ctxt saved; + ENTRY; if (!(cli = gen_client(conn))) { CERROR("invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } obddev = conn->oc_dev; object = filter_obj_open(obddev, oa->o_id, oa->o_mode); - if (!object || IS_ERR(object)) { - EXIT; - return -ENOENT; - } - + if (!object || IS_ERR(object)) + RETURN(-ENOENT); + inode = object->f_dentry->d_inode; inode->i_nlink = 1; inode->i_mode = 010000; push_ctxt(&saved, &obddev->u.filter.fo_ctxt); dir = filter_parent(oa->o_id, oa->o_mode); - if (IS_ERR(dir)) { - rc = PTR_ERR(dir); - EXIT; - goto out; - } - dget(dir->f_dentry); + if (IS_ERR(dir)) + GOTO(out, rc = PTR_ERR(dir)); + dget(dir->f_dentry); dget(object->f_dentry); rc = vfs_unlink(dir->f_dentry->d_inode, object->f_dentry); filp_close(dir, 0); filp_close(object, 0); + EXIT; out: pop_ctxt(&saved); - EXIT; return rc; } -static int filter_truncate(struct obd_conn *conn, struct obdo *oa, obd_size count, - obd_off offset) +static int filter_truncate(struct obd_conn *conn, struct obdo *oa, + obd_size count, obd_off offset) { int error; + ENTRY; error = filter_setattr(conn, oa); oa->o_valid = OBD_MD_FLBLOCKS | OBD_MD_FLCTIME | OBD_MD_FLMTIME; - EXIT; - return error; + RETURN(error); } /* buffer must lie in user memory here */ @@ -581,18 +556,16 @@ static int filter_read(struct obd_conn *conn, struct obdo *oa, char *buf, struct file * file; unsigned long retval; int err; + ENTRY; if (!gen_client(conn)) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } - file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); - if (!file || IS_ERR(file)) { - EXIT; - return -PTR_ERR(file); - } + file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); + if (!file || IS_ERR(file)) + 
RETURN(-PTR_ERR(file)); /* count doubles as retval */ retval = file->f_op->read(file, buf, *count, (loff_t *)&offset); @@ -611,25 +584,22 @@ static int filter_read(struct obd_conn *conn, struct obdo *oa, char *buf, /* buffer must lie in user memory here */ -static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, +static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, obd_size *count, obd_off offset) { int err; struct file * file; unsigned long retval; - ENTRY; + if (!gen_client(conn)) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } - file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); - if (!file || IS_ERR(file)) { - EXIT; - return -PTR_ERR(file); - } + file = filter_obj_open(conn->oc_dev, oa->o_id, oa->o_mode); + if (!file || IS_ERR(file)) + RETURN(-PTR_ERR(file)); /* count doubles as retval */ retval = file->f_op->write(file, buf, *count, (loff_t *)&offset); @@ -648,13 +618,13 @@ static int filter_write(struct obd_conn *conn, struct obdo *oa, char *buf, return err; } /* filter_write */ -static int filter_pgcache_brw(int rw, struct obd_conn *conn, +static int filter_pgcache_brw(int rw, struct obd_conn *conn, obd_count num_oa, - struct obdo **oa, - obd_count *oa_bufs, + struct obdo **oa, + obd_count *oa_bufs, struct page **pages, - obd_size *count, - obd_off *offset, + obd_size *count, + obd_off *offset, obd_flag *flags) { struct super_block *sb; @@ -664,49 +634,44 @@ static int filter_pgcache_brw(int rw, struct obd_conn *conn, unsigned long retval; int error; struct file *file; - ENTRY; if (!gen_client(conn)) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } sb = conn->oc_dev->u.filter.fo_sb; oldfs = get_fs(); - set_fs(KERNEL_DS); + set_fs(KERNEL_DS); pnum = 0; /* pnum indexes buf 0..num_pages */ for (onum = 0; onum < num_oa; onum++) { - int pg; - - file = filter_obj_open(conn->oc_dev, oa[onum]->o_id, - 
oa[onum]->o_mode); - if (!file || IS_ERR(file)) { - EXIT; - error = -ENOENT; - goto ERROR; - } + int pg; + + file = filter_obj_open(conn->oc_dev, oa[onum]->o_id, + oa[onum]->o_mode); + if (!file || IS_ERR(file)) + GOTO(ERROR, error = -ENOENT); /* count doubles as retval */ for (pg = 0; pg < oa_bufs[onum]; pg++) { CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) " - "off count (%Ld,%Ld)\n", + "off count (%Ld,%Ld)\n", rw, onum, pnum, file->f_dentry->d_inode->i_ino, (unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT, (unsigned long long)offset[pnum], (unsigned long long)count[pnum]); - if (rw == WRITE) { - loff_t off; + if (rw == WRITE) { + loff_t off; char *buffer; - off = offset[pnum]; - buffer = kmap(pages[pnum]); + off = offset[pnum]; + buffer = kmap(pages[pnum]); retval = file->f_op->write(file, buffer, count[pnum], &off); kunmap(pages[pnum]); - CDEBUG(D_INODE, "retval %ld\n", retval); - } else { - loff_t off = offset[pnum]; + CDEBUG(D_INODE, "retval %ld\n", retval); + } else { + loff_t off = offset[pnum]; char *buffer = kmap(pages[pnum]); if (off >= file->f_dentry->d_inode->i_size) { @@ -714,24 +679,22 @@ static int filter_pgcache_brw(int rw, struct obd_conn *conn, retval = count[pnum]; } else { retval = file->f_op->read(file, buffer, count[pnum], &off); - } + } kunmap(pages[pnum]); if ( retval != count[pnum] ) { filp_close(file, 0); - retval = -EIO; - EXIT; - goto ERROR; + GOTO(ERROR, retval = -EIO); } - CDEBUG(D_INODE, "retval %ld\n", retval); + CDEBUG(D_INODE, "retval %ld\n", retval); } pnum++; } /* sizes and blocks are set by generic_file_write */ - /* ctimes/mtimes will follow with a setattr call */ + /* ctimes/mtimes will follow with a setattr call */ filp_close(file, 0); } - + EXIT; ERROR: set_fs(oldfs); @@ -744,17 +707,16 @@ struct inode *ioobj_to_inode(struct obd_conn *conn, struct obd_ioobj *o) { struct inode *inode = NULL; struct super_block *sb = conn->oc_dev->u.ext2.e2_sb; + ENTRY; if (!sb || !sb->s_dev) { CDEBUG(D_SUPER, "fatal: device not 
initialized.\n"); - EXIT; - return NULL; + RETURN(NULL); } if ( !o->ioo_id ) { CDEBUG(D_INODE, "fatal: invalid obdo %lu\n", (long)o->ioo_id); - EXIT; - return NULL; + RETURN(NULL); } inode = filter_inode_from_obj(conn->oc_dev, o->ioo_id, S_IFREG); @@ -764,21 +726,20 @@ struct inode *ioobj_to_inode(struct obd_conn *conn, struct obd_ioobj *o) "no links" : "NULL"); if (inode) iput(inode); - EXIT; - return NULL; + RETURN(NULL); } - return inode; + RETURN(inode); } static int filter_preprw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *nb, - struct niobuf *res) + int niocount, struct niobuf_remote *nb, + struct niobuf_local *res) { struct obd_ioobj *o = obj; - struct niobuf *b = nb; - struct niobuf *r = res; + struct niobuf_remote *b = nb; + struct niobuf_local *r = res; int i; ENTRY; @@ -813,10 +774,10 @@ static int filter_preprw(int cmd, struct obd_conn *conn, static int filter_commitrw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf *res) + int niocount, struct niobuf_local *res) { struct obd_ioobj *o = obj; - struct niobuf *r = res; + struct niobuf_local *r = res; int i; ENTRY; @@ -847,20 +808,17 @@ static int filter_statfs (struct obd_conn *conn, struct statfs * statfs) { struct super_block *sb; int err; - ENTRY; if (!gen_client(conn)) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - EXIT; - return -EINVAL; + RETURN(-EINVAL); } sb = conn->oc_dev->u.filter.fo_sb; err = sb->s_op->statfs(sb, statfs); - EXIT; - return err; + RETURN(err); } /* filter_statfs */ @@ -873,37 +831,34 @@ static int filter_get_info(struct obd_conn *conn, obd_count keylen, if (!(cli = gen_client(conn))) { CDEBUG(D_IOCTL, "invalid client %u\n", conn->oc_id); - return -EINVAL; + RETURN(-EINVAL); } obddev = conn->oc_dev; - + if ( keylen == strlen("blocksize") && memcmp(key, "blocksize", keylen) == 0 ) { *vallen = sizeof(int); *val = (void *)obddev->u.filter.fo_sb->s_blocksize; - EXIT; - 
return 0; + RETURN(0); } if ( keylen == strlen("blocksize_bits") && memcmp(key, "blocksize_bits", keylen) == 0 ){ *vallen = sizeof(int); *val = (void *)(int)obddev->u.filter.fo_sb->s_blocksize_bits; - EXIT; - return 0; + RETURN(0); } if ( keylen == strlen("root_ino") && memcmp(key, "root_ino", keylen) == 0 ){ *vallen = sizeof(int); *val = (void *)(int) FILTER_ROOTINO; - EXIT; - return 0; + RETURN(0); } - + CDEBUG(D_IOCTL, "invalid key\n"); - return -EINVAL; + RETURN(-EINVAL); } @@ -949,7 +904,7 @@ static void __exit obdfilter_exit(void) MODULE_AUTHOR("Peter J. Braam "); MODULE_DESCRIPTION("Lustre Filtering OBD driver v1.0"); -MODULE_LICENSE("GPL"); +MODULE_LICENSE("GPL"); module_init(obdfilter_init); module_exit(obdfilter_exit); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 70a6260..fb0094e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -348,131 +348,116 @@ static int osc_destroy(struct obd_conn *conn, struct obdo *oa) return 0; } -int osc_sendpage(struct obd_conn *conn, struct ptlrpc_request *req, - struct niobuf *dst, struct niobuf *src) +static int osc_sendpage(struct ptlrpc_bulk_desc *desc, + struct niobuf_remote *dst, struct niobuf_local *src) { - struct ptlrpc_client *cl; - struct ptlrpc_connection *connection; - struct ptlrpc_bulk_desc *bulk; - int rc; + struct ptlrpc_bulk_page *page; ENTRY; - osc_con2cl(conn, &cl, &connection); - - bulk = ptlrpc_prep_bulk(connection); - if (bulk == NULL) + page = ptlrpc_prep_bulk_page(desc); + if (page == NULL) RETURN(-ENOMEM); - bulk->b_buf = (void *)(unsigned long)src->addr; - bulk->b_buflen = src->len; - bulk->b_xid = dst->xid; - rc = ptlrpc_send_bulk(bulk, OSC_BULK_PORTAL); - if (rc != 0) { - CERROR("send_bulk failed: %d\n", rc); - ptlrpc_free_bulk(bulk); - LBUG(); - RETURN(rc); - } - wait_event_interruptible(bulk->b_waitq, ptlrpc_check_bulk_sent(bulk)); - - if (bulk->b_flags & PTL_RPC_FL_INTR) { - ptlrpc_free_bulk(bulk); - RETURN(-EINTR); - } + page->b_buf = (void 
*)(unsigned long)src->addr; + page->b_buflen = src->len; + page->b_xid = dst->xid; - ptlrpc_free_bulk(bulk); RETURN(0); } -int osc_brw_read(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, - obd_count *oa_bufs, struct page **buf, obd_size *count, - obd_off *offset, obd_flag *flags) +static int osc_brw_read(struct obd_conn *conn, obd_count num_oa, + struct obdo **oa, obd_count *oa_bufs, struct page **buf, + obd_size *count, obd_off *offset, obd_flag *flags) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; struct ptlrpc_request *request; struct ost_body *body; + struct list_head *tmp, *next; int pages, rc, i, j, size[3] = {sizeof(*body)}; void *ptr1, *ptr2; - struct ptlrpc_bulk_desc **bulk; + struct ptlrpc_bulk_desc *desc;; ENTRY; size[1] = num_oa * sizeof(struct obd_ioobj); pages = 0; for (i = 0; i < num_oa; i++) pages += oa_bufs[i]; - size[2] = pages * sizeof(struct niobuf); - - OBD_ALLOC(bulk, pages * sizeof(*bulk)); - if (bulk == NULL) - RETURN(-ENOMEM); + size[2] = pages * sizeof(struct niobuf_remote); osc_con2cl(conn, &cl, &connection); request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); if (!request) - GOTO(out, rc = -ENOMEM); + GOTO(out3, rc = -ENOMEM); body = lustre_msg_buf(request->rq_reqmsg, 0); body->data = OBD_BRW_READ; + desc = ptlrpc_prep_bulk(connection); + if (desc == NULL) + GOTO(out2, rc = -ENOMEM); + desc->b_portal = OST_BULK_PORTAL; + ptr1 = lustre_msg_buf(request->rq_reqmsg, 1); ptr2 = lustre_msg_buf(request->rq_reqmsg, 2); for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++, pages++) { - bulk[pages] = ptlrpc_prep_bulk(connection); - if (bulk[pages] == NULL) + struct ptlrpc_bulk_page *page; + page = ptlrpc_prep_bulk_page(desc); + if (page == NULL) GOTO(out, rc = -ENOMEM); spin_lock(&connection->c_lock); - bulk[pages]->b_xid = ++connection->c_xid_out; + page->b_xid = ++connection->c_xid_out; spin_unlock(&connection->c_lock); - 
bulk[pages]->b_buf = kmap(buf[pages]); - bulk[pages]->b_buflen = PAGE_SIZE; - bulk[pages]->b_portal = OST_BULK_PORTAL; - ost_pack_niobuf(&ptr2, bulk[pages]->b_buf, - offset[pages], count[pages], - flags[pages], bulk[pages]->b_xid); - - rc = ptlrpc_register_bulk(bulk[pages]); - if (rc) - GOTO(out, rc); + page->b_buf = kmap(buf[pages]); + page->b_buflen = PAGE_SIZE; + ost_pack_niobuf(&ptr2, offset[pages], count[pages], + flags[pages], page->b_xid); } } + rc = ptlrpc_register_bulk(desc); + if (rc) + GOTO(out, rc); + request->rq_replen = lustre_msg_size(1, size); rc = ptlrpc_queue_wait(request); + if (rc) + ptlrpc_abort_bulk(desc); GOTO(out, rc); out: - /* FIXME: if we've called ptlrpc_wait_bulk but rc != 0, we need to - * abort those bulk listeners. */ + list_for_each_safe(tmp, next, &desc->b_page_list) { + struct ptlrpc_bulk_page *page; + page = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - for (pages = 0, i = 0; i < num_oa; i++) { - for (j = 0; j < oa_bufs[i]; j++, pages++) { - if (bulk[pages] == NULL) - continue; - kunmap(buf[pages]); - ptlrpc_free_bulk(bulk[pages]); - } + if (page->b_buf != NULL) + kunmap(page->b_buf); } - OBD_FREE(bulk, pages * sizeof(*bulk)); + ptlrpc_free_bulk(desc); + out2: ptlrpc_free_req(request); + out3: return rc; } -int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, - obd_count *oa_bufs, struct page **buf, obd_size *count, - obd_off *offset, obd_flag *flags) +static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, + struct obdo **oa, obd_count *oa_bufs, + struct page **buf, obd_size *count, obd_off *offset, + obd_flag *flags) { struct ptlrpc_client *cl; struct ptlrpc_connection *connection; struct ptlrpc_request *request; + struct ptlrpc_bulk_desc *desc; struct obd_ioobj ioo; struct ost_body *body; - struct niobuf *src; + struct niobuf_local *local; + struct niobuf_remote *remote; long pages; int rc, i, j, size[3] = {sizeof(*body)}; void *ptr1, *ptr2; @@ -482,16 +467,16 @@ int osc_brw_write(struct 
obd_conn *conn, obd_count num_oa, struct obdo **oa, pages = 0; for (i = 0; i < num_oa; i++) pages += oa_bufs[i]; - size[2] = pages * sizeof(*src); + size[2] = pages * sizeof(*remote); - OBD_ALLOC(src, size[2]); - if (!src) + OBD_ALLOC(local, pages * sizeof(*local)); + if (local == NULL) RETURN(-ENOMEM); osc_con2cl(conn, &cl, &connection); request = ptlrpc_prep_req(cl, connection, OST_BRW, 3, size, NULL); if (!request) - RETURN(-ENOMEM); + GOTO(out3, rc = -ENOMEM); body = lustre_msg_buf(request->rq_reqmsg, 0); body->data = OBD_BRW_WRITE; @@ -500,50 +485,64 @@ int osc_brw_write(struct obd_conn *conn, obd_count num_oa, struct obdo **oa, for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++, pages++) { - ost_pack_niobuf(&ptr2, kmap(buf[pages]), offset[pages], - count[pages], flags[pages], 0); + local[pages].addr = (__u64)(long)kmap(buf[pages]); + local[pages].offset = offset[pages]; + local[pages].len = count[pages]; + ost_pack_niobuf(&ptr2, offset[pages], count[pages], + flags[pages], 0); } } - memcpy(src, lustre_msg_buf(request->rq_reqmsg, 2), size[2]); - size[1] = pages * sizeof(struct niobuf); + size[1] = pages * sizeof(struct niobuf_remote); request->rq_replen = lustre_msg_size(2, size); rc = ptlrpc_queue_wait(request); if (rc) - GOTO(out, rc); + GOTO(out2, rc); ptr2 = lustre_msg_buf(request->rq_repmsg, 1); if (ptr2 == NULL) - GOTO(out, rc = -EINVAL); + GOTO(out2, rc = -EINVAL); - if (request->rq_repmsg->buflens[1] != pages * sizeof(struct niobuf)) { + if (request->rq_repmsg->buflens[1] != + pages * sizeof(struct niobuf_remote)) { CERROR("buffer length wrong (%d vs. 
%ld)\n", request->rq_repmsg->buflens[1], - pages * sizeof(struct niobuf)); - GOTO(out, rc = -EINVAL); + pages * sizeof(struct niobuf_remote)); + GOTO(out2, rc = -EINVAL); } + desc = ptlrpc_prep_bulk(connection); + desc->b_portal = OSC_BULK_PORTAL; + for (pages = 0, i = 0; i < num_oa; i++) { for (j = 0; j < oa_bufs[i]; j++, pages++) { - struct niobuf *dst; - ost_unpack_niobuf(&ptr2, &dst); - osc_sendpage(conn, request, dst, &src[pages]); + ost_unpack_niobuf(&ptr2, &remote); + rc = osc_sendpage(desc, remote, &local[pages]); + if (rc) + GOTO(out, rc); } } - OBD_FREE(src, size[2]); + + rc = ptlrpc_send_bulk(desc); + GOTO(out, rc); + out: + ptlrpc_free_bulk(desc); + out2: + ptlrpc_free_req(request); for (pages = 0, i = 0; i < num_oa; i++) for (j = 0; j < oa_bufs[i]; j++, pages++) kunmap(buf[pages]); + out3: + OBD_FREE(local, pages * sizeof(*local)); - ptlrpc_free_req(request); - return 0; + return rc; } -int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, - struct obdo **oa, obd_count *oa_bufs, struct page **buf, - obd_size *count, obd_off *offset, obd_flag *flags) +static int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, + struct obdo **oa, obd_count *oa_bufs, struct page **buf, + obd_size *count, obd_off *offset, obd_flag *flags) { if (rw == OBD_BRW_READ) return osc_brw_read(conn, num_oa, oa, oa_bufs, buf, count, @@ -553,10 +552,11 @@ int osc_brw(int rw, struct obd_conn *conn, obd_count num_oa, offset, flags); } -int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns, - struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type, - struct ldlm_extent *extent, __u32 mode, int *flags, void *data, - int datalen, struct ldlm_handle *lockh) +static int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns, + struct ldlm_handle *parent_lock, __u64 *res_id, + __u32 type, struct ldlm_extent *extent, __u32 mode, + int *flags, void *data, int datalen, + struct ldlm_handle *lockh) { struct ptlrpc_connection *conn; struct ptlrpc_client *cl; 
@@ -610,7 +610,8 @@ int osc_enqueue(struct obd_conn *oconn, struct ldlm_namespace *ns, return rc; } -int osc_cancel(struct obd_conn *oconn, __u32 mode, struct ldlm_handle *lockh) +static int osc_cancel(struct obd_conn *oconn, __u32 mode, + struct ldlm_handle *lockh) { struct ldlm_lock *lock; ENTRY; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 8d56058..6c9e27d 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -252,10 +252,11 @@ static int ost_get_info(struct ost_obd *ost, struct ptlrpc_request *req) static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) { - struct ptlrpc_bulk_desc *bulk = NULL; + struct ptlrpc_bulk_desc *desc; struct obd_conn conn; void *tmp1, *tmp2, *end2; - struct niobuf *nb, *dst, *res = NULL; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb = NULL; struct obd_ioobj *ioo; struct ost_body *body; int rc, cmd, i, j, objcount, niocount, size = sizeof(*body); @@ -266,7 +267,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); - niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb); + niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); cmd = body->data; conn.oc_id = body->connid; @@ -276,75 +277,71 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) ost_unpack_ioo(&tmp1, &ioo); if (tmp2 + ioo->ioo_bufcnt > end2) { LBUG(); - rc = -EFAULT; - break; - } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - ost_unpack_niobuf(&tmp2, &nb); + GOTO(out, rc = -EFAULT); } + for (j = 0; j < ioo->ioo_bufcnt; j++) + ost_unpack_niobuf(&tmp2, &remote_nb); } rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) RETURN(rc); - OBD_ALLOC(res, sizeof(*res) * niocount); - if (res == NULL) + OBD_ALLOC(local_nb, sizeof(*local_nb) * niocount); + if (local_nb == NULL) 
RETURN(-ENOMEM); /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_preprw(cmd, &conn, objcount, - tmp1, niocount, tmp2, res); + tmp1, niocount, tmp2, local_nb); if (req->rq_status) - GOTO(out_res, 0); + GOTO(out_local, 0); - for (i = 0; i < niocount; i++) { - bulk = ptlrpc_prep_bulk(req->rq_connection); - if (bulk == NULL) { - CERROR("cannot alloc bulk desc\n"); - GOTO(out_res, rc = -ENOMEM); - } + desc = ptlrpc_prep_bulk(req->rq_connection); + if (desc == NULL) + GOTO(out_local, rc = -ENOMEM); + desc->b_portal = OST_BULK_PORTAL; - dst = &(((struct niobuf *)tmp2)[i]); - bulk->b_xid = dst->xid; - bulk->b_buf = (void *)(unsigned long)res[i].addr; + for (i = 0; i < niocount; i++) { + struct ptlrpc_bulk_page *bulk; + bulk = ptlrpc_prep_bulk_page(desc); + if (bulk == NULL) + GOTO(out_bulk, rc = -ENOMEM); + remote_nb = &(((struct niobuf_remote *)tmp2)[i]); + bulk->b_xid = remote_nb->xid; + bulk->b_buf = (void *)(unsigned long)local_nb[i].addr; bulk->b_buflen = PAGE_SIZE; - rc = ptlrpc_send_bulk(bulk, OST_BULK_PORTAL); - if (rc) - GOTO(out_bulk, rc); - wait_event_interruptible(bulk->b_waitq, - ptlrpc_check_bulk_sent(bulk)); + } - if (bulk->b_flags & PTL_RPC_FL_INTR) - GOTO(out_bulk, 0); + rc = ptlrpc_send_bulk(desc); + if (rc) + GOTO(out_bulk, rc); - ptlrpc_free_bulk(bulk); - } - bulk = NULL; + ptlrpc_free_bulk(desc); /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_commitrw(cmd, &conn, objcount, - tmp1, niocount, res); + tmp1, niocount, local_nb); - EXIT; -out_bulk: - if (bulk != NULL) - ptlrpc_free_bulk(bulk); -out_res: - if (res != NULL) - OBD_FREE(res, sizeof(*res) * niocount); + RETURN(rc); + out_bulk: + ptlrpc_free_bulk(desc); + out_local: + if (local_nb != NULL) + OBD_FREE(local_nb, sizeof(*local_nb) * niocount); + out: 
return 0; } static int ost_commit_page(struct obd_conn *conn, struct page *page) { struct obd_ioobj obj; - struct niobuf buf; + struct niobuf_local buf; int rc; ENTRY; @@ -358,23 +355,31 @@ static int ost_commit_page(struct obd_conn *conn, struct page *page) RETURN(rc); } -static int ost_brw_write_cb(struct ptlrpc_bulk_desc *bulk, void *data) +static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk) { int rc; - ENTRY; - rc = ost_commit_page(&bulk->b_conn, bulk->b_page); + rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page); if (rc) CERROR("ost_commit_page failed: %d\n", rc); RETURN(rc); } +static int ost_brw_write_finished_cb(struct ptlrpc_bulk_desc *desc) +{ + ptlrpc_free_bulk(desc); + + return 0; +} + static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) { + struct ptlrpc_bulk_desc *desc; struct obd_conn conn; - struct niobuf *nb, *res; + struct niobuf_remote *remote_nb; + struct niobuf_local *local_nb, *lnb; struct obd_ioobj *ioo; struct ost_body *body; int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)}; @@ -386,7 +391,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); end2 = (char *)tmp2 + req->rq_reqmsg->buflens[2]; objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); - niocount = req->rq_reqmsg->buflens[2] / sizeof(*nb); + niocount = req->rq_reqmsg->buflens[2] / sizeof(*remote_nb); cmd = body->data; conn.oc_id = body->connid; @@ -398,56 +403,73 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) rc = -EFAULT; break; } - for (j = 0; j < ioo->ioo_bufcnt; j++) { - ost_unpack_niobuf((void *)&tmp2, &nb); - } + for (j = 0; j < ioo->ioo_bufcnt; j++) + ost_unpack_niobuf((void *)&tmp2, &remote_nb); } - size[1] = niocount * sizeof(*nb); + size[1] = niocount * sizeof(*remote_nb); rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) - RETURN(rc); + GOTO(fail, rc); + remote_nb = 
lustre_msg_buf(req->rq_repmsg, 1); - res = lustre_msg_buf(req->rq_repmsg, 1); + OBD_ALLOC(local_nb, niocount * sizeof(*local_nb)); + if (local_nb == NULL) + GOTO(fail, rc = -ENOMEM); /* The unpackers move tmp1 and tmp2, so reset them before using */ tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_preprw(cmd, &conn, objcount, - tmp1, niocount, tmp2, res); - + tmp1, niocount, tmp2, local_nb); if (req->rq_status) - GOTO(out, 0); + GOTO(success, 0); - for (i = 0; i < niocount; i++, res++) { - struct ptlrpc_bulk_desc *bulk; + desc = ptlrpc_prep_bulk(req->rq_connection); + if (desc == NULL) + GOTO(fail_preprw, rc = -ENOMEM); + desc->b_cb = ost_brw_write_finished_cb; + desc->b_portal = OSC_BULK_PORTAL; + memcpy(&(desc->b_conn), &conn, sizeof(conn)); + + for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) { struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; + struct ptlrpc_bulk_page *bulk; - bulk = ptlrpc_prep_bulk(req->rq_connection); + bulk = ptlrpc_prep_bulk_page(desc); if (bulk == NULL) - GOTO(out, rc = -ENOMEM); + GOTO(fail_bulk, rc = -ENOMEM); spin_lock(&srv->srv_lock); bulk->b_xid = srv->srv_xid++; spin_unlock(&srv->srv_lock); - res->xid = HTON__u32(bulk->b_xid); - - bulk->b_buf = (void *)(unsigned long)res->addr; - bulk->b_cb = ost_brw_write_cb; - bulk->b_page = res->page; - memcpy(&(bulk->b_conn), &conn, sizeof(conn)); + bulk->b_buf = (void *)(unsigned long)lnb->addr; + bulk->b_page = lnb->page; bulk->b_buflen = PAGE_SIZE; - bulk->b_portal = OSC_BULK_PORTAL; - rc = ptlrpc_register_bulk(bulk); - if (rc) - GOTO(out, rc); + bulk->b_cb = ost_brw_write_cb; + + /* this advances remote_nb */ + ost_pack_niobuf((void **)&remote_nb, lnb->offset, lnb->len, 0, + bulk->b_xid); } + rc = ptlrpc_register_bulk(desc); + if (rc) + GOTO(fail_bulk, rc); + EXIT; - out: - /* FIXME: should we return 'rc' here? 
*/ + success: + OBD_FREE(local_nb, niocount * sizeof(*local_nb)); return 0; + + fail_bulk: + ptlrpc_free_bulk(desc); + fail_preprw: + OBD_FREE(local_nb, niocount * sizeof(*local_nb)); + /* FIXME: how do we undo the preprw? */ + fail: + return rc; } static int ost_brw(struct ost_obd *obddev, struct ptlrpc_request *req) @@ -595,11 +617,9 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf) err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); if (err) GOTO(error_disc, err = -EINVAL); -#if 1 err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost"); if (err) GOTO(error_disc, err = -EINVAL); -#endif RETURN(0); diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 167e67d..288d43f 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -93,25 +93,62 @@ struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn) if (bulk != NULL) { bulk->b_connection = ptlrpc_connection_addref(conn); init_waitqueue_head(&bulk->b_waitq); + INIT_LIST_HEAD(&bulk->b_page_list); } return bulk; } +struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc) +{ + struct ptlrpc_bulk_page *page; + + OBD_ALLOC(page, sizeof(*page)); + if (page != NULL) { + page->b_desc = desc; + ptl_set_inv_handle(&page->b_md_h); + ptl_set_inv_handle(&page->b_me_h); + list_add(&page->b_link, &desc->b_page_list); + desc->b_page_count++; + } + return page; +} + void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk) { + struct list_head *tmp, *next; ENTRY; if (bulk == NULL) { EXIT; return; } + list_for_each_safe(tmp, next, &bulk->b_page_list) { + struct ptlrpc_bulk_page *page; + page = list_entry(tmp, struct ptlrpc_bulk_page, b_link); + ptlrpc_free_bulk_page(page); + } + ptlrpc_put_connection(bulk->b_connection); OBD_FREE(bulk, sizeof(*bulk)); EXIT; } +void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *page) +{ + ENTRY; + if (page == NULL) { + EXIT; + return; + } + + list_del(&page->b_link); + 
page->b_desc->b_page_count--; + OBD_FREE(page, sizeof(*page)); + EXIT; +} + struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, int opcode, int count, int *lengths, diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 5c0930d..a016f35 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -133,15 +133,16 @@ int request_in_callback(ptl_event_t *ev, void *data) static int bulk_source_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *desc = bulk->b_desc; ENTRY; if (ev->type == PTL_EVENT_SENT) { CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); - bulk->b_flags |= PTL_BULK_FL_SENT; - wake_up_interruptible(&bulk->b_waitq); + desc->b_flags |= PTL_BULK_FL_SENT; + wake_up_interruptible(&desc->b_waitq); } else { CERROR("Unexpected event type!\n"); LBUG(); @@ -152,25 +153,27 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) static int bulk_sink_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_page *bulk = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *desc = bulk->b_desc; ENTRY; if (ev->type == PTL_EVENT_PUT) { if (bulk->b_buf != ev->mem_desc.start + ev->offset) CERROR("bulkbuf != mem_desc -- why?\n"); - bulk->b_flags |= PTL_BULK_FL_RCVD; + desc->b_finished_count++; + if (desc->b_finished_count == desc->b_page_count) { + desc->b_flags |= PTL_BULK_FL_RCVD; + wake_up_interruptible(&desc->b_waitq); + if (desc->b_cb != NULL) + desc->b_cb(desc); + } if (bulk->b_cb != NULL) - bulk->b_cb(bulk, data); - wake_up_interruptible(&bulk->b_waitq); + bulk->b_cb(bulk); } else { CERROR("Unexpected event type!\n"); LBUG(); } - /* FIXME: This should happen unconditionally */ - if (bulk->b_cb != NULL) - ptlrpc_free_bulk(bulk); - RETURN(1); } diff --git 
a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 755032d..5fc28f9 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -21,20 +21,15 @@ */ #define EXPORT_SYMTAB - #define DEBUG_SUBSYSTEM S_RPC #include -extern ptl_handle_eq_t request_out_eq, - reply_in_eq, - reply_out_eq, - bulk_source_eq, - bulk_sink_eq; +extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, + bulk_source_eq, bulk_sink_eq; static ptl_process_id_t local_id = {PTL_ID_ANY, PTL_ID_ANY}; - -int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) +static int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) { ENTRY; @@ -108,92 +103,121 @@ static int ptl_send_buf(struct ptlrpc_request *request, return rc; } -int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc) { int rc; + struct list_head *tmp, *next; ptl_process_id_t remote_id; + ENTRY; - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.eventq = bulk_source_eq; - bulk->b_md.threshold = 2; /* SENT and ACK events */ - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - - rc = PtlMDBind(bulk->b_connection->c_peer.peer_ni, bulk->b_md, - &bulk->b_md_h); - if (rc != 0) { - CERROR("PtlMDBind failed: %d\n", rc); - LBUG(); - return rc; + list_for_each_safe(tmp, next, &desc->b_page_list) { + /* only request an ACK for the last page */ + int ack = (next == &desc->b_page_list); + struct ptlrpc_bulk_page *bulk; + bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.eventq = bulk_source_eq; + bulk->b_md.threshold = 1 + ack; /* SENT and (if last) ACK */ + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + + rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md, + &bulk->b_md_h); + if (rc != 0) { + CERROR("PtlMDBind failed: %d\n", rc); + LBUG(); + RETURN(rc); + } + + remote_id.nid = 
desc->b_connection->c_peer.peer_nid; + remote_id.pid = 0; + + CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", + bulk->b_md.length, desc->b_portal, bulk->b_xid); + + rc = PtlPut(bulk->b_md_h, (ack ? PTL_ACK_REQ : PTL_NOACK_REQ), + remote_id, desc->b_portal, 0, bulk->b_xid, 0, 0); + if (rc != PTL_OK) { + CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, + desc->b_portal, bulk->b_xid, rc); + PtlMDUnlink(bulk->b_md_h); + LBUG(); + RETURN(rc); + } } - remote_id.nid = bulk->b_connection->c_peer.peer_nid; - remote_id.pid = 0; + wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", - bulk->b_md.length, portal, bulk->b_xid); + if (desc->b_flags & PTL_RPC_FL_INTR) + RETURN(-EINTR); - rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, portal, 0, - bulk->b_xid, 0, 0); - if (rc != PTL_OK) { - CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, - portal, bulk->b_xid, rc); - PtlMDUnlink(bulk->b_md_h); - LBUG(); - } - - return rc; + RETURN(0); } -int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk) +int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc) { + struct list_head *tmp, *next; int rc; ENTRY; - rc = PtlMEAttach(bulk->b_connection->c_peer.peer_ni, bulk->b_portal, - local_id, bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER, - &bulk->b_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); - } - - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.threshold = 1; - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - bulk->b_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h); - //CERROR("MDAttach (bulk sink): %Lu\n", (__u64)bulk->b_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); + list_for_each_safe(tmp, next, &desc->b_page_list) { + struct ptlrpc_bulk_page *bulk; + bulk = list_entry(tmp, 
struct ptlrpc_bulk_page, b_link); + + rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni, + desc->b_portal, local_id, bulk->b_xid, 0, + PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h); + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); + } + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.threshold = 1; + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + bulk->b_md.eventq = bulk_sink_eq; + + rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, + &bulk->b_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); + } + + CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, " + "portal %u\n", bulk->b_buflen, bulk->b_xid, + desc->b_portal); } - CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n", - bulk->b_buflen, bulk->b_xid, bulk->b_portal); RETURN(0); - // XXX Confirm that this is safe! cleanup: - PtlMEUnlink(bulk->b_me_h); + ptlrpc_abort_bulk(desc); + return rc; } -int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *bulk) +int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) { - int rc; + struct list_head *tmp, *next; - rc = PtlMEUnlink(bulk->b_me_h); - if (rc != PTL_OK) - CERROR("PtlMEUnlink failed: %d\n", rc); + list_for_each_safe(tmp, next, &desc->b_page_list) { + struct ptlrpc_bulk_page *bulk; + bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - return rc; + /* This should be safe: these handles are initialized to be + * invalid in ptlrpc_prep_bulk_page() */ + PtlMDUnlink(bulk->b_md_h); + PtlMEUnlink(bulk->b_me_h); + } + + return 0; } int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) diff --git a/lustre/tests/llmount.sh b/lustre/tests/llmount.sh index f42ddb7..410bb3c 100755 --- a/lustre/tests/llmount.sh +++ b/lustre/tests/llmount.sh @@ -13,10 +13,10 @@ setup_lustre echo -n "Hit return to continue..." 
read -new_fs ext2 /tmp/ost 10000 +new_fs ext2 /tmp/ost 10001 OST=$LOOPDEV MDSFS=ext3 -new_fs ${MDSFS} /tmp/mds 10000 +new_fs ${MDSFS} /tmp/mds 10001 MDS=$LOOPDEV echo 0xffffffff > /proc/sys/portals/debug -- 1.8.3.1