From 610bb8ab3e5890171db615bb1a57f0cceaff5057 Mon Sep 17 00:00:00 2001 From: pschwan Date: Thu, 28 Feb 2002 03:05:25 +0000 Subject: [PATCH] - added rq_type field to ptlrpc_request - this field must be one of PTLRPC_REQUEST, PTLRPC_REPLY, PTLRPC_BULK - mdc_request now waits for directory pages to be sent via the bulk portal - change MDS bulk waitq to the new style - ptl_send_buf no longer takes the 'is_request' argument; this is encoded in the rq_type mentioned above - added ptl_abort_rpc() to tear down reply structures for aborted requests. - refactored ptl_send_buf code to rid ourselves of some of the more gross bulklen hacks --- lustre/include/linux/lustre_net.h | 17 +++++--- lustre/mdc/mdc_request.c | 6 +-- lustre/mds/handler.c | 30 ++++++++++++-- lustre/osc/osc_request.c | 3 +- lustre/ost/ost_handler.c | 3 +- lustre/ptlrpc/rpc.c | 83 ++++++++++++++++++++++++++------------- lustre/tests/llmount.sh | 2 +- 7 files changed, 102 insertions(+), 42 deletions(-) diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 2aa6862..0efec74 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -74,18 +74,18 @@ struct ptlrpc_service { int *replen, char **repbuf); }; - struct ptlrpc_request { + int rq_type; /* one of PTLRPC_REQUEST, PTLRPC_REPLY, PTLRPC_BULK */ struct list_head rq_list; struct mds_obd *rq_obd; struct ost_obd *rq_ost; int rq_status; + __u32 rq_xid; char *rq_reqbuf; int rq_reqlen; struct ptlreq_hdr *rq_reqhdr; union ptl_req rq_req; - __u32 rq_xid; char *rq_repbuf; int rq_replen; @@ -96,15 +96,18 @@ struct ptlrpc_request { int rq_bulklen; int (*rq_bulk_cb)(struct ptlrpc_request *, void *); - void * rq_reply_handle; + void *rq_reply_handle; wait_queue_head_t rq_wait_for_rep; wait_queue_head_t rq_wait_for_bulk; ptl_md_t rq_reply_md; ptl_handle_md_t rq_reply_md_h; + ptl_handle_me_t rq_reply_me_h; + ptl_md_t rq_req_md; ptl_md_t rq_bulk_md; ptl_handle_md_t rq_bulk_md_h; + ptl_handle_me_t rq_bulk_me_h; __u32 rq_reply_portal; __u32 rq_req_portal; __u32 rq_bulk_portal; @@ -125,11 +128,13 @@ struct ptlrpc_client { int (*cli_enqueue)(struct ptlrpc_request *req); }; - - /* rpc/rpc.c */ +#define PTLRPC_REQUEST 1 +#define PTLRPC_REPLY 2 +#define PTLRPC_BULK 3 + int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal, int is_request); + int portal); int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer); int ptl_received_rpc(struct ptlrpc_service *service); int rpc_register_service(struct ptlrpc_service *service, char *uuid); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 8d322ad..b027372 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -176,7 +176,7 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, CDEBUG(D_INODE, "inode: %ld\n", ino); request = ptlrpc_prep_req(peer, MDS_READPAGE, 0, NULL, - sizeof(struct niobuf), (char *)&niobuf); + sizeof(struct niobuf), (char *)&niobuf); if (!request) { CERROR("mdc request: cannot pack\n"); return -ENOMEM; @@ -187,8 +187,8 @@ int mdc_readpage(struct ptlrpc_client *peer, ino_t ino, int type, __u64 offset, request->rq_req.mds->size = offset; request->rq_req.mds->tgtlen = sizeof(niobuf); - //request->rq_bulklen = PAGE_SIZE; - //request->rq_bulkbuf = (void *)(long)niobuf.addr; + request->rq_bulklen = PAGE_SIZE; + request->rq_bulkbuf = (void *)(long)niobuf.addr; request->rq_bulk_portal = MDS_BULK_PORTAL; request->rq_replen = sizeof(struct ptlrep_hdr) + sizeof(struct mds_rep); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 3b0f965..d2b611e 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -92,6 +92,7 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, return -EIO; } else { char *buf; + DECLARE_WAITQUEUE(wait, current); OBD_ALLOC(buf, PAGE_SIZE); if (!buf) @@ -106,11 +107,33 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file, return -EIO; } + req->rq_type = PTLRPC_BULK; req->rq_bulkbuf = buf; req->rq_bulklen = PAGE_SIZE; + init_waitqueue_head(&req->rq_wait_for_bulk); - rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL, 0); - sleep_on(&req->rq_wait_for_bulk); + rc = ptl_send_buf(req, &req->rq_peer, MDS_BULK_PORTAL); + add_wait_queue(&req->rq_wait_for_bulk, &wait); + /* The bulk callback will set rq->bulkbuf to NULL when it's + * been ACKed and it's finished using it. */ + while (req->rq_bulkbuf != NULL) { + set_current_state(TASK_INTERRUPTIBLE); + + /* if this process really wants to die, let it go */ + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) + break; + + schedule(); + } + remove_wait_queue(&req->rq_wait_for_bulk, &wait); + set_current_state(TASK_RUNNING); + + if (req->rq_bulkbuf != NULL) { + EXIT; + return -EINTR; + } + OBD_FREE(buf, PAGE_SIZE); req->rq_bulklen = 0; /* FIXME: eek. */ } @@ -128,7 +151,8 @@ int mds_reply(struct ptlrpc_request *req) /* This is a request that came from the network via portals. */ /* FIXME: we need to increment the count of handled events */ - ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL, 0); + req->rq_type = PTLRPC_REPLY; + ptl_send_buf(req, &req->rq_peer, MDS_REPLY_PORTAL); } else { /* This is a local request that came from another thread. */ diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index bbb2f96..1b9dd03 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -422,9 +422,10 @@ int osc_sendpage(struct ptlrpc_request *req, struct niobuf *dst, memcpy(buf, (char *)(unsigned long)src->addr, src->len); + req->rq_type = PTLRPC_BULK; req->rq_bulkbuf = buf; req->rq_bulklen = src->len; - rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL, 0); + rc = ptl_send_buf(req, &req->rq_peer, OST_BULK_PORTAL); init_waitqueue_head(&req->rq_wait_for_bulk); sleep_on(&req->rq_wait_for_bulk); OBD_FREE(buf, src->len); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 3ba12c7..e45c6f9 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -98,7 +98,8 @@ int ost_reply(struct obd_device *obddev, struct ptlrpc_request *req) /* This is a request that came from the network via portals. */ /* FIXME: we need to increment the count of handled events */ - ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL, 0); + req->rq_type = PTLRPC_REPLY; + ptl_send_buf(req, &req->rq_peer, OST_REPLY_PORTAL); } else { /* This is a local request that came from another thread. */ diff --git a/lustre/ptlrpc/rpc.c b/lustre/ptlrpc/rpc.c index 6a7b7c9..d7f46a1 100644 --- a/lustre/ptlrpc/rpc.c +++ b/lustre/ptlrpc/rpc.c @@ -71,8 +71,23 @@ void ptlrpc_free_req(struct ptlrpc_request *request) OBD_FREE(request, sizeof(*request)); } -int ptlrpc_queue_wait(struct ptlrpc_request *req, - struct ptlrpc_client *cl) +/* Abort this request and cleanup any resources associated with it. */ +int ptl_abort_rpc(struct ptlrpc_request *request) +{ + /* First remove the MD for the reply; in theory, this means + * that we can tear down the buffer safely. */ + PtlMEUnlink(request->rq_reply_me_h); + PtlMDUnlink(request->rq_reply_md_h); + + if (request->rq_bulklen != 0) { + PtlMEUnlink(request->rq_bulk_me_h); + PtlMDUnlink(request->rq_bulk_md_h); + } + + return 0; +} + +int ptlrpc_queue_wait(struct ptlrpc_request *req, struct ptlrpc_client *cl) { int rc; DECLARE_WAITQUEUE(wait, current); @@ -112,12 +127,15 @@ int ptlrpc_queue_wait(struct ptlrpc_request *req, CDEBUG(0, "-- done\n"); if (req->rq_repbuf == NULL) { - /* We broke out because of a signal */ + /* We broke out because of a signal. Clean up the dangling + * reply buffers! */ + ptl_abort_rpc(req); EXIT; return -EINTR; } - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep); + rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, + &req->rq_rep); if (rc) { CERROR("unpack_rep failed: %d\n", rc); return rc; @@ -234,6 +252,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); + rpc->rq_bulkbuf = NULL; wake_up_interruptible(&rpc->rq_wait_for_bulk); } else { CERROR("Unexpected event type!\n"); @@ -253,7 +272,7 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) if (ev->type == PTL_EVENT_PUT) { if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset) CERROR("bulkbuf != mem_desc -- why?\n"); - wake_up_interruptible(&rpc->rq_wait_for_bulk); + //wake_up_interruptible(&rpc->rq_wait_for_bulk); } else { CERROR("Unexpected event type!\n"); BUG(); @@ -264,27 +283,38 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) } int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal, int is_request) + int portal) { int rc; ptl_process_id_t remote_id; ptl_handle_md_t md_h; + ptl_ack_req_t ack; - /* FIXME: This is bad. */ - if (request->rq_bulklen) { + switch (request->rq_type) { + case PTLRPC_BULK: request->rq_req_md.start = request->rq_bulkbuf; request->rq_req_md.length = request->rq_bulklen; request->rq_req_md.eventq = bulk_source_eq; - } else if (is_request) { + request->rq_req_md.threshold = 2; /* SENT and ACK events */ + ack = PTL_ACK_REQ; + break; + case PTLRPC_REQUEST: request->rq_req_md.start = request->rq_reqbuf; request->rq_req_md.length = request->rq_reqlen; request->rq_req_md.eventq = sent_pkt_eq; - } else { + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + case PTLRPC_REPLY: request->rq_req_md.start = request->rq_repbuf; request->rq_req_md.length = request->rq_replen; request->rq_req_md.eventq = sent_pkt_eq; + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + default: + BUG(); } - request->rq_req_md.threshold = 1; request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; @@ -299,13 +329,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, remote_id.nid = peer->peer_nid; remote_id.pid = 0; - if (request->rq_bulklen) { - rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } else { - rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } + CERROR("Sending %d bytes to portal %d, xid %d\n", + request->rq_req_md.length, portal, request->rq_xid); + + rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0); if (rc != PTL_OK) { BUG(); CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, @@ -318,7 +345,6 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) { - ptl_handle_me_t me_h, bulk_me_h; ptl_process_id_t local_id; int rc; char *repbuf; @@ -345,7 +371,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) CERROR("sending req %d\n", request->rq_xid); rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, &me_h); + request->rq_xid, 0, PTL_UNLINK, + &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); BUG(); @@ -353,6 +380,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup; } + request->rq_type = PTLRPC_REQUEST; request->rq_reply_md.start = repbuf; request->rq_reply_md.length = request->rq_replen; request->rq_reply_md.threshold = 1; @@ -360,8 +388,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) request->rq_reply_md.user_ptr = request; request->rq_reply_md.eventq = rcvd_rep_eq; - rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK, - &request->rq_reply_md_h); + rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md, + PTL_UNLINK, &request->rq_reply_md_h); if (rc != PTL_OK) { CERROR("PtlMDAttach failed: %d\n", rc); BUG(); @@ -372,7 +400,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) if (request->rq_bulklen != 0) { rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal, local_id, request->rq_xid, 0, PTL_UNLINK, - &bulk_me_h); + &request->rq_bulk_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); BUG(); @@ -387,7 +415,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) request->rq_bulk_md.user_ptr = request; request->rq_bulk_md.eventq = bulk_sink_eq; - rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK, + rc = PtlMDAttach(request->rq_bulk_me_h, + request->rq_bulk_md, PTL_UNLINK, &request->rq_bulk_md_h); if (rc != PTL_OK) { CERROR("PtlMDAttach failed: %d\n", rc); @@ -397,14 +426,14 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) } } - return ptl_send_buf(request, peer, request->rq_req_portal, 1); + return ptl_send_buf(request, peer, request->rq_req_portal); cleanup4: - PtlMEUnlink(bulk_me_h); + PtlMEUnlink(request->rq_bulk_me_h); cleanup3: PtlMDUnlink(request->rq_reply_md_h); cleanup2: - PtlMEUnlink(me_h); + PtlMEUnlink(request->rq_reply_me_h); cleanup: OBD_FREE(repbuf, request->rq_replen); diff --git a/lustre/tests/llmount.sh b/lustre/tests/llmount.sh index bf0e991..bc888bb 100755 --- a/lustre/tests/llmount.sh +++ b/lustre/tests/llmount.sh @@ -62,4 +62,4 @@ quit EOF mkdir /mnt/obd -# mount -t lustre_light -o device=3 none /mnt/obd +mount -t lustre_light -o device=3 none /mnt/obd -- 1.8.3.1