From 394f9e61905fbb3c18135a255e62de51c9108116 Mon Sep 17 00:00:00 2001 From: pschwan Date: Sun, 3 Mar 2002 02:24:49 +0000 Subject: [PATCH] OSC/OST bulk writing. TOTALLY UNTESTED. I am about to merge this branch back to the tip, also, which is going to break the tip. You have been warned. --- lustre/ptlrpc/client.c | 40 ++++++++------ lustre/ptlrpc/events.c | 34 +++++++----- lustre/ptlrpc/niobuf.c | 138 ++++++++++++++++++++++++++++++++++++------------ lustre/ptlrpc/service.c | 26 ++++----- 4 files changed, 160 insertions(+), 78 deletions(-) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index e67378e..8fc5669 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -72,6 +72,7 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, int err; memset(cl, 0, sizeof(*cl)); + spin_lock_init(&cl->cli_lock); cl->cli_xid = 1; cl->cli_obd = NULL; cl->cli_request_portal = req_portal; @@ -106,6 +107,20 @@ int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, return err; } +struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct lustre_peer *peer) +{ + struct ptlrpc_bulk_desc *bulk; + + OBD_ALLOC(bulk, sizeof(*bulk)); + if (bulk != NULL) { + memset(bulk, 0, sizeof(*bulk)); + memcpy(&bulk->b_peer, peer, sizeof(*peer)); + init_waitqueue_head(&bulk->b_waitq); + } + + return bulk; +} + struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, int opcode, int namelen, char *name, int tgtlen, char *tgt) @@ -121,7 +136,10 @@ struct ptlrpc_request *ptlrpc_prep_req(struct ptlrpc_client *cl, } memset(request, 0, sizeof(*request)); + + spin_lock(&cl->cli_lock); request->rq_xid = cl->cli_xid++; + spin_unlock(&cl->cli_lock); rc = cl->cli_req_pack(name, namelen, tgt, tgtlen, &request->rq_reqhdr, &request->rq_req, @@ -157,7 +175,6 @@ static int ptlrpc_check_reply(struct ptlrpc_request *req) return 1; } - CERROR("no event yet\n"); return 0; } @@ -172,13 +189,6 @@ int ptlrpc_abort(struct ptlrpc_request *request) request->rq_repbuf = NULL; request->rq_replen = 0; - if (request->rq_bulklen != 0) { - PtlMEUnlink(request->rq_bulk_me_h); - PtlMDUnlink(request->rq_bulk_md_h); - /* FIXME: wake whoever's sleeping on this bulk sending to let - * -them- clean it up. */ - } - return 0; } @@ -201,14 +211,12 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) rc = ptl_send_rpc(req, &cl->cli_server); } if (rc) { - CERROR("error %d, opcode %d\n", rc, - req->rq_reqhdr->opc); + CERROR("error %d, opcode %d\n", rc, req->rq_reqhdr->opc); return -rc; } CDEBUG(0, "-- sleeping\n"); - wait_event_interruptible(req->rq_wait_for_rep, - ptlrpc_check_reply(req)); + wait_event_interruptible(req->rq_wait_for_rep, ptlrpc_check_reply(req)); CDEBUG(0, "-- done\n"); if (req->rq_flags == PTL_RPC_INTR) { @@ -224,7 +232,8 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) return -EINTR; } - rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, &req->rq_rephdr, &req->rq_rep); + rc = cl->cli_rep_unpack(req->rq_repbuf, req->rq_replen, + &req->rq_rephdr, &req->rq_rep); if (rc) { CERROR("unpack_rep failed: %d\n", rc); return rc; @@ -232,9 +241,8 @@ int ptlrpc_queue_wait(struct ptlrpc_client *cl, struct ptlrpc_request *req) CERROR("got rep %d\n", req->rq_rephdr->xid); if ( req->rq_rephdr->status == 0 ) - CDEBUG(0, "--> buf %p len %d status %d\n", - req->rq_repbuf, req->rq_replen, - req->rq_rephdr->status); + CDEBUG(0, "--> buf %p len %d status %d\n", req->rq_repbuf, + req->rq_replen, req->rq_rephdr->status); EXIT; return 0; diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index c39d093..f3cfd1c 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -33,6 +33,7 @@ #include ptl_handle_eq_t sent_pkt_eq, rcvd_rep_eq, bulk_source_eq, bulk_sink_eq; +static const ptl_handle_ni_t *socknal_nip = NULL, *qswnal_nip = NULL; /* * Free the packet when it has gone out @@ -131,7 +132,7 @@ int server_request_callback(ptl_event_t *ev, void *data) static int bulk_source_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; ENTRY; @@ -139,8 +140,8 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) CDEBUG(D_NET, "got SENT event\n"); } else if (ev->type == PTL_EVENT_ACK) { CDEBUG(D_NET, "got ACK event\n"); - rpc->rq_bulkbuf = NULL; - wake_up_interruptible(&rpc->rq_wait_for_bulk); + bulk->b_flags = PTL_BULK_SENT; + wake_up_interruptible(&bulk->b_waitq); } else { CERROR("Unexpected event type!\n"); BUG(); @@ -152,14 +153,15 @@ static int bulk_source_callback(ptl_event_t *ev, void *data) static int bulk_sink_callback(ptl_event_t *ev, void *data) { - struct ptlrpc_request *rpc = ev->mem_desc.user_ptr; + struct ptlrpc_bulk_desc *bulk = ev->mem_desc.user_ptr; ENTRY; if (ev->type == PTL_EVENT_PUT) { - if (rpc->rq_bulkbuf != ev->mem_desc.start + ev->offset) + if (bulk->b_buf != ev->mem_desc.start + ev->offset) CERROR("bulkbuf != mem_desc -- why?\n"); - //wake_up_interruptible(&rpc->rq_wait_for_bulk); + bulk->b_flags = PTL_BULK_RCVD; + wake_up_interruptible(&bulk->b_waitq); } else { CERROR("Unexpected event type!\n"); BUG(); @@ -172,15 +174,20 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data) int ptlrpc_init_portals(void) { int rc; - const ptl_handle_ni_t *nip; ptl_handle_ni_t ni; - nip = inter_module_get_request(LUSTRE_NAL "_ni", LUSTRE_NAL); - if (nip == NULL) { - CERROR("get_ni failed: is the NAL module loaded?\n"); + socknal_nip = inter_module_get_request("ksocknal_ni", "ksocknal"); + qswnal_nip = inter_module_get_request("kqswnal_ni", "kqswnal"); + if (socknal_nip == NULL && qswnal_nip == NULL) { + CERROR("get_ni failed: is a NAL module loaded?\n"); return -EIO; } - ni = *nip; + + /* Use the qswnal if it's there */ + if (qswnal_nip != NULL) + ni = *qswnal_nip; + else + ni = *socknal_nip; rc = PtlEQAlloc(ni, 128, sent_packet_callback, NULL, &sent_pkt_eq); if (rc != PTL_OK) @@ -208,5 +215,8 @@ void ptlrpc_exit_portals(void) PtlEQFree(bulk_source_eq); PtlEQFree(bulk_sink_eq); - inter_module_put(LUSTRE_NAL "_ni"); + if (qswnal_nip != NULL) + inter_module_put("kqswnal_ni"); + if (socknal_nip != NULL) + inter_module_put("ksocknal_ni"); } diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index d391f9c..33a4900 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -33,8 +33,26 @@ #include extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq; +static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY}; +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) +{ + if (bulk->b_flags == PTL_BULK_SENT) { + EXIT; + return 1; + } + + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { + bulk->b_flags = PTL_RPC_INTR; + EXIT; + return 1; + } + + CERROR("no event yet\n"); + return 0; +} int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, int portal) @@ -68,6 +86,7 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, break; default: BUG(); + return -1; /* notreached */ } request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; @@ -97,6 +116,85 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, return rc; } +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) +{ + int rc; + ptl_process_id_t remote_id; + ptl_handle_md_t md_h; + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.eventq = bulk_source_eq; + bulk->b_md.threshold = 2; /* SENT and ACK events */ + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + + rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h); + if (rc != 0) { + BUG(); + CERROR("PtlMDBind failed: %d\n", rc); + return rc; + } + + remote_id.addr_kind = PTL_ADDR_NID; + remote_id.nid = bulk->b_peer.peer_nid; + remote_id.pid = 0; + + CERROR("Sending %d bytes to portal %d, xid %d\n", + bulk->b_md.length, portal, bulk->b_xid); + + rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0); + if (rc != PTL_OK) { + BUG(); + CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, + portal, bulk->b_xid, rc); + /* FIXME: tear down md */ + } + + return rc; +} + +int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk) +{ + int rc; + + ENTRY; + + rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id, + bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h); + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup1; + } + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.threshold = 1; + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + bulk->b_md.eventq = bulk_sink_eq; + + rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup2; + } + + CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n", + bulk->b_buflen, bulk->b_xid, bulk->b_portal); + + cleanup2: + PtlMEUnlink(bulk->b_me_h); + cleanup1: + PtlMDUnlink(bulk->b_md_h); + + return rc; +} + int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req) { @@ -162,7 +260,6 @@ int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, return ptlrpc_reply(obddev, svc, req); } - int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) { ptl_process_id_t local_id; @@ -190,9 +287,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) local_id.rid = PTL_ID_ANY; //CERROR("sending req %d\n", request->rq_xid); - rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, - &request->rq_reply_me_h); + rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id, + request->rq_xid, 0, PTL_UNLINK, + &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); BUG(); @@ -217,40 +314,11 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup2; } - if (request->rq_bulklen != 0) { - rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, - &request->rq_bulk_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup3; - } - - request->rq_bulk_md.start = request->rq_bulkbuf; - request->rq_bulk_md.length = request->rq_bulklen; - request->rq_bulk_md.threshold = 1; - request->rq_bulk_md.options = PTL_MD_OP_PUT; - request->rq_bulk_md.user_ptr = request; - request->rq_bulk_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(request->rq_bulk_me_h, request->rq_bulk_md, - PTL_UNLINK, &request->rq_bulk_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup4; - } - } + CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n", + request->rq_replen, request->rq_xid, request->rq_reply_portal); return ptl_send_buf(request, peer, request->rq_req_portal); - cleanup4: - PtlMEUnlink(request->rq_bulk_me_h); - cleanup3: - PtlMDUnlink(request->rq_reply_md_h); cleanup2: PtlMEUnlink(request->rq_reply_me_h); cleanup: diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 3767eda..f560a41 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -37,10 +37,8 @@ extern int server_request_callback(ptl_event_t *ev, void *data); static int ptlrpc_check_event(struct ptlrpc_service *svc) { - if (sigismember(&(current->pending.signal), - SIGKILL) || - sigismember(&(current->pending.signal), - SIGINT)) { + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { svc->srv_flags |= SVC_KILLED; EXIT; return 1; @@ -80,14 +78,9 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc) return 0; } -struct ptlrpc_service *ptlrpc_init_svc(__u32 bufsize, - int req_portal, - int rep_portal, - char *uuid, - req_unpack_t unpack, - rep_pack_t pack, - svc_handler_t handler - ) +struct ptlrpc_service * +ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid, + req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler) { int err; struct ptlrpc_service *svc; @@ -215,7 +208,8 @@ void ptlrpc_stop_thread(struct ptlrpc_service *svc) svc->srv_flags = SVC_STOPPING; wake_up(&svc->srv_waitq); - wait_event_interruptible(svc->srv_ctl_waitq, (svc->srv_flags & SVC_STOPPED)); + wait_event_interruptible(svc->srv_ctl_waitq, + (svc->srv_flags & SVC_STOPPED)); } int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc, @@ -336,8 +330,10 @@ int rpc_unregister_service(struct ptlrpc_service *service) rc = PtlMEUnlink(service->srv_me_h[i]); if (rc) CERROR("PtlMEUnlink failed: %d\n", rc); - - OBD_FREE(service->srv_buf[i], service->srv_buf_size); + + if (service->srv_buf[i] != NULL) + OBD_FREE(service->srv_buf[i], service->srv_buf_size); + service->srv_buf[i] = NULL; } rc = PtlEQFree(service->srv_eq_h); -- 1.8.3.1