Whamcloud - gitweb
* Applied the last patch in Bug 2306, which changes the portals router/NAL
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal_cb.c
index 0678d41..72bd0b7 100644 (file)
@@ -123,7 +123,7 @@ ksocknal_free_ltx (ksock_ltx_t *ltx)
         PORTAL_FREE(ltx, ltx->ltx_desc_size);
 }
 
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
 struct page *
 ksocknal_kvaddr_to_page (unsigned long vaddr)
 {
@@ -159,7 +159,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            more = (tx->tx_niov > 1) || 
                               (tx->tx_nkiov > 0) ||
                               (!list_empty (&conn->ksnc_tx_queue));
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         int            offset = vaddr & (PAGE_SIZE - 1);
         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
         struct page   *page;
@@ -171,7 +171,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (fragsize <= tx->tx_resid);
         LASSERT (tx->tx_niov > 0);
         
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
@@ -771,7 +771,8 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
         /* Find the conn with the shortest tx queue */
         list_for_each (tmp, &peer->ksnp_conns) {
                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
-                int           nob = atomic_read(&c->ksnc_tx_nob);
+                int           nob = atomic_read(&c->ksnc_tx_nob) +
+                                    c->ksnc_sock->sk->sk_wmem_queued;
 
                 LASSERT (!c->ksnc_closing);
 
@@ -1132,7 +1133,7 @@ void
 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
-        ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
+        ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
         int           rc;
         
         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
@@ -1142,14 +1143,18 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (nid == ksocknal_lib.ni.nid)
                 nid = fwd->kprfd_target_nid;
 
-        tx->tx_isfwd = 1;                   /* This is a forwarding packet */
-        tx->tx_nob   = fwd->kprfd_nob;
-        tx->tx_niov  = fwd->kprfd_niov;
-        tx->tx_iov   = fwd->kprfd_iov;
-        tx->tx_nkiov = 0;
-        tx->tx_kiov  = NULL;
+        /* setup iov for hdr */
+        ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
+        ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
+
+        ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
+        ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
+        ftx->ftx_tx.tx_niov  = 1;
+        ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
+        ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
+        ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
 
-        rc = ksocknal_launch_packet (tx, nid);
+        rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
         if (rc != 0)
                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
 }
@@ -1177,7 +1182,7 @@ ksocknal_fmb_callback (void *arg, int error)
 {
         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
-        ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
+        ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
         ksock_conn_t      *conn = NULL;
         ksock_sched_t     *sched;
         unsigned long      flags;
@@ -1235,7 +1240,6 @@ ksock_fmb_t *
 ksocknal_get_idle_fmb (ksock_conn_t *conn)
 {
         int               payload_nob = conn->ksnc_rx_nob_left;
-        int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
         unsigned long     flags;
         ksock_fmb_pool_t *pool;
         ksock_fmb_t      *fmb;
@@ -1243,7 +1247,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
 
-        if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
+        if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
                 pool = &ksocknal_data.ksnd_small_fmp;
         else
                 pool = &ksocknal_data.ksnd_large_fmp;
@@ -1274,98 +1278,64 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
 int
 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 {
-        int payload_nob = conn->ksnc_rx_nob_left;
-        int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
+        int       payload_nob = conn->ksnc_rx_nob_left;
         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
-        int niov;                               /* at least the header */
-        int nob;
+        int       niov = 0;
+        int       nob = payload_nob;
 
         LASSERT (conn->ksnc_rx_scheduled);
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
         LASSERT (payload_nob >= 0);
-        LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
+        LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
-
-        /* Got a forwarding buffer; copy the header we just read into the
-         * forwarding buffer.  If there's payload, start reading reading it
-         * into the buffer, otherwise the forwarding buffer can be kicked
-         * off immediately.
-         *
-         * NB fmb->fmb_iov spans the WHOLE packet.
-         *    conn->ksnc_rx_iov spans just the payload.
-         */
-        fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
-
-        /* copy header */
-        memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
+        LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
 
         /* Take a ref on the conn's peer to prevent module unload before
-         * forwarding completes.  NB we ref peer and not conn since because
-         * all refs on conn after it has been closed must remove themselves
-         * in finite time */
+         * forwarding completes. */
         fmb->fmb_peer = conn->ksnc_peer;
         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
 
-        if (payload_nob == 0) {         /* got complete packet already */
-                CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
-                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
-                        dest_nid, packet_nob);
+        /* Copy the header we just read into the forwarding buffer.  If
+         * there's payload, start reading reading it into the buffer,
+         * otherwise the forwarding buffer can be kicked off
+         * immediately. */
+        fmb->fmb_hdr = conn->ksnc_hdr;
 
-                fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
+        while (nob > 0) {
+                LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
+                LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
+                fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
+                nob -= PAGE_SIZE;
+                niov++;
+        }
+
+        kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
+                     payload_nob, niov, fmb->fmb_kiov,
+                     ksocknal_fmb_callback, fmb);
 
-                kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                              packet_nob, 1, fmb->fmb_iov,
-                              ksocknal_fmb_callback, fmb);
+        if (payload_nob == 0) {         /* got complete packet already */
+                CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
+                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
 
-                /* forward it now */
                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
 
                 ksocknal_new_packet (conn, 0);  /* on to next packet */
                 return (1);
         }
 
-        niov = 1;
-        if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
-                fmb->fmb_iov[0].iov_len = packet_nob;
-        } else {
-                fmb->fmb_iov[0].iov_len = PAGE_SIZE;
-                nob = packet_nob - PAGE_SIZE;
-
-                do {
-                        LASSERT (niov < fmb->fmb_npages);
-                        fmb->fmb_iov[niov].iov_base =
-                                page_address (fmb->fmb_pages[niov]);
-                        fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
-                        nob -= PAGE_SIZE;
-                        niov++;
-                } while (nob > 0);
-        }
-
-        kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                      packet_nob, niov, fmb->fmb_iov,
-                      ksocknal_fmb_callback, fmb);
-
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
         
-        /* payload is desc's iov-ed buffer, but skipping the hdr */
-        LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
-                 sizeof (struct iovec));
-
-        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
-        conn->ksnc_rx_iov[0].iov_base =
-                (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
-                         sizeof (ptl_hdr_t));
-        conn->ksnc_rx_iov[0].iov_len =
-                fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
-
-        if (niov > 1)
-                memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
-                       (niov - 1) * sizeof (struct iovec));
-
-        conn->ksnc_rx_niov = niov;
+        /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
+         * buffer */
+        LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
 
+        conn->ksnc_rx_niov = 0;
+        conn->ksnc_rx_nkiov = niov;
+        conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
+        memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
+        
         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
         return (0);