Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
index 060fb8e..fb1bb1d 100644 (file)
@@ -51,15 +51,15 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
         md.user_ptr  = cbid;
         md.eq_handle = ptlrpc_eq_h;
 
-        if (ack == LNET_ACK_REQ &&
-            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE)) {
+        if (unlikely(ack == LNET_ACK_REQ &&
+                     OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_ACK | OBD_FAIL_ONCE))) {
                 /* don't ask for the ack to simulate failing client */
                 ack = LNET_NOACK_REQ;
                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
         }
 
         rc = LNetMDBind (md, LNET_UNLINK, mdh);
-        if (rc != 0) {
+        if (unlikely(rc != 0)) {
                 CERROR ("LNetMDBind failed: %d\n", rc);
                 LASSERT (rc == -ENOMEM);
                 RETURN (-ENOMEM);
@@ -68,9 +68,9 @@ static int ptl_send_buf (lnet_handle_md_t *mdh, void *base, int len,
         CDEBUG(D_NET, "Sending %d bytes to portal %d, xid "LPD64"\n",
                len, portal, xid);
 
-        rc = LNetPut (conn->c_self, *mdh, ack, 
+        rc = LNetPut (conn->c_self, *mdh, ack,
                       conn->c_peer, portal, xid, 0, 0);
-        if (rc != 0) {
+        if (unlikely(rc != 0)) {
                 int rc2;
                 /* We're going to get an UNLINK event when I unlink below,
                  * which will complete just like any other failed send, so
@@ -93,7 +93,7 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         __u64                     xid;
         ENTRY;
 
-        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) 
+        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET))
                 RETURN(0);
 
         /* NB no locking required until desc is on the network */
@@ -101,7 +101,6 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         LASSERT (desc->bd_type == BULK_PUT_SOURCE ||
                  desc->bd_type == BULK_GET_SINK);
         desc->bd_success = 0;
-        desc->bd_sender = LNET_NID_ANY;
 
         md.user_ptr = &desc->bd_cbid;
         md.eq_handle = ptlrpc_eq_h;
@@ -126,17 +125,17 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         xid = desc->bd_req->rq_xid;
         CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d "
                "id %s xid "LPX64"\n", desc->bd_iov_count,
-               desc->bd_nob, desc->bd_portal, 
+               desc->bd_nob, desc->bd_portal,
                libcfs_id2str(conn->c_peer), xid);
 
         /* Network is about to get at the memory */
         desc->bd_network_rw = 1;
 
         if (desc->bd_type == BULK_PUT_SOURCE)
-                rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ, 
+                rc = LNetPut (conn->c_self, desc->bd_md_h, LNET_ACK_REQ,
                               conn->c_peer, desc->bd_portal, xid, 0, 0);
         else
-                rc = LNetGet (conn->c_self, desc->bd_md_h, 
+                rc = LNetGet (conn->c_self, desc->bd_md_h,
                               conn->c_peer, desc->bd_portal, xid, 0);
 
         if (rc != 0) {
@@ -163,7 +162,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
 
         if (!ptlrpc_bulk_active(desc))          /* completed or */
                 return;                         /* never started */
-        
+
         /* Do not send any meaningful data over the wire for evicted clients */
         if (desc->bd_export && desc->bd_export->exp_failed)
                 ptl_rpc_wipe_bulk_pages(desc);
@@ -179,7 +178,7 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
                 /* Network access will complete in finite time but the HUGE
                  * timeout lets us CWARN for visibility of sluggish NALs */
                 lwi = LWI_TIMEOUT (cfs_time_seconds(300), NULL, NULL);
-                rc = l_wait_event(desc->bd_waitq, 
+                rc = l_wait_event(desc->bd_waitq,
                                   !ptlrpc_bulk_active(desc), &lwi);
                 if (rc == 0)
                         return;
@@ -199,7 +198,7 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
         lnet_md_t         md;
         ENTRY;
 
-        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET)) 
+        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_GET_NET))
                 RETURN(0);
 
         /* NB no locking required until desc is on the network */
@@ -211,15 +210,14 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
                  desc->bd_type == BULK_GET_SOURCE);
 
         desc->bd_success = 0;
-        desc->bd_sender = LNET_NID_ANY;
 
         peer = desc->bd_import->imp_connection->c_peer;
 
         md.user_ptr = &desc->bd_cbid;
         md.eq_handle = ptlrpc_eq_h;
         md.threshold = 1;                       /* PUT or GET */
-        md.options = PTLRPC_MD_OPTIONS | 
-                     ((desc->bd_type == BULK_GET_SOURCE) ? 
+        md.options = PTLRPC_MD_OPTIONS |
+                     ((desc->bd_type == BULK_GET_SOURCE) ?
                       LNET_MD_OP_GET : LNET_MD_OP_PUT);
         ptlrpc_fill_bulk_md(&md, desc);
 
@@ -285,7 +283,7 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
          * a chance to run client_bulk_callback() */
 
         LNetMDUnlink (desc->bd_md_h);
-        
+
         if (req->rq_set != NULL)
                 wq = &req->rq_set->set_waitq;
         else
@@ -300,7 +298,7 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
                         return;
 
                 LASSERT (rc == -ETIMEDOUT);
-                DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p\n",
+                DEBUG_REQ(D_WARNING,req,"Unexpectedly long timeout: desc %p",
                           desc);
         }
 }
@@ -313,19 +311,22 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
         int                        rc;
 
         /* We must already have a reply buffer (only ptlrpc_error() may be
-         * called without one).  We must also have a request buffer which
-         * is either the actual (swabbed) incoming request, or a saved copy
-         * if this is a req saved in target_queue_final_reply(). */
-        LASSERT (req->rq_reqmsg != NULL);
-        LASSERT (req->rq_repmsg != NULL);
+         * called without one). The reply generated by security layer (e.g.
+         * error notify, etc.) might have NULL rq->reqmsg; Otherwise we must
+         * have a request buffer which is either the actual (swabbed) incoming
+         * request, or a saved copy if this is a req saved in
+         * target_queue_final_reply().
+         */
+        LASSERT (req->rq_reqbuf != NULL);
         LASSERT (rs != NULL);
-        LASSERT (req->rq_repmsg == rs->rs_msg);
         LASSERT (may_be_difficult || !rs->rs_difficult);
+        LASSERT (req->rq_repmsg != NULL);
+        LASSERT (req->rq_repmsg == rs->rs_msg);
         LASSERT (rs->rs_cb_id.cbid_fn == reply_out_callback);
         LASSERT (rs->rs_cb_id.cbid_arg == rs);
 
-        if (req->rq_export && req->rq_export->exp_obd &&
-            req->rq_export->exp_obd->obd_fail) {
+        if (unlikely(req->rq_export && req->rq_export->exp_obd &&
+                     req->rq_export->exp_obd->obd_fail)) {
                 /* Failed obd's only send ENODEV */
                 req->rq_type = PTL_RPC_MSG_ERR;
                 req->rq_status = -ENODEV;
@@ -338,25 +339,31 @@ int ptlrpc_send_reply (struct ptlrpc_request *req, int may_be_difficult)
 
         lustre_msg_set_type(req->rq_repmsg, req->rq_type);
         lustre_msg_set_status(req->rq_repmsg, req->rq_status);
-        lustre_msg_set_opc(req->rq_repmsg, lustre_msg_get_opc(req->rq_reqmsg));
+        lustre_msg_set_opc(req->rq_repmsg,
+                req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : 0);
 
         if (req->rq_export == NULL || req->rq_export->exp_connection == NULL)
                 conn = ptlrpc_get_connection(req->rq_peer, req->rq_self, NULL);
         else
                 conn = ptlrpc_connection_addref(req->rq_export->exp_connection);
 
-        if (conn == NULL) {
+        if (unlikely(conn == NULL)) {
                 CERROR("not replying on NULL connection\n"); /* bug 9635 */
                 return -ENOTCONN;
         }
         atomic_inc (&svc->srv_outstanding_replies);
         ptlrpc_rs_addref(rs);                   /* +1 ref for the network */
 
-        rc = ptl_send_buf (&rs->rs_md_h, req->rq_repmsg, req->rq_replen,
+        rc = sptlrpc_svc_wrap_reply(req);
+        if (unlikely(rc))
+                goto out;
+
+        rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
                            rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ,
                            &rs->rs_cb_id, conn,
                            svc->srv_rep_portal, req->rq_xid);
-        if (rc != 0) {
+out:
+        if (unlikely(rc != 0)) {
                 atomic_dec (&svc->srv_outstanding_replies);
                 ptlrpc_rs_decref(rs);
         }
@@ -395,7 +402,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         lnet_md_t         reply_md;
         ENTRY;
 
-        OBD_FAIL_RETURN(OBD_FAIL_PTLRPC_DROP_RPC, 0); 
+        OBD_FAIL_RETURN(OBD_FAIL_PTLRPC_DROP_RPC, 0);
 
         LASSERT (request->rq_type == PTL_RPC_MSG_REQUEST);
 
@@ -411,14 +418,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                 request->rq_err = 1;
                 RETURN(-ENODEV);
         }
-        
-        connection = request->rq_import->imp_connection;
 
-        if (request->rq_bulk != NULL) {
-                rc = ptlrpc_register_bulk (request);
-                if (rc != 0)
-                        RETURN(rc);
-        }
+        connection = request->rq_import->imp_connection;
 
         lustre_msg_set_handle(request->rq_reqmsg,
                               &request->rq_import->imp_remote_handle);
@@ -426,12 +427,25 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         lustre_msg_set_conn_cnt(request->rq_reqmsg,
                                 request->rq_import->imp_conn_cnt);
 
+        rc = sptlrpc_cli_wrap_request(request);
+        if (rc)
+                RETURN(rc);
+
+        /* bulk register should be done after wrap_request() */
+        if (request->rq_bulk != NULL) {
+                rc = ptlrpc_register_bulk (request);
+                if (rc != 0)
+                        RETURN(rc);
+        }
+
         if (!noreply) {
                 LASSERT (request->rq_replen != 0);
-                if (request->rq_repmsg == NULL)
-                        OBD_ALLOC(request->rq_repmsg, request->rq_replen);
-                if (request->rq_repmsg == NULL)
-                        GOTO(cleanup_bulk, rc = -ENOMEM);
+                if (request->rq_repbuf == NULL) {
+                        rc = sptlrpc_cli_alloc_repbuf(request,
+                                                      request->rq_replen);
+                        if (rc)
+                                GOTO(cleanup_bulk, rc);
+                }
 
                 rc = LNetMEAttach(request->rq_reply_portal,/*XXX FIXME bug 249*/
                                   connection->c_peer, request->rq_xid, 0,
@@ -439,7 +453,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                 if (rc != 0) {
                         CERROR("LNetMEAttach failed: %d\n", rc);
                         LASSERT (rc == -ENOMEM);
-                        GOTO(cleanup_repmsg, rc = -ENOMEM);
+                        GOTO(cleanup_bulk, rc = -ENOMEM);
                 }
         }
 
@@ -456,14 +470,14 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         spin_unlock(&request->rq_lock);
 
         if (!noreply) {
-                reply_md.start     = request->rq_repmsg;
-                reply_md.length    = request->rq_replen;
+                reply_md.start     = request->rq_repbuf;
+                reply_md.length    = request->rq_repbuf_len;
                 reply_md.threshold = 1;
                 reply_md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT;
                 reply_md.user_ptr  = &request->rq_reply_cbid;
                 reply_md.eq_handle = ptlrpc_eq_h;
 
-                rc = LNetMDAttach(reply_me_h, reply_md, LNET_UNLINK, 
+                rc = LNetMDAttach(reply_me_h, reply_md, LNET_UNLINK,
                                  &request->rq_reply_md_h);
                 if (rc != 0) {
                         CERROR("LNetMDAttach failed: %d\n", rc);
@@ -477,7 +491,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
 
                 CDEBUG(D_NET, "Setup reply buffer: %u bytes, xid "LPU64
                        ", portal %u\n",
-                       request->rq_replen, request->rq_xid,
+                       request->rq_repbuf_len, request->rq_xid,
                        request->rq_reply_portal);
         }
 
@@ -490,7 +504,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         request->rq_sent = CURRENT_SECONDS;
         ptlrpc_pinger_sending_on_import(request->rq_import);
         rc = ptl_send_buf(&request->rq_req_md_h,
-                          request->rq_reqmsg, request->rq_reqlen,
+                          request->rq_reqbuf, request->rq_reqdata_len,
                           LNET_NOACK_REQ, &request->rq_req_cbid,
                           connection,
                           request->rq_request_portal,
@@ -517,10 +531,6 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         /* UNLINKED callback called synchronously */
         LASSERT (!request->rq_receiving_reply);
 
- cleanup_repmsg:
-        OBD_FREE(request->rq_repmsg, request->rq_replen);
-        request->rq_repmsg = NULL;
-
  cleanup_bulk:
         if (request->rq_bulk != NULL)
                 ptlrpc_unregister_bulk(request);
@@ -559,7 +569,7 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         md.options   = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MAX_SIZE;
         md.user_ptr  = &rqbd->rqbd_cbid;
         md.eq_handle = ptlrpc_eq_h;
-        
+
         rc = LNetMDAttach(me_h, md, LNET_UNLINK, &rqbd->rqbd_md_h);
         if (rc == 0)
                 return (0);
@@ -569,6 +579,6 @@ int ptlrpc_register_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         rc = LNetMEUnlink (me_h);
         LASSERT (rc == 0);
         rqbd->rqbd_refcount = 0;
-        
+
         return (-ENOMEM);
 }