From b6159d3cc3565b0092143a9f4ada753510a318c3 Mon Sep 17 00:00:00 2001
From: pschwan
Date: Fri, 1 Mar 2002 01:28:10 +0000
Subject: [PATCH] Bring forward all of the tip's bulk movement fixes.

---
 lustre/ptlrpc/client.c | 28 ++++++++++++++++++++---
 lustre/ptlrpc/events.c |  3 ++-
 lustre/ptlrpc/niobuf.c | 60 +++++++++++++++++++++++++++++---------------------
 3 files changed, 62 insertions(+), 29 deletions(-)

diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c
index d59dfa6..e67378e 100644
--- a/lustre/ptlrpc/client.c
+++ b/lustre/ptlrpc/client.c
@@ -32,8 +32,7 @@
 #include
 #include
 
-int ptlrpc_enqueue(struct ptlrpc_client *peer,
-                   struct ptlrpc_request *req)
+int ptlrpc_enqueue(struct ptlrpc_client *peer, struct ptlrpc_request *req)
 {
         struct ptlrpc_request *srv_req;
 
@@ -68,7 +67,7 @@ int ptlrpc_enqueue(struct ptlrpc_client *peer,
 
 int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal,
                           req_pack_t req_pack, rep_unpack_t rep_unpack,
-                           struct ptlrpc_client *cl)
+                          struct ptlrpc_client *cl)
 {
         int err;
 
@@ -162,6 +161,27 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req)
         return 0;
 }
 
+/* Abort this request and clean up any resources associated with it. */
+int ptlrpc_abort(struct ptlrpc_request *request)
+{
+        /* First unlink the ME and MD for the reply; in theory, this means
+         * that we can tear down the buffer safely. */
+        PtlMEUnlink(request->rq_reply_me_h);
+        PtlMDUnlink(request->rq_reply_md_h);
+        OBD_FREE(request->rq_repbuf, request->rq_replen);
+        request->rq_repbuf = NULL;
+        request->rq_replen = 0;
+
+        if (request->rq_bulklen != 0) {
+                PtlMEUnlink(request->rq_bulk_me_h);
+                PtlMDUnlink(request->rq_bulk_md_h);
+                /* FIXME: wake whoever's sleeping on this bulk sending to let
+                 * -them- clean it up. */
+        }
+
+        return 0;
+}
+
 int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
 {
@@ -192,6 +212,8 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req)
         CDEBUG(0, "-- done\n");
 
         if (req->rq_flags == PTL_RPC_INTR) {
+                /* Clean up the dangling reply buffers */
+                ptlrpc_abort(req);
                 EXIT;
                 return -EINTR;
         }
diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c
index f6299f0..c39d093 100644
--- a/lustre/ptlrpc/events.c
+++ b/lustre/ptlrpc/events.c
@@ -139,6 +139,7 @@ static int bulk_source_callback(ptl_event_t *ev, void *data)
                 CDEBUG(D_NET, "got SENT event\n");
         } else if (ev->type == PTL_EVENT_ACK) {
                 CDEBUG(D_NET, "got ACK event\n");
+                rpc->rq_bulkbuf = NULL;
                 wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
@@ -158,7 +159,7 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
         if (ev->type == PTL_EVENT_PUT) {
                 if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset)
                         CERROR("bulkbuf != mem_desc -- why?\n");
-                wake_up_interruptible(&rpc->rq_wait_for_bulk);
+                //wake_up_interruptible(&rpc->rq_wait_for_bulk);
         } else {
                 CERROR("Unexpected event type!\n");
                 BUG();
diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c
index 69c5a96..d391f9c 100644
--- a/lustre/ptlrpc/niobuf.c
+++ b/lustre/ptlrpc/niobuf.c
@@ -1,4 +1,3 @@
-
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
@@ -38,27 +37,38 @@ extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq;
 
 int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
-                 int portal, int is_request)
+                 int portal)
 {
         int rc;
         ptl_process_id_t remote_id;
         ptl_handle_md_t md_h;
+        ptl_ack_req_t ack;
 
-        /* FIXME: This is bad. */
-        if (request->rq_bulklen) {
+        switch (request->rq_type) {
+        case PTL_RPC_BULK:
                 request->rq_req_md.start = request->rq_bulkbuf;
                 request->rq_req_md.length = request->rq_bulklen;
                 request->rq_req_md.eventq = bulk_source_eq;
-        } else if (is_request) {
+                request->rq_req_md.threshold = 2; /* SENT and ACK events */
+                ack = PTL_ACK_REQ;
+                break;
+        case PTL_RPC_REQUEST:
                 request->rq_req_md.start = request->rq_reqbuf;
                 request->rq_req_md.length = request->rq_reqlen;
                 request->rq_req_md.eventq = sent_pkt_eq;
-        } else {
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        case PTL_RPC_REPLY:
                 request->rq_req_md.start = request->rq_repbuf;
                 request->rq_req_md.length = request->rq_replen;
                 request->rq_req_md.eventq = sent_pkt_eq;
+                request->rq_req_md.threshold = 1;
+                ack = PTL_NOACK_REQ;
+                break;
+        default:
+                BUG();
         }
-        request->rq_req_md.threshold = 1;
 
         request->rq_req_md.options = PTL_MD_OP_PUT;
         request->rq_req_md.user_ptr = request;
 
@@ -73,13 +83,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer,
         remote_id.nid = peer->peer_nid;
         remote_id.pid = 0;
 
-        if (request->rq_bulklen) {
-                rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        } else {
-                rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0,
-                            request->rq_xid, 0, 0);
-        }
+        CERROR("Sending %d bytes to portal %d, xid %d\n",
+               request->rq_req_md.length, portal, request->rq_xid);
+
+        rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0);
 
         if (rc != PTL_OK) {
                 BUG();
                 CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid,
@@ -100,7 +107,9 @@ int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
 
                 /* This is a request that came from the network via portals. */
 
                 /* FIXME: we need to increment the count of handled events */
-                ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal, 0);
+                req->rq_type = PTL_RPC_REPLY;
+                req->rq_rephdr->xid = req->rq_reqhdr->xid;
+                ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal);
         } else {
                 /* This is a local request that came from another thread. */
@@ -156,7 +165,6 @@ int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
 
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
 {
-        ptl_handle_me_t me_h, bulk_me_h;
         ptl_process_id_t local_id;
         int rc;
         char *repbuf;
@@ -183,7 +191,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         //CERROR("sending req %d\n", request->rq_xid);
 
         rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
-                         request->rq_xid, 0, PTL_UNLINK, &me_h);
+                         request->rq_xid, 0, PTL_UNLINK,
+                         &request->rq_reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 BUG();
@@ -191,6 +200,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 goto cleanup;
         }
 
+        request->rq_type = PTL_RPC_REQUEST;
         request->rq_reply_md.start = repbuf;
         request->rq_reply_md.length = request->rq_replen;
         request->rq_reply_md.threshold = 1;
@@ -198,8 +208,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         request->rq_reply_md.user_ptr = request;
         request->rq_reply_md.eventq = rcvd_rep_eq;
 
-        rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK,
-                         &request->rq_reply_md_h);
+        rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md,
+                         PTL_UNLINK, &request->rq_reply_md_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 BUG();
@@ -210,7 +220,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         if (request->rq_bulklen != 0) {
                 rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal,
                                  local_id, request->rq_xid, 0, PTL_UNLINK,
-                                 &bulk_me_h);
+                                 &request->rq_bulk_me_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMEAttach failed: %d\n", rc);
                         BUG();
@@ -225,8 +235,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 request->rq_bulk_md.user_ptr = request;
                 request->rq_bulk_md.eventq = bulk_sink_eq;
 
-                rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK,
-                                 &request->rq_bulk_md_h);
+                rc = PtlMDAttach(request->rq_bulk_me_h, request->rq_bulk_md,
+                                 PTL_UNLINK, &request->rq_bulk_md_h);
                 if (rc != PTL_OK) {
                         CERROR("PtlMDAttach failed: %d\n", rc);
                         BUG();
@@ -235,14 +245,14 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
                 }
         }
 
-        return ptl_send_buf(request, peer, request->rq_req_portal, 1);
+        return ptl_send_buf(request, peer, request->rq_req_portal);
 
 cleanup4:
-        PtlMEUnlink(bulk_me_h);
+        PtlMEUnlink(request->rq_bulk_me_h);
 cleanup3:
         PtlMDUnlink(request->rq_reply_md_h);
 cleanup2:
-        PtlMEUnlink(me_h);
+        PtlMEUnlink(request->rq_reply_me_h);
 cleanup:
         OBD_FREE(repbuf, request->rq_replen);
-- 
1.8.3.1
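
The core of the niobuf.c change is that ptl_send_buf() now dispatches on an
explicit request type instead of inferring the message kind from rq_bulklen
and an is_request flag: each type selects its own buffer, event queue, event
threshold, and ack policy, and a bulk MD budgets two events (SENT and ACK)
before the sender may reclaim the buffer.  The stand-alone C sketch below
models only that dispatch, outside the kernel; the enum values, struct
fields, and the select_md_params() helper are simplified stand-ins for the
Lustre/Portals definitions, not the real API.

/* Stand-alone model of the new ptl_send_buf() dispatch.  All names and
 * types here are simplified stand-ins for the Lustre/Portals originals. */
#include <stdio.h>
#include <stddef.h>

enum rpc_type { RPC_REQUEST, RPC_REPLY, RPC_BULK };  /* models PTL_RPC_* */
enum ack_req  { NOACK_REQ, ACK_REQ };                /* models PTL_*ACK_REQ */

struct request {
        enum rpc_type type;
        char *reqbuf;  size_t reqlen;   /* outgoing request message */
        char *repbuf;  size_t replen;   /* reply buffer */
        char *bulkbuf; size_t bulklen;  /* bulk payload */
};

struct md_params {
        char *start;
        size_t length;
        int threshold;          /* number of events expected on this MD */
        enum ack_req ack;
};

/* Mirrors the switch added by the patch: a bulk MD budgets two events
 * (SENT and ACK) and requests an ack; requests and replies budget one
 * event and are sent unacked. */
static int select_md_params(const struct request *rq, struct md_params *md)
{
        switch (rq->type) {
        case RPC_BULK:
                md->start = rq->bulkbuf;
                md->length = rq->bulklen;
                md->threshold = 2;
                md->ack = ACK_REQ;
                return 0;
        case RPC_REQUEST:
                md->start = rq->reqbuf;
                md->length = rq->reqlen;
                md->threshold = 1;
                md->ack = NOACK_REQ;
                return 0;
        case RPC_REPLY:
                md->start = rq->repbuf;
                md->length = rq->replen;
                md->threshold = 1;
                md->ack = NOACK_REQ;
                return 0;
        }
        return -1;      /* unknown type; the kernel patch calls BUG() here */
}

int main(void)
{
        char bulk[8192];
        struct request rq = { .type = RPC_BULK,
                              .bulkbuf = bulk, .bulklen = sizeof(bulk) };
        struct md_params md;

        if (select_md_params(&rq, &md) == 0)
                printf("len=%zu threshold=%d ack=%d\n",
                       md.length, md.threshold, (int)md.ack);
        return 0;
}

Budgeting the extra ACK event on the source MD is what allows
bulk_source_callback() in events.c to defer clearing rq_bulkbuf and waking
the sender until the ACK arrives, which is why the sink-side wakeup is
commented out in the same patch.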