X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fniobuf.c;h=b6e6ce52750e95a9a812ea5260c4b3fc2a662748;hb=9144b8ecbb0c12afe8ec324765bba4bee0299202;hp=072592f92b66cd4251f6bed7c7c3ba7c3d7e972a;hpb=9140bd3dae4ac7e447da4e7ab56e9d49c9fd29c3;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 072592f..b6e6ce5 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -25,42 +25,10 @@ #include #include #include +#include extern ptl_handle_eq_t request_out_eq, reply_in_eq, reply_out_eq, bulk_source_eq, bulk_sink_eq; -static ptl_process_id_t local_id = {PTL_NID_ANY, PTL_PID_ANY}; - -int ptlrpc_check_bulk_sent(struct ptlrpc_bulk_desc *bulk) -{ - ENTRY; - - if (bulk->b_flags & PTL_BULK_FL_SENT) - RETURN(1); - - if (l_killable_pending(current)) { - bulk->b_flags |= PTL_RPC_FL_INTR; - RETURN(1); - } - - CDEBUG(D_NET, "no event yet\n"); - RETURN(0); -} - -int ptlrpc_check_bulk_received(struct ptlrpc_bulk_desc *bulk) -{ - ENTRY; - - if (bulk->b_flags & PTL_BULK_FL_RCVD) - RETURN(1); - - if (l_killable_pending(current)) { - bulk->b_flags |= PTL_RPC_FL_INTR; - RETURN(1); - } - - CDEBUG(D_NET, "no event yet\n"); - RETURN(0); -} static int ptl_send_buf(struct ptlrpc_request *request, struct ptlrpc_connection *conn, int portal) @@ -68,34 +36,31 @@ static int ptl_send_buf(struct ptlrpc_request *request, int rc; ptl_process_id_t remote_id; ptl_handle_md_t md_h; - ptl_ack_req_t ack; request->rq_req_md.user_ptr = request; switch (request->rq_type) { - case PTL_RPC_TYPE_REQUEST: + case PTL_RPC_MSG_REQUEST: + request->rq_reqmsg->type = HTON__u32(request->rq_type); request->rq_req_md.start = request->rq_reqmsg; request->rq_req_md.length = request->rq_reqlen; request->rq_req_md.eventq = request_out_eq; - request->rq_req_md.threshold = 1; - ack = PTL_NOACK_REQ; break; - case PTL_RPC_TYPE_REPLY: + case PTL_RPC_MSG_REPLY: + request->rq_repmsg->type = HTON__u32(request->rq_type); request->rq_req_md.start = request->rq_repmsg; request->rq_req_md.length = request->rq_replen; request->rq_req_md.eventq = reply_out_eq; - request->rq_req_md.threshold = 1; - ack = PTL_NOACK_REQ; break; default: LBUG(); return -1; /* notreached */ } + request->rq_req_md.threshold = 1; request->rq_req_md.options = PTL_MD_OP_PUT; request->rq_req_md.user_ptr = request; rc = PtlMDBind(conn->c_peer.peer_ni, request->rq_req_md, &md_h); - //CERROR("MDBind (outgoing req/rep/bulk): %Lu\n", (__u64)md_h); if (rc != 0) { CERROR("PtlMDBind failed: %d\n", rc); LBUG(); @@ -105,13 +70,15 @@ static int ptl_send_buf(struct ptlrpc_request *request, remote_id.nid = conn->c_peer.peer_nid; remote_id.pid = 0; - CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %Ld\n", + CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n", request->rq_req_md.length, portal, request->rq_xid); - rc = PtlPut(md_h, ack, remote_id, portal, 0, request->rq_xid, + if (!portal) + LBUG(); + rc = PtlPut(md_h, PTL_NOACK_REQ, remote_id, portal, 0, request->rq_xid, 0, 0); if (rc != PTL_OK) { - CERROR("PtlPut(%Lu, %d, %Ld) failed: %d\n", remote_id.nid, + CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n", remote_id.nid, portal, request->rq_xid, rc); PtlMDUnlink(md_h); } @@ -119,47 +86,98 @@ static int ptl_send_buf(struct ptlrpc_request *request, return rc; } +static inline struct iovec * +ptlrpc_get_bulk_iov (struct ptlrpc_bulk_desc *desc) +{ + struct iovec *iov; + + if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec)) + return (desc->bd_iov); + + OBD_ALLOC (iov, desc->bd_page_count * sizeof (struct 
iovec)); + if (iov == NULL) + LBUG(); + + return (iov); +} + +static inline void +ptlrpc_put_bulk_iov (struct ptlrpc_bulk_desc *desc, struct iovec *iov) +{ + if (desc->bd_page_count <= sizeof (desc->bd_iov)/sizeof (struct iovec)) + return; + + OBD_FREE (iov, desc->bd_page_count * sizeof (struct iovec)); +} + int ptlrpc_send_bulk(struct ptlrpc_bulk_desc *desc) { int rc; struct list_head *tmp, *next; ptl_process_id_t remote_id; + __u32 xid = 0; + struct iovec *iov; ENTRY; - list_for_each_safe(tmp, next, &desc->b_page_list) { + iov = ptlrpc_get_bulk_iov (desc); + if (iov == NULL) + RETURN (-ENOMEM); + + desc->bd_md.start = iov; + desc->bd_md.niov = 0; + desc->bd_md.length = 0; + desc->bd_md.eventq = bulk_source_eq; + desc->bd_md.threshold = 2; /* SENT and ACK */ + desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV; + desc->bd_md.user_ptr = desc; + + atomic_set (&desc->bd_source_callback_count, 2); + + list_for_each_safe(tmp, next, &desc->bd_page_list) { struct ptlrpc_bulk_page *bulk; - bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.eventq = bulk_source_eq; - bulk->b_md.threshold = 2; /* SENT and ACK */ - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - - rc = PtlMDBind(desc->b_connection->c_peer.peer_ni, bulk->b_md, - &bulk->b_md_h); - if (rc != 0) { - CERROR("PtlMDBind failed: %d\n", rc); - LBUG(); - RETURN(rc); - } - - remote_id.nid = desc->b_connection->c_peer.peer_nid; - remote_id.pid = 0; - - CDEBUG(D_NET, "Sending %d bytes to portal %d, xid %d\n", - bulk->b_md.length, desc->b_portal, bulk->b_xid); - - rc = PtlPut(bulk->b_md_h, PTL_ACK_REQ, remote_id, - desc->b_portal, 0, bulk->b_xid, 0, 0); - if (rc != PTL_OK) { - CERROR("PtlPut(%Lu, %d, %d) failed: %d\n", - remote_id.nid, desc->b_portal, bulk->b_xid, rc); - PtlMDUnlink(bulk->b_md_h); - LBUG(); - RETURN(rc); - } + bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link); + + LASSERT (desc->bd_md.niov < desc->bd_page_count); + + if (desc->bd_md.niov == 0) + xid = bulk->bp_xid; + LASSERT (xid == bulk->bp_xid); /* should all be the same */ + + iov[desc->bd_md.niov].iov_base = bulk->bp_buf; + iov[desc->bd_md.niov].iov_len = bulk->bp_buflen; + desc->bd_md.niov++; + desc->bd_md.length += bulk->bp_buflen; + } + + LASSERT (desc->bd_md.niov == desc->bd_page_count); + LASSERT (desc->bd_md.niov != 0); + + rc = PtlMDBind(desc->bd_connection->c_peer.peer_ni, desc->bd_md, + &desc->bd_md_h); + + ptlrpc_put_bulk_iov (desc, iov); /* move down to reduce latency to send */ + + if (rc != PTL_OK) { + CERROR("PtlMDBind failed: %d\n", rc); + LBUG(); + RETURN(rc); + } + + remote_id.nid = desc->bd_connection->c_peer.peer_nid; + remote_id.pid = 0; + + CDEBUG(D_NET, "Sending %u pages %u bytes to portal %d nid "LPX64" pid %d xid %d\n", + desc->bd_md.niov, desc->bd_md.length, + desc->bd_portal, remote_id.nid, remote_id.pid, xid); + + rc = PtlPut(desc->bd_md_h, PTL_ACK_REQ, remote_id, + desc->bd_portal, 0, xid, 0, 0); + if (rc != PTL_OK) { + CERROR("PtlPut("LPU64", %d, %d) failed: %d\n", + remote_id.nid, desc->bd_portal, xid, rc); + PtlMDUnlink(desc->bd_md_h); + LBUG(); + RETURN(rc); } RETURN(0); @@ -169,41 +187,69 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc) { struct list_head *tmp, *next; int rc; + __u32 xid = 0; + struct iovec *iov; + ptl_process_id_t source_id; ENTRY; - list_for_each_safe(tmp, next, &desc->b_page_list) { + iov = ptlrpc_get_bulk_iov (desc); + if (iov == NULL) + return (-ENOMEM); + + desc->bd_md.start = iov; + 
desc->bd_md.niov = 0; + desc->bd_md.length = 0; + desc->bd_md.threshold = 1; + desc->bd_md.options = PTL_MD_OP_PUT | PTL_MD_IOV; + desc->bd_md.user_ptr = desc; + desc->bd_md.eventq = bulk_sink_eq; + + list_for_each_safe(tmp, next, &desc->bd_page_list) { struct ptlrpc_bulk_page *bulk; - bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - - rc = PtlMEAttach(desc->b_connection->c_peer.peer_ni, - desc->b_portal, local_id, bulk->b_xid, 0, - PTL_UNLINK, PTL_INS_AFTER, &bulk->b_me_h); - if (rc != PTL_OK) { - CERROR("PtlMEAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); - } - - bulk->b_md.start = bulk->b_buf; - bulk->b_md.length = bulk->b_buflen; - bulk->b_md.threshold = 1; - bulk->b_md.options = PTL_MD_OP_PUT; - bulk->b_md.user_ptr = bulk; - bulk->b_md.eventq = bulk_sink_eq; - - rc = PtlMDAttach(bulk->b_me_h, bulk->b_md, PTL_UNLINK, - &bulk->b_md_h); - if (rc != PTL_OK) { - CERROR("PtlMDAttach failed: %d\n", rc); - LBUG(); - GOTO(cleanup, rc); - } - - CDEBUG(D_NET, "Setup bulk sink buffer: %u bytes, xid %u, " - "portal %u\n", bulk->b_buflen, bulk->b_xid, - desc->b_portal); + bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link); + + LASSERT (desc->bd_md.niov < desc->bd_page_count); + + if (desc->bd_md.niov == 0) + xid = bulk->bp_xid; + LASSERT (xid == bulk->bp_xid); /* should all be the same */ + + iov[desc->bd_md.niov].iov_base = bulk->bp_buf; + iov[desc->bd_md.niov].iov_len = bulk->bp_buflen; + desc->bd_md.niov++; + desc->bd_md.length += bulk->bp_buflen; } + LASSERT (desc->bd_md.niov == desc->bd_page_count); + LASSERT (desc->bd_md.niov != 0); + + source_id.nid = desc->bd_connection->c_peer.peer_nid; + source_id.pid = PTL_PID_ANY; + + rc = PtlMEAttach(desc->bd_connection->c_peer.peer_ni, + desc->bd_portal, source_id, xid, 0, + PTL_UNLINK, PTL_INS_AFTER, &desc->bd_me_h); + + ptlrpc_put_bulk_iov (desc, iov); + + if (rc != PTL_OK) { + CERROR("PtlMEAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); + } + + rc = PtlMDAttach(desc->bd_me_h, desc->bd_md, PTL_UNLINK, + &desc->bd_md_h); + if (rc != PTL_OK) { + CERROR("PtlMDAttach failed: %d\n", rc); + LBUG(); + GOTO(cleanup, rc); + } + + CDEBUG(D_NET, "Setup bulk sink buffers: %u pages %u bytes, xid %u, " + "portal %u\n", desc->bd_md.niov, desc->bd_md.length, + xid, desc->bd_portal); + RETURN(0); cleanup: @@ -214,17 +260,10 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *desc) int ptlrpc_abort_bulk(struct ptlrpc_bulk_desc *desc) { - struct list_head *tmp, *next; - - list_for_each_safe(tmp, next, &desc->b_page_list) { - struct ptlrpc_bulk_page *bulk; - bulk = list_entry(tmp, struct ptlrpc_bulk_page, b_link); - - /* This should be safe: these handles are initialized to be - * invalid in ptlrpc_prep_bulk_page() */ - PtlMDUnlink(bulk->b_md_h); - PtlMEUnlink(bulk->b_me_h); - } + /* This should be safe: these handles are initialized to be + * invalid in ptlrpc_prep_bulk() */ + PtlMDUnlink(desc->bd_md_h); + PtlMEUnlink(desc->bd_me_h); return 0; } @@ -238,11 +277,11 @@ int ptlrpc_reply(struct ptlrpc_service *svc, struct ptlrpc_request *req) } /* FIXME: we need to increment the count of handled events */ - req->rq_type = PTL_RPC_TYPE_REPLY; + if (req->rq_type != PTL_RPC_MSG_ERR) + req->rq_type = PTL_RPC_MSG_REPLY; //req->rq_repmsg->conn = req->rq_connection->c_remote_conn; //req->rq_repmsg->token = req->rq_connection->c_remote_token; req->rq_repmsg->status = HTON__u32(req->rq_status); - req->rq_reqmsg->type = HTON__u32(req->rq_type); return ptl_send_buf(req, req->rq_connection, svc->srv_rep_portal); } @@ -266,15 +305,15 @@ int 
ptlrpc_error(struct ptlrpc_service *svc, struct ptlrpc_request *req) RETURN(rc); } - int ptl_send_rpc(struct ptlrpc_request *request) { int rc; char *repbuf; + ptl_process_id_t source_id; ENTRY; - if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) { + if (request->rq_type != PTL_RPC_MSG_REQUEST) { CERROR("wrong packet type sent %d\n", NTOH__u32(request->rq_reqmsg->type)); LBUG(); @@ -296,11 +335,14 @@ int ptl_send_rpc(struct ptlrpc_request *request) RETURN(ENOMEM); } - down(&request->rq_client->cli_rpc_sem); + // down(&request->rq_client->cli_rpc_sem); + + source_id.nid = request->rq_connection->c_peer.peer_nid; + source_id.pid = PTL_PID_ANY; rc = PtlMEAttach(request->rq_connection->c_peer.peer_ni, - request->rq_client->cli_reply_portal, - local_id, request->rq_xid, 0, PTL_UNLINK, + request->rq_import->imp_client->cli_reply_portal, + source_id, request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER, &request->rq_reply_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); @@ -308,7 +350,6 @@ int ptl_send_rpc(struct ptlrpc_request *request) GOTO(cleanup, rc); } - request->rq_type = PTL_RPC_TYPE_REQUEST; request->rq_reply_md.start = repbuf; request->rq_reply_md.length = request->rq_replen; request->rq_reply_md.threshold = 1; @@ -324,87 +365,52 @@ int ptl_send_rpc(struct ptlrpc_request *request) GOTO(cleanup2, rc); } - CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid %Lu, portal %u\n", + CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64", portal %u\n", request->rq_replen, request->rq_xid, - request->rq_client->cli_reply_portal); + request->rq_import->imp_client->cli_reply_portal); rc = ptl_send_buf(request, request->rq_connection, - request->rq_client->cli_request_portal); + request->rq_import->imp_client->cli_request_portal); RETURN(rc); cleanup2: PtlMEUnlink(request->rq_reply_me_h); cleanup: OBD_FREE(repbuf, request->rq_replen); - up(&request->rq_client->cli_rpc_sem); + // up(&request->rq_client->cli_rpc_sem); return rc; } -void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i) +void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd) { + struct ptlrpc_service *service = rqbd->rqbd_service; + static ptl_process_id_t match_id = {PTL_NID_ANY, PTL_PID_ANY}; int rc; ptl_md_t dummy; ptl_handle_md_t md_h; /* Attach the leading ME on which we build the ring */ rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal, - local_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE, - &(service->srv_me_h[i])); + match_id, 0, ~0, + PTL_UNLINK, PTL_INS_AFTER, &rqbd->rqbd_me_h); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %d\n", rc); LBUG(); } - - if (service->srv_ref_count[i]) - LBUG(); - dummy.start = service->srv_buf[i]; + dummy.start = rqbd->rqbd_buffer; dummy.length = service->srv_buf_size; dummy.max_offset = service->srv_buf_size; - dummy.threshold = PTL_MD_THRESH_INF; - dummy.options = PTL_MD_OP_PUT | PTL_MD_AUTO_UNLINK; - dummy.user_ptr = service; + dummy.threshold = 1; + dummy.options = PTL_MD_OP_PUT; + dummy.user_ptr = rqbd; dummy.eventq = service->srv_eq_h; - dummy.max_offset = service->srv_buf_size; - - rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h); + + rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h); if (rc != PTL_OK) { /* cleanup */ CERROR("PtlMDAttach failed: %d\n", rc); LBUG(); } -} - -/* ptl_handled_rpc() should be called by the sleeping process once - * it finishes processing an event. This ensures the ref count is - * decremented and that the rpc ring buffer cycles properly. 
- */ -int ptl_handled_rpc(struct ptlrpc_service *service, void *start) -{ - int index; - - spin_lock(&service->srv_lock); - for (index = 0; index < service->srv_ring_length; index++) - if (service->srv_buf[index] == start) - break; - - if (index == service->srv_ring_length) - LBUG(); - - CDEBUG(D_INFO, "MD index=%d Ref Count=%d\n", index, - service->srv_ref_count[index]); - service->srv_ref_count[index]--; - - if (service->srv_ref_count[index] < 0) - LBUG(); - - if (service->srv_ref_count[index] == 0 && - !ptl_is_valid_handle(&(service->srv_me_h[index]))) { - CDEBUG(D_NET, "relinking %d\n", index); - ptlrpc_link_svc_me(service, index); - } - - spin_unlock(&service->srv_lock); - return 0; }
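
Summary of the change above, with an illustrative sketch: the patch drops the per-page bulk path (one match entry and one memory descriptor per ptlrpc_bulk_page) in favour of a single scatter/gather descriptor. Every page on bd_page_list contributes one iovec entry, all pages are required to carry the same xid, and the whole transfer is then covered by one PtlMDBind()/PtlPut() pair on the source side (ptlrpc_send_bulk, threshold 2 for SENT and ACK) or one PtlMEAttach()/PtlMDAttach() pair on the sink side (ptlrpc_register_bulk, threshold 1). The C sketch below is a minimal, stand-alone illustration of that iov-gathering pattern only, not Lustre code: bulk_page, bulk_desc, MAX_INLINE_IOV and gather_bulk() are assumed, simplified stand-ins for ptlrpc_bulk_page, ptlrpc_bulk_desc, bd_iov[] and the gathering loop inside ptlrpc_send_bulk()/ptlrpc_register_bulk().

/*
 * Illustrative sketch only (simplified stand-in types, not the Lustre
 * structures): gather one iovec per bulk page into a single
 * scatter/gather list, reusing a small inline array when possible.
 */
#include <stdlib.h>
#include <sys/uio.h>

#define MAX_INLINE_IOV 16                /* stand-in for the bd_iov[] capacity */

struct bulk_page {                       /* stand-in for ptlrpc_bulk_page */
        void   *buf;
        size_t  buflen;
};

struct bulk_desc {                       /* stand-in for ptlrpc_bulk_desc */
        struct bulk_page *pages;
        unsigned int      page_count;
        struct iovec      inline_iov[MAX_INLINE_IOV];
};

/* Like ptlrpc_get_bulk_iov(): use the inline array when it is large
 * enough, otherwise fall back to a heap allocation (may return NULL). */
static struct iovec *get_bulk_iov(struct bulk_desc *desc)
{
        if (desc->page_count <= MAX_INLINE_IOV)
                return desc->inline_iov;
        return malloc(desc->page_count * sizeof(struct iovec));
}

/* Like ptlrpc_put_bulk_iov(): only heap-allocated arrays are freed. */
static void put_bulk_iov(struct bulk_desc *desc, struct iovec *iov)
{
        if (iov != desc->inline_iov)
                free(iov);
}

/* One iovec entry per page; returns the total byte count, which is what
 * the patch accumulates in desc->bd_md.length before the single bind. */
static size_t gather_bulk(const struct bulk_desc *desc, struct iovec *iov)
{
        size_t       total = 0;
        unsigned int i;

        for (i = 0; i < desc->page_count; i++) {
                iov[i].iov_base = desc->pages[i].buf;
                iov[i].iov_len  = desc->pages[i].buflen;
                total          += desc->pages[i].buflen;
        }
        return total;
}

The inline-versus-allocated choice mirrors ptlrpc_get_bulk_iov()/ptlrpc_put_bulk_iov() in the patch: descriptors small enough to fit the embedded array avoid an allocation on the bulk path, and only oversized transfers pay for OBD_ALLOC/OBD_FREE.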