X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fniobuf.c;h=33a49005b8da8c412827c2101af48f7846ae4a14;hb=a07a72605bd79e48c40f06a0aa9c97e14d96e031;hp=f0195603d268961529f32d774803cbd1effbfd4d;hpb=9e6aa79072ec967289485b565a579c0d080fea9d;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index f019560..33a4900 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -1,4 +1,3 @@ - /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * @@ -34,29 +33,61 @@ #include extern ptl_handle_eq_t bulk_source_eq, sent_pkt_eq, rcvd_rep_eq, bulk_sink_eq; +static ptl_process_id_t local_id = {PTL_ADDR_GID, PTL_ID_ANY, PTL_ID_ANY}; + + +int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) +{ + if (bulk->b_flags == PTL_BULK_SENT) { + EXIT; + return 1; + } + + if (sigismember(&(current->pending.signal), SIGKILL) || + sigismember(&(current->pending.signal), SIGINT)) { + bulk->b_flags = PTL_RPC_INTR; + EXIT; + return 1; + } + + CERROR("no event yet\n"); + return 0; +} int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, - int portal, int is_request) + int portal) { int rc; ptl_process_id_t remote_id; ptl_handle_md_t md_h; + ptl_ack_req_t ack; - /* FIXME: This is bad. */ - if (request->rq_bulklen) { + switch (request->rq_type) { + case PTL_RPC_BULK: request->rq_req_md.start = request->rq_bulkbuf; request->rq_req_md.length = request->rq_bulklen; request->rq_req_md.eventq = bulk_source_eq; - } else if (is_request) { + request->rq_req_md.threshold = 2; /* SENT and ACK events */ + ack = PTL_ACK_REQ; + break; + case PTL_RPC_REQUEST: request->rq_req_md.start = request->rq_reqbuf; request->rq_req_md.length = request->rq_reqlen; request->rq_req_md.eventq = sent_pkt_eq; - } else { + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + case PTL_RPC_REPLY: request->rq_req_md.start = request->rq_repbuf; request->rq_req_md.length = request->rq_replen; request->rq_req_md.eventq = sent_pkt_eq; + request->rq_req_md.threshold = 1; + ack = PTL_NOACK_REQ; + break; + default: + BUG(); + return -1; /* notreached */ } - request->rq_req_md.threshold = 1; request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; @@ -71,13 +102,10 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, remote_id.nid = peer->peer_nid; remote_id.pid = 0; - if (request->rq_bulklen) { - rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } else { - rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, - request->rq_xid, 0, 0); - } + CERROR("Sending %d bytes to portal %d, xid %d\n", + request->rq_req_md.length, portal, request->rq_xid); + + rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, 0, 0); if (rc != PTL_OK) { BUG(); CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, @@ -88,9 +116,152 @@ int ptl_send_buf(struct ptlrpc_request *request, struct lustre_peer *peer, return rc; } +int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *bulk, int portal) +{ + int rc; + ptl_process_id_t remote_id; + ptl_handle_md_t md_h; + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.eventq = bulk_source_eq; + bulk->b_md.threshold = 2; /* SENT and ACK events */ + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + + rc = PtlMDBind(bulk->b_peer.peer_ni, bulk->b_md, &md_h); + if (rc != 0) { + BUG(); + CERROR("PtlMDBind failed: %d\n", rc); + return rc; + } + + remote_id.addr_kind = PTL_ADDR_NID; + remote_id.nid = bulk->b_peer.peer_nid; + remote_id.pid = 0; + + CERROR("Sending %d bytes to portal %d, xid %d\n", + bulk->b_md.length, portal, bulk->b_xid); + + rc = PtlPut(md_h, PTL_ACK_REQ, remote_id, portal, 0, bulk->b_xid, 0, 0); + if (rc != PTL_OK) { + BUG(); + CERROR("PtlPut(%d, %d, %d) failed: %d\n", remote_id.nid, + portal, bulk->b_xid, rc); + /* FIXME: tear down md */ + } + + return rc; +} + +int ptlrpc_wait_bulk(struct ptlrpc_bulk_desc *bulk) +{ + int rc; + + ENTRY; + + rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id, + bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h); + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup1; + } + + bulk->b_md.start = bulk->b_buf; + bulk->b_md.length = bulk->b_buflen; + bulk->b_md.threshold = 1; + bulk->b_md.options = PTL_MD_OP_PUT; + bulk->b_md.user_ptr = bulk; + bulk->b_md.eventq = bulk_sink_eq; + + rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, &bulk->b_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + BUG(); + EXIT; + goto cleanup2; + } + + CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, portal %u\n", + bulk->b_buflen, bulk->b_xid, bulk->b_portal); + + cleanup2: + PtlMEUnlink(bulk->b_me_h); + cleanup1: + PtlMDUnlink(bulk->b_md_h); + + return rc; +} + +int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + struct ptlrpc_request *clnt_req = req->rq_reply_handle; + ENTRY; + + if (req->rq_reply_handle == NULL) { + /* This is a request that came from the network via portals. */ + + /* FIXME: we need to increment the count of handled events */ + req->rq_type = PTL_RPC_REPLY; + req->rq_reqhdr->xid = req->rq_reqhdr->xid; + ptl_send_buf(req, &req->rq_peer, svc->srv_rep_portal); + } else { + /* This is a local request that came from another thread. */ + + /* move the reply to the client */ + clnt_req->rq_replen = req->rq_replen; + clnt_req->rq_repbuf = req->rq_repbuf; + req->rq_repbuf = NULL; + req->rq_replen = 0; + + /* free the request buffer */ + OBD_FREE(req->rq_reqbuf, req->rq_reqlen); + req->rq_reqbuf = NULL; + + /* wake up the client */ + wake_up_interruptible(&clnt_req->rq_wait_for_rep); + } + + EXIT; + return 0; +} + +int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc, + struct ptlrpc_request *req) +{ + struct ptlrep_hdr *hdr; + + ENTRY; + + OBD_ALLOC(hdr, sizeof(*hdr)); + if (!hdr) { + EXIT; + return -ENOMEM; + } + + memset(hdr, 0, sizeof(*hdr)); + + hdr->xid = req->rq_reqhdr->xid; + hdr->status = req->rq_status; + hdr->type = OST_TYPE_ERR; + + if (req->rq_repbuf) { + CERROR("req has repbuf\n"); + BUG(); + } + + req->rq_repbuf = (char *)hdr; + req->rq_replen = sizeof(*hdr); + + EXIT; + return ptlrpc_reply(obddev, svc, req); +} + int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) { - ptl_handle_me_t me_h, bulk_me_h; ptl_process_id_t local_id; int rc; char *repbuf; @@ -116,8 +287,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) local_id.rid = PTL_ID_ANY; //CERROR("sending req %d\n", request->rq_xid); - rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id, - request->rq_xid, 0, PTL_UNLINK, &me_h); + rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id, + request->rq_xid, 0, PTL_UNLINK, + &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); BUG(); @@ -125,6 +297,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup; } + request->rq_type = PTL_RPC_REQUEST; request->rq_reply_md.start = repbuf; request->rq_reply_md.length = request->rq_replen; request->rq_reply_md.threshold = 1; @@ -132,8 +305,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) request->rq_reply_md.user_ptr = request; request->rq_reply_md.eventq = rcvd_rep_eq; - rc = PtlMDAttach(me_h, request->rq_reply_md, PTL_UNLINK, - &request->rq_reply_md_h); + rc = PtlMDAttach(request->rq_reply_me_h, request->rq_reply_md, + PTL_UNLINK, &request->rq_reply_md_h); if (rc != PTL_OK) { CERROR("PtlMDAttach failed: %d\n", rc); BUG(); @@ -141,42 +314,13 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) goto cleanup2; } - if (request->rq_bulklen != 0) { - rc = PtlMEAttach(peer->peer_ni, request->rq_bulk_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, - &bulk_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup3; - } - - request->rq_bulk_md.start = request->rq_bulkbuf; - request->rq_bulk_md.length = request->rq_bulklen; - request->rq_bulk_md.threshold = 1; - request->rq_bulk_md.options = PTL_MD_OP_PUT; - request->rq_bulk_md.user_ptr = request; - request->rq_bulk_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(bulk_me_h, request->rq_bulk_md, PTL_UNLINK, - &request->rq_bulk_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - BUG(); - EXIT; - goto cleanup4; - } - } + CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %u, portal %u\n", + request->rq_replen, request->rq_xid, request->rq_reply_portal); - return ptl_send_buf(request, peer, request->rq_req_portal, 1); + return ptl_send_buf(request, peer, request->rq_req_portal); - cleanup4: - PtlMEUnlink(bulk_me_h); - cleanup3: - PtlMDUnlink(request->rq_reply_md_h); cleanup2: - PtlMEUnlink(me_h); + PtlMEUnlink(request->rq_reply_me_h); cleanup: OBD_FREE(repbuf, request->rq_replen);