From 18dfabe7161183d8aef320e4fce320a501567a1b Mon Sep 17 00:00:00 2001
From: braam
Date: Fri, 1 Feb 2002 19:32:21 +0000
Subject: [PATCH] - zero out the request structure after allocation

- do readpage via Portals
- add bulk-moving functionality to rpc.c
- clean up one small MD leak
---
 lustre/include/linux/lustre_net.h | 12 +++++
 lustre/mdc/mdc_request.c          |  2 +
 lustre/mds/handler.c              | 45 +++++++++++++++-----
 lustre/ptlrpc/rpc.c               | 93 ++++++++++++++++++++++++++++++++++++---
 4 files changed, 137 insertions(+), 15 deletions(-)

diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h
index 4c3b375..1af56f1 100644
--- a/lustre/include/linux/lustre_net.h
+++ b/lustre/include/linux/lustre_net.h
@@ -33,6 +33,10 @@
 #define MDS_REPLY_PORTAL 4
 #define OST_REQUEST_PORTAL 5
 #define OST_REPLY_PORTAL 6
+#define MDC_BULK_PORTAL 7
+#define MDS_BULK_PORTAL 8
+#define OSC_BULK_PORTAL 9
+#define OST_BULK_PORTAL 10
 
 struct ptlrpc_service {
         char *srv_buf;
@@ -70,14 +74,22 @@
         struct ptlrep_hdr *rq_rephdr;
         union ptl_rep rq_rep;
 
+        char *rq_bulkbuf;
+        int rq_bulklen;
+        int (*rq_bulk_cb)(struct ptlrpc_request *, void *);
+
         void * rq_reply_handle;
         wait_queue_head_t rq_wait_for_rep;
+        wait_queue_head_t rq_wait_for_bulk;
 
         ptl_md_t rq_reply_md;
         ptl_handle_md_t rq_reply_md_h;
         ptl_md_t rq_req_md;
+        ptl_md_t rq_bulk_md;
+        ptl_handle_md_t rq_bulk_md_h;
 
         __u32 rq_reply_portal;
         __u32 rq_req_portal;
+        __u32 rq_bulk_portal;
         struct lustre_peer rq_peer;
 };
diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c
index b59305f..fba4cc1 100644
--- a/lustre/mdc/mdc_request.c
+++ b/lustre/mdc/mdc_request.c
@@ -50,6 +50,7 @@ struct ptlrpc_request *mds_prep_req(int opcode, int namelen, char *name, int tgt
                 return NULL;
         }
 
+        memset(request, 0, sizeof(*request));
         request->rq_xid = mdc_xid++;
 
         rc = mds_pack_req(name, namelen, tgt, tgtlen,
@@ -178,6 +179,7 @@
         request->rq_req.mds->size = offset;
         request->rq_req.mds->tgtlen = sizeof(niobuf);
+        request->rq_bulk_portal = MDS_BULK_PORTAL;
 
         request->rq_replen = sizeof(struct ptlrep_hdr) +
                 sizeof(struct mds_rep);
 
diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c
index 72bfa8b..19b9691 100644
--- a/lustre/mds/handler.c
+++ b/lustre/mds/handler.c
@@ -77,14 +77,43 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 {
         int rc;
         mm_segment_t oldfs = get_fs();
-        /* dst->addr is a user address, but in a different task! */
-        set_fs(KERNEL_DS);
-        rc = generic_file_read(file, (char *)(long)dst->addr,
-                               PAGE_SIZE, &offset);
-        set_fs(oldfs);
-
-        if (rc != PAGE_SIZE)
-                return -EIO;
+
+        if (req->rq_peer.peer_nid == 0) {
+                /* dst->addr is a user address, but in a different task! */
+                set_fs(KERNEL_DS);
+                rc = generic_file_read(file, (char *)(long)dst->addr,
+                                       PAGE_SIZE, &offset);
+                set_fs(oldfs);
+
+                if (rc != PAGE_SIZE)
+                        return -EIO;
+        } else {
+                char *buf;
+
+                buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
+                if (!buf) {
+                        return -ENOMEM;
+                }
+
+                set_fs(KERNEL_DS);
+                rc = generic_file_read(file, buf, PAGE_SIZE, &offset);
+                set_fs(oldfs);
+
+                if (rc != PAGE_SIZE) {
+                        kfree(buf);
+                        return -EIO;
+                }
+
+                req->rq_bulkbuf = buf;
+                req->rq_bulklen = PAGE_SIZE;
+                /* init the waitqueue before sending: the ACK may beat us */
+                init_waitqueue_head(&req->rq_wait_for_bulk);
+                rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);
+                sleep_on(&req->rq_wait_for_bulk);
+                kfree(buf);
+                req->rq_bulklen = 0; /* FIXME: eek. */
+        }
+
         return 0;
 }
 
diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c
index 9c0e38a..4969603 100644
--- a/lustre/ptlrpc/rpc.c
+++ b/lustre/ptlrpc/rpc.c
@@ -29,7 +29,7 @@
 #include 
 #include 
 
-static ptl_handle_eq_t req_eq;
+static ptl_handle_eq_t req_eq, bulk_source_eq, bulk_sink_eq;
 
 static int request_callback(ptl_event_t *ev, void *data)
 {
@@ -64,6 +64,42 @@ static int incoming_callback(ptl_event_t *ev, void *data)
         return 0;
 }
 
+static int bulk_source_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_SENT) {
+                ; /* nothing to do: the ACK, not SENT, ends the wait */
+        } else if (ev->type == PTL_EVENT_ACK) {
+                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+        } else {
+                printk("Unexpected event type in " __FUNCTION__ "!\n");
+        }
+
+        EXIT;
+        return 1;
+}
+
+static int bulk_sink_callback(ptl_event_t *ev, void *data)
+{
+        struct ptlrpc_request *rpc = ev->mem_desc.user_ptr;
+
+        ENTRY;
+
+        if (ev->type == PTL_EVENT_PUT) {
+                if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
+                        printk(__FUNCTION__ ": bulkbuf != mem_desc -- why?\n");
+                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+        } else {
+                printk("Unexpected event type in " __FUNCTION__ "!\n");
+        }
+
+        EXIT;
+        return 1;
+}
+
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
                  int portal, int is_request)
 {
@@ -71,17 +107,23 @@
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
 
-        if (is_request) {
+        /* FIXME: This is bad. */
+        if (request->rq_bulklen) {
+                request->rq_req_md.start = request->rq_bulkbuf;
+                request->rq_req_md.length = request->rq_bulklen;
+                request->rq_req_md.eventq = bulk_source_eq;
+        } else if (is_request) {
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
+                request->rq_req_md.eventq = req_eq;
         } else {
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
+                request->rq_req_md.eventq = req_eq;
         }
-        request->rq_req_md.threshold = PTL_MD_THRESH_INF;
+        request->rq_req_md.threshold = 1;
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
-        request->rq_req_md.eventq = req_eq;
 
         rc = PtlMDBind(peer->peer_ni, request->rq_req_md, &md_h);
         if (rc != 0) {
@@ -93,8 +135,13 @@
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid,
-                    0, 0);
+        if (request->rq_bulklen) {
+                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        } else {
+                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
+                            request->rq_xid, 0, 0);
+        }
         if (rc != PTL_OK) {
                 printk(__FUNCTION__ ": PtlPut failed: %d\n", rc);
                 /* FIXME: tear down md */
@@ -105,7 +152,7 @@
 
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
-        ptl_handle_me_t me_h;
+        ptl_handle_me_t me_h, bulk_me_h;
         ptl_process_id_t local_id;
         int rc;
 
@@ -143,6 +190,30 @@
                 return rc;
         }
 
+        if (request->rq_bulklen != 0) {
+                rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
+                                 local_id, request->rq_xid, 0, PTL_UNLINK,
+                                 &bulk_me_h);
+                if (rc != PTL_OK) {
+                        EXIT;
+                        return rc;
+                }
+
+                request->rq_bulk_md.start = request->rq_bulkbuf;
+                request->rq_bulk_md.length = request->rq_bulklen;
+                request->rq_bulk_md.threshold = 1;
+                request->rq_bulk_md.options = PTL_MD_OP_PUT;
+                request->rq_bulk_md.user_ptr = request;
+                request->rq_bulk_md.eventq = bulk_sink_eq;
+
+                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
+                                 &request->rq_bulk_md_h);
+                if (rc != PTL_OK) {
+                        EXIT;
+                        return rc;
+                }
+        }
+
         return ptl_send_buf(request, peer, request->rq_req_portal, 1);
 }
 
@@ -219,6 +290,14 @@ static int req_init_portals(void)
         if (rc != PTL_OK)
                 printk("PtlEQAlloc failed: %d\n", rc);
 
+        rc = PtlEQAlloc(ni, 128, bulk_source_callback, NULL, &bulk_source_eq);
+        if (rc != PTL_OK)
+                printk("PtlEQAlloc failed: %d\n", rc);
+
+        rc = PtlEQAlloc(ni, 128, bulk_sink_callback, NULL, &bulk_sink_eq);
+        if (rc != PTL_OK)
+                printk("PtlEQAlloc failed: %d\n", rc);
+
         return rc;
 }
-- 
1.8.3.1
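
Note on the bulk handshake in mds_sendpage(): sleep_on() has a classic
lost-wakeup window.  If the Portals ACK arrives after ptl_send_buf()
returns but before the task has queued itself on rq_wait_for_bulk,
bulk_source_callback()'s wake_up_interruptible() finds an empty waitqueue
and the server sleeps forever.  Below is a minimal race-free sketch,
assuming a hypothetical rq_bulk_done flag added to struct ptlrpc_request;
neither the flag nor this code is part of the patch above.

	/* in mds_sendpage(), replacing the sleep_on() sequence */
	req->rq_bulkbuf = buf;
	req->rq_bulklen = PAGE_SIZE;
	req->rq_bulk_done = 0;			/* hypothetical flag */
	init_waitqueue_head(&req->rq_wait_for_bulk);

	rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0);

	/* wait_event checks the condition after queueing the task,
	 * so an ACK that already fired is not lost */
	wait_event_interruptible(req->rq_wait_for_bulk, req->rq_bulk_done);

	kfree(buf);
	req->rq_bulklen = 0;

The matching producer side in bulk_source_callback() would set the flag
before waking the waiter:

	} else if (ev->type == PTL_EVENT_ACK) {
		rpc->rq_bulk_done = 1;
		wake_up_interruptible(&rpc->rq_wait_for_bulk);
	}

The same pattern fits bulk_sink_callback().  Separately, the rq_bulklen
test in ptl_send_buf() (the "FIXME: This is bad" hack) could move into a
dedicated helper, say ptl_send_bulk(), so a request that merely has a
bulk sink armed is never mistaken for a bulk source.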