X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Fniobuf.c;h=fb1bb1d40e107d51cd1e8e47860e7f662f231103;hb=d2d56f38da01001c92a09afc6b52b5acbd9bc13c;hp=060fb8ed810de34357b3e9db5ae7b41a50307a67;hpb=bf527ab7e56d4445f81223b23302b3cbf0dc5fb1;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 060fb8e..fb1bb1d 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -51,15 +51,15 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, md.user_ptr = cbid; md.eq_handle = ptlrpc_eq_h; - if (ack == LNET_ACK_REQ && - OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) { + if (unlikely(ack == LNET_ACK_REQ && + OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE))) { /* don't ask for the ack to simulate failing client */ ack = LNET_NOACK_REQ; obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED; } rc = LNetMDBind (md, LNET_UNLINK, mdh); - if (rc != 0) { + if (unlikely(rc != 0)) { CERROR ("LNetMDBind failed: %d\n", rc); LASSERT (rc == -ENOMEM); RETURN (-ENOMEM); @@ -68,9 +68,9 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len, CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n", len, portal, xid); - rc = LNetPut (conn->c_self, *mdh, ack, + rc = LNetPut (conn->c_self, *mdh, ack, conn->c_peer, portal, xid, 0, 0); - if (rc != 0) { + if (unlikely(rc != 0)) { int rc2; /* We're going to get an UNLINK event when I unlink below, * which will complete just like any other failed send, so @@ -93,7 +93,7 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) __u64 xid; ENTRY; - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) + if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) RETURN(0); /* NB no locking required until desc is on the network */ @@ -101,7 +101,6 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) LASSERT (desc->bd_type == BULK_PUT_SOURCE || desc->bd_type == BULK_GET_SINK); desc->bd_success = 0; - desc->bd_sender = LNET_NID_ANY; md.user_ptr = &desc->bd_cbid; md.eq_handle = ptlrpc_eq_h; @@ -126,17 +125,17 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc) xid = desc->bd_req->rq_xid; CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d " "id %s xid "LPX64"\n", desc->bd_iov_count, - desc->bd_nob, desc->bd_portal, + desc->bd_nob, desc->bd_portal, libcfs_id2str(conn->c_peer), xid); /* Network is about to get at the memory */ desc->bd_network_rw = 1; if (desc->bd_type == BULK_PUT_SOURCE) - rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, + rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, conn->c_peer, desc->bd_portal, xid, 0, 0); else - rc = LNetGet (conn->c_self, desc->bd_md_h, + rc = LNetGet (conn->c_self, desc->bd_md_h, conn->c_peer, desc->bd_portal, xid, 0); if (rc != 0) { @@ -163,7 +162,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc) if (!ptlrpc_bulk_active(desc)) /* completed or */ return; /* never started */ - + /* Do not send any meaningful data over the wire for evicted clients */ if (desc->bd_export && desc->bd_export->exp_failed) ptl_rpc_wipe_bulk_pages(desc); @@ -179,7 +178,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc) /* Network access will complete in finite time but the HUGE * timeout lets us CWARN for visibility of sluggish NALs */ lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL); - rc = l_wait_event(desc->bd_waitq, + rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); if (rc == 0) return; @@ -199,7 +198,7 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req) lnet_md_t md; ENTRY; - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET)) + if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET)) RETURN(0); /* NB no locking required until desc is on the network */ @@ -211,15 +210,14 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req) desc->bd_type == BULK_GET_SOURCE); desc->bd_success = 0; - desc->bd_sender = LNET_NID_ANY; peer = desc->bd_import->imp_connection->c_peer; md.user_ptr = &desc->bd_cbid; md.eq_handle = ptlrpc_eq_h; md.threshold = 1; /* PUT or GET */ - md.options = PTLRPC_MD_OPTIONS | - ((desc->bd_type == BULK_GET_SOURCE) ? + md.options = PTLRPC_MD_OPTIONS | + ((desc->bd_type == BULK_GET_SOURCE) ? LNET_MD_OP_GET : LNET_MD_OP_PUT); ptlrpc_fill_bulk_md(&md, desc); @@ -285,7 +283,7 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req) * a chance to run client_bulk_callback() */ LNetMDUnlink (desc->bd_md_h); - + if (req->rq_set != NULL) wq = &req->rq_set->set_waitq; else @@ -300,7 +298,7 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req) return; LASSERT (rc == -ETIMEDOUT); - DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p\n", + DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p", desc); } } @@ -313,19 +311,22 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult) int rc; /* We must already have a reply buffer (only ptlrpc_error() may be - * called without one). We must also have a request buffer which - * is either the actual (swabbed) incoming request, or a saved copy - * if this is a req saved in target_queue_final_reply(). */ - LASSERT (req->rq_reqmsg != NULL); - LASSERT (req->rq_repmsg != NULL); + * called without one). The reply generated by security layer (e.g. + * error notify, etc.) might have NULL rq->reqmsg; Otherwise we must + * have a request buffer which is either the actual (swabbed) incoming + * request, or a saved copy if this is a req saved in + * target_queue_final_reply(). + */ + LASSERT (req->rq_reqbuf != NULL); LASSERT (rs != NULL); - LASSERT (req->rq_repmsg == rs->rs_msg); LASSERT (may_be_difficult || !rs->rs_difficult); + LASSERT (req->rq_repmsg != NULL); + LASSERT (req->rq_repmsg == rs->rs_msg); LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback); LASSERT (rs->rs_cb_id.cbid_arg == rs); - if (req->rq_export && req->rq_export->exp_obd && - req->rq_export->exp_obd->obd_fail) { + if (unlikely(req->rq_export && req->rq_export->exp_obd && + req->rq_export->exp_obd->obd_fail)) { /* Failed obd's only send ENODEV */ req->rq_type = PTL_RPC_MSG_ERR; req->rq_status = -ENODEV; @@ -338,25 +339,31 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult) lustre_msg_set_type(req->rq_repmsg, req->rq_type); lustre_msg_set_status(req->rq_repmsg, req->rq_status); - lustre_msg_set_opc(req->rq_repmsg, lustre_msg_get_opc(req->rq_reqmsg)); + lustre_msg_set_opc(req->rq_repmsg, + req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : 0); if (req->rq_export == NULL || req->rq_export->exp_connection == NULL) conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL); else conn = ptlrpc_connection_addref(req->rq_export->exp_connection); - if (conn == NULL) { + if (unlikely(conn == NULL)) { CERROR("not replying on NULL connection\n"); /* bug 9635 */ return -ENOTCONN; } atomic_inc (&svc->srv_outstanding_replies); ptlrpc_rs_addref(rs); /* +1 ref for the network */ - rc = ptl_send_buf (&rs->rs_md_h, req->rq_repmsg, req->rq_replen, + rc = sptlrpc_svc_wrap_reply(req); + if (unlikely(rc)) + goto out; + + rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len, rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ, &rs->rs_cb_id, conn, svc->srv_rep_portal, req->rq_xid); - if (rc != 0) { +out: + if (unlikely(rc != 0)) { atomic_dec (&svc->srv_outstanding_replies); ptlrpc_rs_decref(rs); } @@ -395,7 +402,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) lnet_md_t reply_md; ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_PTLRPC_DROP_RPC, 0); + OBD_FAIL_RETURN(OBD_FAIL_PTLRPC_DROP_RPC, 0); LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST); @@ -411,14 +418,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) request->rq_err = 1; RETURN(-ENODEV); } - - connection = request->rq_import->imp_connection; - if (request->rq_bulk != NULL) { - rc = ptlrpc_register_bulk (request); - if (rc != 0) - RETURN(rc); - } + connection = request->rq_import->imp_connection; lustre_msg_set_handle(request->rq_reqmsg, &request->rq_import->imp_remote_handle); @@ -426,12 +427,25 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) lustre_msg_set_conn_cnt(request->rq_reqmsg, request->rq_import->imp_conn_cnt); + rc = sptlrpc_cli_wrap_request(request); + if (rc) + RETURN(rc); + + /* bulk register should be done after wrap_request() */ + if (request->rq_bulk != NULL) { + rc = ptlrpc_register_bulk (request); + if (rc != 0) + RETURN(rc); + } + if (!noreply) { LASSERT (request->rq_replen != 0); - if (request->rq_repmsg == NULL) - OBD_ALLOC(request->rq_repmsg, request->rq_replen); - if (request->rq_repmsg == NULL) - GOTO(cleanup_bulk, rc = -ENOMEM); + if (request->rq_repbuf == NULL) { + rc = sptlrpc_cli_alloc_repbuf(request, + request->rq_replen); + if (rc) + GOTO(cleanup_bulk, rc); + } rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/ connection->c_peer, request->rq_xid, 0, @@ -439,7 +453,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) if (rc != 0) { CERROR("LNetMEAttach failed: %d\n", rc); LASSERT (rc == -ENOMEM); - GOTO(cleanup_repmsg, rc = -ENOMEM); + GOTO(cleanup_bulk, rc = -ENOMEM); } } @@ -456,14 +470,14 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) spin_unlock(&request->rq_lock); if (!noreply) { - reply_md.start = request->rq_repmsg; - reply_md.length = request->rq_replen; + reply_md.start = request->rq_repbuf; + reply_md.length = request->rq_repbuf_len; reply_md.threshold = 1; reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT; reply_md.user_ptr = &request->rq_reply_cbid; reply_md.eq_handle = ptlrpc_eq_h; - rc = LNetMDAttach(reply_me_h, reply_md, LNET_UNLINK, + rc = LNetMDAttach(reply_me_h, reply_md, LNET_UNLINK, &request->rq_reply_md_h); if (rc != 0) { CERROR("LNetMDAttach failed: %d\n", rc); @@ -477,7 +491,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64 ", portal %u\n", - request->rq_replen, request->rq_xid, + request->rq_repbuf_len, request->rq_xid, request->rq_reply_portal); } @@ -490,7 +504,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) request->rq_sent = CURRENT_SECONDS; ptlrpc_pinger_sending_on_import(request->rq_import); rc = ptl_send_buf(&request->rq_req_md_h, - request->rq_reqmsg, request->rq_reqlen, + request->rq_reqbuf, request->rq_reqdata_len, LNET_NOACK_REQ, &request->rq_req_cbid, connection, request->rq_request_portal, @@ -517,10 +531,6 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) /* UNLINKED callback called synchronously */ LASSERT (!request->rq_receiving_reply); - cleanup_repmsg: - OBD_FREE(request->rq_repmsg, request->rq_replen); - request->rq_repmsg = NULL; - cleanup_bulk: if (request->rq_bulk != NULL) ptlrpc_unregister_bulk(request); @@ -559,7 +569,7 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE; md.user_ptr = &rqbd->rqbd_cbid; md.eq_handle = ptlrpc_eq_h; - + rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h); if (rc == 0) return (0); @@ -569,6 +579,6 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd) rc = LNetMEUnlink (me_h); LASSERT (rc == 0); rqbd->rqbd_refcount = 0; - + return (-ENOMEM); }