Whamcloud - gitweb
smash the HEAD with the contents of b_cmd. HEAD_PRE_CMD_SMASH and
[fs/lustre-release.git] / lustre / ptlrpc / niobuf.c
index b628216..c22e668 100644 (file)
@@ -23,6 +23,7 @@
 #define DEBUG_SUBSYSTEM S_RPC
 #ifndef __KERNEL__
 #include <liblustre.h>
+#include <portals/lib-types.h>
 #endif
 #include <linux/obd_support.h>
 #include <linux/lustre_net.h>
@@ -43,9 +44,11 @@ static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len,
 
         LASSERT (portal != 0);
         LASSERT (conn != NULL);
-        CDEBUG (D_INFO, "conn=%p ni %s nid %s on %s\n",
+        CDEBUG (D_INFO, "conn=%p ni %s nid "LPX64" (%s) on %s\n",
                 conn, conn->c_peer.peer_ni->pni_name,
-                ptlrpc_peernid2str(&conn->c_peer, str),
+                conn->c_peer.peer_nid,
+                portals_nid2str(conn->c_peer.peer_ni->pni_number,
+                                conn->c_peer.peer_nid, str),
                 conn->c_peer.peer_ni->pni_name);
 
         remote_id.nid = conn->c_peer.peer_nid,
@@ -81,9 +84,8 @@ static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len,
                 /* We're going to get an UNLINK event when I unlink below,
                  * which will complete just like any other failed send, so
                  * I fall through and return success here! */
-                CERROR("PtlPut(%s, %d, "LPD64") failed: %d\n",
-                       ptlrpc_peernid2str(&conn->c_peer, str),
-                       portal, xid, rc);
+                CERROR("PtlPut("LPU64", %d, "LPD64") failed: %d\n",
+                       remote_id.nid, portal, xid, rc);
                 rc2 = PtlMDUnlink(*mdh);
                 LASSERT (rc2 == PTL_OK);
         }
@@ -91,6 +93,20 @@ static int ptl_send_buf (ptl_handle_md_t *mdh, void *base, int len,
         RETURN (0);
 }
 
+static void ptlrpc_fill_md(ptl_md_t *md, struct ptlrpc_bulk_desc *desc)
+{
+        LASSERT(ptl_md_max_iovs() == 0  || 
+                (desc->bd_iov_count <= ptl_md_max_iovs()));
+
+        if (ptl_requires_iov() || desc->bd_iov_count > 0) {
+                md->options |= PTLRPC_PTL_MD_IOV;
+                md->start = &desc->bd_iov[0];
+                md->niov = desc->bd_iov_count;
+        } else {
+                md->start = ptl_iov_base(&desc->bd_iov[0]);
+        }
+}
+
 int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
 {
         int                 rc;
@@ -99,7 +115,6 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         ptl_process_id_t    remote_id;
         ptl_md_t            md;
         __u64               xid;
-        char                str[PTL_NALFMT_SIZE];
         ENTRY;
 
         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_PTLRPC_BULK_PUT_NET)) 
@@ -112,12 +127,13 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         desc->bd_success = 0;
         peer = &desc->bd_export->exp_connection->c_peer;
 
-        md.user_ptr = &desc->bd_cbid;
+        md.length = desc->bd_nob;
         md.eventq = peer->peer_ni->pni_eq_h;
         md.threshold = 2; /* SENT and ACK/REPLY */
         md.options = PTLRPC_MD_OPTIONS;
-        ptlrpc_fill_bulk_md(&md, desc);
 
+        ptlrpc_fill_md(&md, desc);
+        md.user_ptr = &desc->bd_cbid;
         LASSERT (desc->bd_cbid.cbid_fn == server_bulk_callback);
         LASSERT (desc->bd_cbid.cbid_arg == desc);
 
@@ -138,9 +154,9 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
         remote_id.pid = 0;
 
         CDEBUG(D_NET, "Transferring %u pages %u bytes via portal %d on %s "
-               "nid %s pid %d xid "LPX64"\n", desc->bd_iov_count,
-               desc->bd_nob, desc->bd_portal, peer->peer_ni->pni_name,
-               ptlrpc_peernid2str(peer, str), remote_id.pid, xid);
+               "nid "LPX64" pid %d xid "LPX64"\n", 
+               md.niov, md.length, desc->bd_portal, peer->peer_ni->pni_name,
+               remote_id.nid, remote_id.pid, xid);
 
         /* Network is about to get at the memory */
         desc->bd_network_rw = 1;
@@ -156,9 +172,8 @@ int ptlrpc_start_bulk_transfer (struct ptlrpc_bulk_desc *desc)
                 /* Can't send, so we unlink the MD bound above.  The UNLINK
                  * event this creates will signal completion with failure,
                  * so we return SUCCESS here! */
-                CERROR("Transfer(%s, %d, "LPX64") failed: %d\n",
-                       ptlrpc_peernid2str(peer, str),
-                       desc->bd_portal, xid, rc);
+                CERROR("Transfer("LPU64", %d, "LPX64") failed: %d\n",
+                       remote_id.nid, desc->bd_portal, xid, rc);
                 rc2 = PtlMDUnlink(desc->bd_md_h);
                 LASSERT (rc2 == PTL_OK);
         }
@@ -179,11 +194,16 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc)
                 return;                         /* never started */
         
         /* The unlink ensures the callback happens ASAP and is the last
-         * one.  If it fails, it must be because completion just happened,
-         * but we must still l_wait_event() in this case, to give liblustre
-         * a chance to run server_bulk_callback()*/
+         * one.  If it fails, it must be because completion just
+         * happened. */
 
-        PtlMDUnlink (desc->bd_md_h);
+        rc = PtlMDUnlink (desc->bd_md_h);
+        if (rc == PTL_MD_INVALID) {
+                LASSERT(!ptlrpc_bulk_active(desc));
+                return;
+        }
+        
+        LASSERT (rc == PTL_OK);
 
         for (;;) {
                 /* Network access will complete in finite time but the HUGE
@@ -225,14 +245,14 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
 
         peer = &desc->bd_import->imp_connection->c_peer;
 
-        md.user_ptr = &desc->bd_cbid;
+        md.length = desc->bd_nob;
         md.eventq = peer->peer_ni->pni_eq_h;
         md.threshold = 1;                       /* PUT or GET */
         md.options = PTLRPC_MD_OPTIONS | 
                      ((desc->bd_type == BULK_GET_SOURCE) ? 
                       PTL_MD_OP_GET : PTL_MD_OP_PUT);
-        ptlrpc_fill_bulk_md(&md, desc);
-
+        ptlrpc_fill_md(&md, desc);
+        md.user_ptr = &desc->bd_cbid;
         LASSERT (desc->bd_cbid.cbid_fn == client_bulk_callback);
         LASSERT (desc->bd_cbid.cbid_arg == desc);
 
@@ -270,7 +290,7 @@ int ptlrpc_register_bulk (struct ptlrpc_request *req)
         CDEBUG(D_NET, "Setup bulk %s buffers: %u pages %u bytes, xid "LPX64", "
                "portal %u on %s\n",
                desc->bd_type == BULK_GET_SOURCE ? "get-source" : "put-sink",
-               desc->bd_iov_count, desc->bd_nob,
+               md.niov, md.length,
                req->rq_xid, desc->bd_portal, peer->peer_ni->pni_name);
         RETURN(0);
 }
@@ -292,11 +312,16 @@ void ptlrpc_unregister_bulk (struct ptlrpc_request *req)
         LASSERT (desc->bd_req == req);          /* bd_req NULL until registered */
 
         /* the unlink ensures the callback happens ASAP and is the last
-         * one.  If it fails, it must be because completion just happened,
-         * but we must still l_wait_event() in this case to give liblustre
-         * a chance to run client_bulk_callback() */
+         * one.  If it fails, it must be because completion just
+         * happened. */
 
-        PtlMDUnlink (desc->bd_md_h);
+        rc = PtlMDUnlink (desc->bd_md_h);
+        if (rc == PTL_MD_INVALID) {
+                LASSERT(!ptlrpc_bulk_active(desc));
+                return;
+        }
+        
+        LASSERT (rc == PTL_OK);
         
         if (desc->bd_req->rq_set != NULL)
                 wq = &req->rq_set->set_waitq;
@@ -446,7 +471,6 @@ int ptl_send_rpc(struct ptlrpc_request *request)
         request->rq_replied = 0;
         request->rq_err = 0;
         request->rq_timedout = 0;
-        request->rq_net_err = 0;
         request->rq_resend = 0;
         request->rq_restart = 0;
         spin_unlock_irqrestore (&request->rq_lock, flags);