Whamcloud - gitweb
* Added socknal multi-frag I/O
authoreeb <eeb>
Fri, 29 Oct 2004 18:02:09 +0000 (18:02 +0000)
committereeb <eeb>
Fri, 29 Oct 2004 18:02:09 +0000 (18:02 +0000)
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lustre/portals/knals/socknal/socknal.h
lustre/portals/knals/socknal/socknal_cb.c

index b8bbefd..20cd3d9 100644 (file)
 
 #define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
 
+#define SOCKNAL_SINGLE_FRAG_TX      0           /* disable multi-fragment sends */
+#define SOCKNAL_SINGLE_FRAG_RX      0           /* disable multi-fragment receives */
+#define SOCKNAL_RISK_KMAP_DEADLOCK  0           /* risk kmap deadlock on multi-frag I/O 
+                                                 * (backs off to single-frag if disabled) */
+                                                
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_allocation  allocation
 # define sk_data_ready data_ready
@@ -349,6 +354,13 @@ typedef struct ksock_conn
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
+
+#if !SOCKNAL_SINGLE_FRAG_RX
+        struct iovec        ksnc_rx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
+#if !SOCKNAL_SINGLE_FRAG_TX
+        struct iovec        ksnc_tx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
 } ksock_conn_t;
 
 #define KSNR_TYPED_ROUTES   ((1 << SOCKNAL_CONN_CONTROL) |      \
index 762133e..ed91f94 100644 (file)
@@ -84,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) || 
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
         
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -106,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+                
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec    scratch;
+                struct iovec   *scratchiov = &scratch;
+                int             niov = 1;
+#else
+                struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
+                int             niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int  i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+                
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         } 
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+                
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
         
         return (rc);
 }
@@ -161,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+        
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page   *page = kiov->kiov_page;
+                int            offset = kiov->kiov_offset;
+                int            fragsize = kiov->kiov_len;
+                int            msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                               page, offset, fragsize);
+                               page, offset, kiov->kiov_len);
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t  oldmm = get_fs();
+                int           i;
                 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_DONTWAIT;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len    = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+                
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+                
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -269,35 +315,35 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         mb();
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE, 
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-                         
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n", 
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+                                
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
 
@@ -332,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize  = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
         set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
                 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+                
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif   
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-        
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
         
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
                 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+                
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -599,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
 
@@ -1853,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
 
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled && // not being progressed
index b8bbefd..20cd3d9 100644 (file)
 
 #define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
 
+#define SOCKNAL_SINGLE_FRAG_TX      0           /* disable multi-fragment sends */
+#define SOCKNAL_SINGLE_FRAG_RX      0           /* disable multi-fragment receives */
+#define SOCKNAL_RISK_KMAP_DEADLOCK  0           /* risk kmap deadlock on multi-frag I/O 
+                                                 * (backs off to single-frag if disabled) */
+                                                
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
 # define sk_allocation  allocation
 # define sk_data_ready data_ready
@@ -349,6 +354,13 @@ typedef struct ksock_conn
         atomic_t            ksnc_tx_nob;        /* # bytes queued */
         int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
+
+#if !SOCKNAL_SINGLE_FRAG_RX
+        struct iovec        ksnc_rx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
+#if !SOCKNAL_SINGLE_FRAG_TX
+        struct iovec        ksnc_tx_scratch_iov[PTL_MD_MAX_IOV];
+#endif
 } ksock_conn_t;
 
 #define KSNR_TYPED_ROUTES   ((1 << SOCKNAL_CONN_CONTROL) |      \
index 762133e..ed91f94 100644 (file)
@@ -84,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) || 
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
         
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -106,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+                
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec    scratch;
+                struct iovec   *scratchiov = &scratch;
+                int             niov = 1;
+#else
+                struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
+                int             niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int  i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+                
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         } 
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+                
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
         
         return (rc);
 }
@@ -161,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+        
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page   *page = kiov->kiov_page;
+                int            offset = kiov->kiov_offset;
+                int            fragsize = kiov->kiov_len;
+                int            msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                               page, offset, fragsize);
+                               page, offset, kiov->kiov_len);
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t  oldmm = get_fs();
+                int           i;
                 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_DONTWAIT;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len    = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+                
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+                
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -269,35 +315,35 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         mb();
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE, 
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-                         
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n", 
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+                                
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
 
@@ -332,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize  = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
         set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
                 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+                
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif   
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-        
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
         
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
                 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+                
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -599,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
 
@@ -1853,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
 
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled && // not being progressed