* Added socknal multi-frag I/O
fs/lustre-release.git: lnet/klnds/socklnd/socklnd_cb.c
index 21e0abe..ed91f94 100644
  *  LIB functions follow
  *
  */
-ptl_err_t
-ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
-              user_ptr src_addr, size_t len)
-{
-        CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
-               nal->ni.nid, (long)len, src_addr, dst_addr);
-
-        memcpy( dst_addr, src_addr, len );
-        return PTL_OK;
-}
-
-ptl_err_t
-ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
-               void *src_addr, size_t len)
-{
-        CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
-               nal->ni.nid, (long)len, src_addr, dst_addr);
-
-        memcpy( dst_addr, src_addr, len );
-        return PTL_OK;
-}
-
-void *
-ksocknal_malloc(nal_cb_t *nal, size_t len)
-{
-        void *buf;
-
-        PORTAL_ALLOC(buf, len);
-
-        if (buf != NULL)
-                memset(buf, 0, len);
-
-        return (buf);
-}
-
-void
-ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
-{
-        PORTAL_FREE(buf, len);
-}
-
-void
-ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
-{
-        va_list ap;
-        char msg[256];
-
-        va_start (ap, fmt);
-        vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
-        va_end (ap);
-
-        msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
-
-        CDEBUG (D_NET, "%s", msg);
-}
-
-void
-ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *data = nal->nal_data;
-
-        /* OK to ignore 'flags'; we're only ever serialise threads and
-         * never need to lock out interrupts */
-        spin_lock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
-{
-        ksock_nal_data_t *data;
-        data = nal->nal_data;
-
-        /* OK to ignore 'flags'; we're only ever serialise threads and
-         * never need to lock out interrupts */
-        spin_unlock(&data->ksnd_nal_cb_lock);
-}
-
-void
-ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
-{
-        /* holding ksnd_nal_cb_lock */
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-        
-        if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
-                wake_up_all(&ksocknal_data.ksnd_yield_waitq);
-}
-
 int
-ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
+ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
         /* I would guess that if ksocknal_get_peer (nid) == NULL,
            and we're not routing, then 'nid' is very distant :) */
-        if ( nal->ni.nid == nid ) {
+        if (nal->libnal_ni.ni_pid.nid == nid) {
                 *dist = 0;
         } else {
                 *dist = 1;
@@ -173,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) || 
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base;
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
         
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -195,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+                
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec    scratch;
+                struct iovec   *scratchiov = &scratch;
+                int             niov = 1;
+#else
+                struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
+                int             niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int  i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+                
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         } 
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+                
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
         
         return (rc);
 }
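
Note: the "consume" loop above walks the byte count actually sent across the iovec
array, trimming a partially-sent fragment in place and popping fully-sent ones off
the front.  A minimal standalone sketch of the same pattern, with a hypothetical
helper name that is not part of this patch:

/* Sketch: advance (*iovp, *niovp) past 'nob' bytes that sock_sendmsg() reported
 * sent.  Assumes struct iovec (<linux/uio.h>) and nob <= total bytes described. */
static void consume_iov(struct iovec **iovp, int *niovp, int nob)
{
        struct iovec *iov = *iovp;

        while (nob > 0) {
                if (nob < iov->iov_len) {       /* partial fragment: trim it */
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        break;
                }
                nob -= iov->iov_len;            /* whole fragment sent: drop it */
                iov++;
                (*niovp)--;
        }
        *iovp = iov;
}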
@@ -250,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+        
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page   *page = kiov->kiov_page;
+                int            offset = kiov->kiov_offset;
+                int            fragsize = kiov->kiov_len;
+                int            msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                               page, offset, fragsize);
+                               page, offset, kiov->kiov_len);
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t  oldmm = get_fs();
+                int           i;
                 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len    = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+                
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+                
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
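
/* Note on the #warning above (explanatory comment, not part of the patch):
 * kmap() draws from a small, fixed pool of kernel highmem mappings and may
 * sleep waiting for a free slot.  If several threads each hold mappings for
 * some fragments while waiting to map more, as the multi-frag loop would,
 * they can deadlock against one another; hence the single-fragment default
 * unless SOCKNAL_RISK_KMAP_DEADLOCK is explicitly enabled. */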
@@ -318,6 +275,7 @@ int
 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         int      rc;
+        int      bufnob;
         
         if (ksocknal_data.ksnd_stall_tx != 0) {
                 set_current_state (TASK_UNINTERRUPTIBLE);
@@ -343,50 +301,56 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         rc = ksocknal_send_kiov (conn, tx);
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE, 
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-                         
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n", 
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
+                if (rc > 0)                     /* sent something? */
+                        conn->ksnc_tx_bufnob += rc; /* account it */
+                
+                if (bufnob < conn->ksnc_tx_bufnob) {
+                        /* allocated send buffer bytes < computed; infer
+                         * something got ACKed */
+                        conn->ksnc_tx_deadline = jiffies + 
+                                                 ksocknal_tunables.ksnd_io_timeout * HZ;
+                        conn->ksnc_peer->ksnp_last_alive = jiffies;
+                        conn->ksnc_tx_bufnob = bufnob;
+                        mb();
+                }
+
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+                                
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
 
+                /* socket's wmem_queued now includes 'rc' bytes */
+                atomic_sub (rc, &conn->ksnc_tx_nob);
                 rc = 0;
 
-                /* Consider the connection alive since we managed to chuck
-                 * more data into it.  Really, we'd like to consider it
-                 * alive only when the peer ACKs something, but
-                 * write_space() only gets called back while SOCK_NOSPACE
-                 * is set.  Instead, we presume peer death has occurred if
-                 * the socket doesn't drain within a timout */
-                conn->ksnc_tx_deadline = jiffies + 
-                                         ksocknal_tunables.ksnd_io_timeout * HZ;
-                conn->ksnc_peer->ksnp_last_alive = jiffies;
-
         } while (tx->tx_resid != 0);
 
         ksocknal_putconnsock (conn);
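
Note: the bufnob bookkeeping above infers peer liveness without an ACK callback:
ksnc_tx_bufnob accumulates every byte handed to the socket, so when sk_wmem_queued
drops below that running total the kernel must have freed (i.e. the peer must have
ACKed) some data, and the I/O deadline can be pushed out.  A minimal sketch of the
idea, using hypothetical names rather than the real conn fields:

/* Sketch: 'tx_bufnob' is a running count of bytes given to the socket;
 * 'wmem_queued_now' is the socket's current sk_wmem_queued. */
static void note_tx_progress(int *tx_bufnob, unsigned long *tx_deadline,
                             int sent, int wmem_queued_now, int timeout_secs)
{
        *tx_bufnob += sent;                     /* bytes handed to the socket */
        if (wmem_queued_now < *tx_bufnob) {     /* kernel freed some: peer ACKed */
                *tx_deadline = jiffies + timeout_secs * HZ;
                *tx_bufnob = wmem_queued_now;
        }
}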
@@ -414,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize  = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
         set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
                 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+                
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif   
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-        
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
         
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
                 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+                
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -608,8 +609,6 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch)
         ENTRY;
 
         if (tx->tx_conn != NULL) {
-                /* This tx got queued on a conn; do the accounting... */
-                atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
 #if SOCKNAL_ZC
                 /* zero copy completion isn't always from
                  * process_transmit() so it needs to keep a ref on
@@ -683,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
 
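
Note: the (counter & (-counter)) == counter test above keeps only the lowest set
bit of counter, so it is true exactly when counter is a power of two; the ENOMEM
warning therefore fires on the 1st, 2nd, 4th, 8th, ... occurrence and becomes
exponentially rarer.  A tiny illustration with a hypothetical helper:

/* Sketch: true when x has at most one bit set, i.e. x is 0 or a power of two. */
static inline int warn_this_time(unsigned int x)
{
        return (x & -x) == x;           /* x & -x isolates the lowest set bit */
}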
@@ -724,7 +729,7 @@ ksocknal_launch_autoconnect_locked (ksock_route_t *route)
         LASSERT (!route->ksnr_deleted);
         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
-        LASSERT (!route->ksnr_connecting);
+        LASSERT (route->ksnr_connecting == 0);
         
         if (ksocknal_tunables.ksnd_typed_conns)
                 route->ksnr_connecting = 
@@ -786,13 +791,16 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
         int               tnob  = 0;
         ksock_conn_t     *fallback = NULL;
         int               fnob     = 0;
+        ksock_conn_t     *conn;
 
-        /* Find the conn with the shortest tx queue */
         list_for_each (tmp, &peer->ksnp_conns) {
                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
+#if SOCKNAL_ROUND_ROBIN
+                const int     nob = 0;
+#else
                 int           nob = atomic_read(&c->ksnc_tx_nob) +
                                         c->ksnc_sock->sk->sk_wmem_queued;
-
+#endif
                 LASSERT (!c->ksnc_closing);
 
                 if (fallback == NULL || nob < fnob) {
@@ -827,7 +835,16 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
         }
 
         /* prefer the typed selection */
-        return ((typed != NULL) ? typed : fallback);
+        conn = (typed != NULL) ? typed : fallback;
+
+#if SOCKNAL_ROUND_ROBIN
+        if (conn != NULL) {
+                /* round-robin all else being equal */
+                list_del (&conn->ksnc_list);
+                list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
+        }
+#endif
+        return conn;
 }
 
 void
@@ -858,9 +875,14 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 #endif
         spin_lock_irqsave (&sched->kss_lock, flags);
 
-        conn->ksnc_tx_deadline = jiffies + 
-                                 ksocknal_tunables.ksnd_io_timeout * HZ;
-        mb();                                   /* order with list_add_tail */
+        if (list_empty(&conn->ksnc_tx_queue) &&
+            conn->ksnc_sock->sk->sk_wmem_queued == 0) {
+                /* First packet starts the timeout */
+                conn->ksnc_tx_deadline = jiffies +
+                                         ksocknal_tunables.ksnd_io_timeout * HZ;
+                conn->ksnc_tx_bufnob = 0;
+                mb();    /* order with adding to tx_queue */
+        }
 
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                 
@@ -882,42 +904,32 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
 {
         struct list_head  *tmp;
         ksock_route_t     *route;
-        ksock_route_t     *candidate = NULL;
-        int                found = 0;
         int                bits;
         
         list_for_each (tmp, &peer->ksnp_routes) {
                 route = list_entry (tmp, ksock_route_t, ksnr_list);
                 bits  = route->ksnr_connected;
-                
-                if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
-                    (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
-                    route->ksnr_connecting != 0) {
-                        /* All typed connections have been established, or
-                         * an untyped connection has been established, or
-                         * connections are currently being established */
-                        found = 1;
+
+                /* All typed connections established? */
+                if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES)
+                        continue;
+
+                /* Untyped connection established? */
+                if ((bits & (1 << SOCKNAL_CONN_ANY)) != 0)
+                        continue;
+
+                /* connection being established? */
+                if (route->ksnr_connecting != 0)
                         continue;
-                }
 
                 /* too soon to retry this guy? */
                 if (!time_after_eq (jiffies, route->ksnr_timeout))
                         continue;
                 
-                /* always do eager routes */
-                if (route->ksnr_eager)
-                        return (route);
-
-                if (candidate == NULL) {
-                        /* If we don't find any other route that is fully
-                         * connected or connecting, the first connectable
-                         * route is returned.  If it fails to connect, it
-                         * will get placed at the end of the list */
-                        candidate = route;
-                }
+                return (route);
         }
-        return (found ? NULL : candidate);
+        
+        return (NULL);
 }
 
 ksock_route_t *
@@ -965,8 +977,9 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
 
         g_lock = &ksocknal_data.ksnd_global_lock;
+#if !SOCKNAL_ROUND_ROBIN
         read_lock (g_lock);
-        
+
         peer = ksocknal_find_target_peer_locked (tx, nid);
         if (peer == NULL) {
                 read_unlock (g_lock);
@@ -983,19 +996,17 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
                         return (0);
                 }
         }
-        
-        /* Making one or more connections; I'll need a write lock... */
-
-        atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
+        /* I'll need a write lock... */
         read_unlock (g_lock);
-        write_lock_irqsave (g_lock, flags);
-        
-        if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
-                write_unlock_irqrestore (g_lock, flags);
-                ksocknal_put_peer (peer);
+#endif
+        write_lock_irqsave(g_lock, flags);
+
+        peer = ksocknal_find_target_peer_locked (tx, nid);
+        if (peer == NULL) {
+                write_unlock_irqrestore(g_lock, flags);
                 return (-EHOSTUNREACH);
         }
-        ksocknal_put_peer (peer);               /* drop ref I got above */
 
         for (;;) {
                 /* launch any/all autoconnections that need it */
@@ -1028,7 +1039,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
 }
 
 ptl_err_t
-ksocknal_sendmsg(nal_cb_t     *nal, 
+ksocknal_sendmsg(lib_nal_t     *nal, 
                  void         *private, 
                  lib_msg_t    *cookie,
                  ptl_hdr_t    *hdr, 
@@ -1125,7 +1136,7 @@ ksocknal_sendmsg(nal_cb_t     *nal,
 }
 
 ptl_err_t
-ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
+ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                unsigned int payload_niov, struct iovec *payload_iov,
                size_t payload_offset, size_t payload_len)
@@ -1137,7 +1148,7 @@ ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
 }
 
 ptl_err_t
-ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
+ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
                      size_t payload_offset, size_t payload_len)
@@ -1159,7 +1170,7 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
 
         /* I'm the gateway; must be the last hop */
-        if (nid == ksocknal_lib.ni.nid)
+        if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
                 nid = fwd->kprfd_target_nid;
 
         /* setup iov for hdr */
@@ -1181,19 +1192,26 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 int
 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
 {
-        long    pid = kernel_thread (fn, arg, 0);
+        long          pid = kernel_thread (fn, arg, 0);
+        unsigned long flags;
 
         if (pid < 0)
                 return ((int)pid);
 
-        atomic_inc (&ksocknal_data.ksnd_nthreads);
+        write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
+        ksocknal_data.ksnd_nthreads++;
+        write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
         return (0);
 }
 
 void
 ksocknal_thread_fini (void)
 {
-        atomic_dec (&ksocknal_data.ksnd_nthreads);
+        unsigned long flags;
+
+        write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
+        ksocknal_data.ksnd_nthreads--;
+        write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
 }
 
 void
@@ -1201,7 +1219,7 @@ ksocknal_fmb_callback (void *arg, int error)
 {
         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
-        ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
+        ptl_hdr_t         *hdr = &fmb->fmb_hdr;
         ksock_conn_t      *conn = NULL;
         ksock_sched_t     *sched;
         unsigned long      flags;
@@ -1211,14 +1229,14 @@ ksocknal_fmb_callback (void *arg, int error)
         if (error != 0)
                 CERROR("Failed to route packet from "
                        LPX64" %s to "LPX64" %s: %d\n",
-                       NTOH__u64(hdr->src_nid),
-                       portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
-                       NTOH__u64(hdr->dest_nid),
-                       portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
+                       le64_to_cpu(hdr->src_nid),
+                       portals_nid2str(SOCKNAL, le64_to_cpu(hdr->src_nid), ipbuf),
+                       le64_to_cpu(hdr->dest_nid),
+                       portals_nid2str(SOCKNAL, le64_to_cpu(hdr->dest_nid), ipbuf2),
                        error);
         else
                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
-                        NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
+                        le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid));
 
         /* drop peer ref taken on init */
         ksocknal_put_peer (fmb->fmb_peer);
@@ -1298,7 +1316,7 @@ int
 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 {
         int       payload_nob = conn->ksnc_rx_nob_left;
-        ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
+        ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
         int       niov = 0;
         int       nob = payload_nob;
 
@@ -1335,7 +1353,7 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 
         if (payload_nob == 0) {         /* got complete packet already */
                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
-                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
+                        conn, le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid);
 
                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
 
@@ -1356,7 +1374,7 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
         
         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
-                NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
+                le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
         return (0);
 }
 
@@ -1364,9 +1382,9 @@ void
 ksocknal_fwd_parse (ksock_conn_t *conn)
 {
         ksock_peer_t *peer;
-        ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
-        ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
-        int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
+        ptl_nid_t     dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
+        ptl_nid_t     src_nid = le64_to_cpu(conn->ksnc_hdr.src_nid);
+        int           body_len = le32_to_cpu(conn->ksnc_hdr.payload_length);
         char str[PTL_NALFMT_SIZE];
         char str2[PTL_NALFMT_SIZE];
 
@@ -1543,8 +1561,9 @@ ksocknal_process_receive (ksock_conn_t *conn)
         
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_HEADER:
-                if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
-                    NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
+                if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
+                    le64_to_cpu(conn->ksnc_hdr.dest_nid) != 
+                    ksocknal_lib.libnal_ni.ni_pid.nid) {
                         /* This packet isn't for me */
                         ksocknal_fwd_parse (conn);
                         switch (conn->ksnc_rx_state) {
@@ -1561,7 +1580,13 @@ ksocknal_process_receive (ksock_conn_t *conn)
                 }
 
                 /* sets wanted_len, iovs etc */
-                lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+                rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
+
+                if (rc != PTL_OK) {
+                        /* I just received garbage: give up on this conn */
+                        ksocknal_close_conn_and_siblings (conn, rc);
+                        return (-EPROTO);
+                }
 
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
@@ -1583,8 +1608,8 @@ ksocknal_process_receive (ksock_conn_t *conn)
         case SOCKNAL_RX_BODY_FWD:
                 /* payload all received */
                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
-                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
-                        NTOH__u64 (conn->ksnc_hdr.dest_nid),
+                        conn, le64_to_cpu(conn->ksnc_hdr.src_nid),
+                        le64_to_cpu(conn->ksnc_hdr.dest_nid),
                         conn->ksnc_rx_nob_left);
 
                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
@@ -1608,7 +1633,7 @@ ksocknal_process_receive (ksock_conn_t *conn)
 }
 
 ptl_err_t
-ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
                unsigned int niov, struct iovec *iov, 
                size_t offset, size_t mlen, size_t rlen)
 {
@@ -1636,7 +1661,7 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
 }
 
 ptl_err_t
-ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
+ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                      unsigned int niov, ptl_kiov_t *kiov, 
                      size_t offset, size_t mlen, size_t rlen)
 {
@@ -1663,6 +1688,25 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
         return (PTL_OK);
 }
 
+static inline int
+ksocknal_sched_cansleep(ksock_sched_t *sched)
+{
+        unsigned long flags;
+        int           rc;
+
+        spin_lock_irqsave(&sched->kss_lock, flags);
+
+        rc = (!ksocknal_data.ksnd_shuttingdown &&
+#if SOCKNAL_ZC
+              list_empty(&sched->kss_zctxdone_list) &&
+#endif
+              list_empty(&sched->kss_rx_conns) &&
+              list_empty(&sched->kss_tx_conns));
+        
+        spin_unlock_irqrestore(&sched->kss_lock, flags);
+        return (rc);
+}
+
 int ksocknal_scheduler (void *arg)
 {
         ksock_sched_t     *sched = (ksock_sched_t *)arg;
@@ -1679,14 +1723,13 @@ int ksocknal_scheduler (void *arg)
         kportal_blockallsigs ();
 
 #if (CONFIG_SMP && CPU_AFFINITY)
-        if ((cpu_online_map & (1 << id)) != 0) {
-#if 1
-                current->cpus_allowed = (1 << id);
-#else
-                set_cpus_allowed (current, 1<<id);
-#endif
+        id = ksocknal_sched2cpu(id);
+        if (cpu_online(id)) {
+                cpumask_t m;
+
+                cpus_clear(m);
+                cpu_set(id, m);
+                set_cpus_allowed(current, m);
         } else {
-                CERROR ("Can't set CPU affinity for %s\n", name);
+                CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
         }
 #endif /* CONFIG_SMP && CPU_AFFINITY */
         
@@ -1814,18 +1857,8 @@ int ksocknal_scheduler (void *arg)
                         nloops = 0;
 
                         if (!did_something) {   /* wait for something to do */
-#if SOCKNAL_ZC
-                                rc = wait_event_interruptible (sched->kss_waitq,
-                                                               ksocknal_data.ksnd_shuttingdown ||
-                                                               !list_empty(&sched->kss_rx_conns) ||
-                                                               !list_empty(&sched->kss_tx_conns) ||
-                                                               !list_empty(&sched->kss_zctxdone_list));
-#else
                                 rc = wait_event_interruptible (sched->kss_waitq,
-                                                               ksocknal_data.ksnd_shuttingdown ||
-                                                               !list_empty(&sched->kss_rx_conns) ||
-                                                               !list_empty(&sched->kss_tx_conns));
-#endif
+                                                               !ksocknal_sched_cansleep(sched));
                                 LASSERT (rc == 0);
                         } else
                                our_cond_resched();
@@ -1909,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
 
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled && // not being progressed
@@ -2013,133 +2045,245 @@ ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
 }
 
 int
-ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
-                __u64 *incarnation)
+ksocknal_send_hello (ksock_conn_t *conn, __u32 *ipaddrs, int nipaddrs)
 {
-        int                 rc;
+        /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
+        struct socket      *sock = conn->ksnc_sock;
         ptl_hdr_t           hdr;
         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
-        char                ipbuf[PTL_NALFMT_SIZE];
-        char                ipbuf2[PTL_NALFMT_SIZE];
+        int                 i;
+        int                 rc;
 
-        LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+        LASSERT (conn->ksnc_type != SOCKNAL_CONN_NONE);
+        LASSERT (nipaddrs <= SOCKNAL_MAX_INTERFACES);
+
+        /* No need for getconnsock/putconnsock */
+        LASSERT (!conn->ksnc_closing);
 
-        memset (&hdr, 0, sizeof (hdr));
-        hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
-        hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
-        hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
+        LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+        hmv->magic         = cpu_to_le32 (PORTALS_PROTO_MAGIC);
+        hmv->version_major = cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
+        hmv->version_minor = cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
 
-        hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
-        hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
+        hdr.src_nid        = cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
+        hdr.type           = cpu_to_le32 (PTL_MSG_HELLO);
+        hdr.payload_length = cpu_to_le32 (nipaddrs * sizeof(*ipaddrs));
 
-        hdr.msg.hello.type = __cpu_to_le32 (*type);
+        hdr.msg.hello.type = cpu_to_le32 (conn->ksnc_type);
         hdr.msg.hello.incarnation =
-                __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
+                cpu_to_le64 (ksocknal_data.ksnd_incarnation);
 
-        /* Assume sufficient socket buffering for this message */
-        rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
+        /* Receiver is eager */
+        rc = ksocknal_sock_write (sock, &hdr, sizeof(hdr));
         if (rc != 0) {
-                CERROR ("Error %d sending HELLO to "LPX64" %s\n",
-                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
+                CERROR ("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
                 return (rc);
         }
+        
+        if (nipaddrs == 0)
+                return (0);
+        
+        for (i = 0; i < nipaddrs; i++) {
+                ipaddrs[i] = __cpu_to_le32 (ipaddrs[i]);
+        }
+
+        rc = ksocknal_sock_write (sock, ipaddrs, nipaddrs * sizeof(*ipaddrs));
+        if (rc != 0)
+                CERROR ("Error %d sending HELLO payload (%d)"
+                        " to %u.%u.%u.%u/%d\n", rc, nipaddrs, 
+                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
+        return (rc);
+}
+
+int
+ksocknal_invert_type(int type)
+{
+        switch (type)
+        {
+        case SOCKNAL_CONN_ANY:
+        case SOCKNAL_CONN_CONTROL:
+                return (type);
+        case SOCKNAL_CONN_BULK_IN:
+                return SOCKNAL_CONN_BULK_OUT;
+        case SOCKNAL_CONN_BULK_OUT:
+                return SOCKNAL_CONN_BULK_IN;
+        default:
+                return (SOCKNAL_CONN_NONE);
+        }
+}
+
+int
+ksocknal_recv_hello (ksock_conn_t *conn, ptl_nid_t *nid,
+                     __u64 *incarnation, __u32 *ipaddrs)
+{
+        struct socket      *sock = conn->ksnc_sock;
+        int                 rc;
+        int                 nips;
+        int                 i;
+        int                 type;
+        ptl_hdr_t           hdr;
+        ptl_magicversion_t *hmv;
+
+        hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+        LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
 
         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
         if (rc != 0) {
-                CERROR ("Error %d reading HELLO from "LPX64" %s\n",
-                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
+                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
                 return (rc);
         }
 
-        if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
-                CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
-                        __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
-                        portals_nid2str(SOCKNAL, *nid, ipbuf));
+        if (hmv->magic != le32_to_cpu (PORTALS_PROTO_MAGIC)) {
+                CERROR ("Bad magic %#08x (%#08x expected) from %u.%u.%u.%u\n",
+                        __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC,
+                        HIPQUAD(conn->ksnc_ipaddr));
                 return (-EPROTO);
         }
 
-        if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
-            hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+        if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+            hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
-                        " from "LPX64" %s\n",
-                        __le16_to_cpu (hmv->version_major),
-                        __le16_to_cpu (hmv->version_minor),
+                        " from %u.%u.%u.%u\n",
+                        le16_to_cpu (hmv->version_major),
+                        le16_to_cpu (hmv->version_minor),
                         PORTALS_PROTO_VERSION_MAJOR,
                         PORTALS_PROTO_VERSION_MINOR,
-                        *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
+                        HIPQUAD(conn->ksnc_ipaddr));
                 return (-EPROTO);
         }
 
-#if (PORTALS_PROTO_VERSION_MAJOR != 0)
-# error "This code only understands protocol version 0.x"
+#if (PORTALS_PROTO_VERSION_MAJOR != 1)
+# error "This code only understands protocol version 1.x"
 #endif
-        /* version 0 sends magic/version as the dest_nid of a 'hello' header,
-         * so read the rest of it in now... */
+        /* version 1 sends magic/version as the dest_nid of a 'hello'
+         * header, followed by payload full of interface IP addresses.
+         * Read the rest of it in now... */
 
         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
         if (rc != 0) {
-                CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
-                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
+                CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
+                        rc, HIPQUAD(conn->ksnc_ipaddr));
                 return (rc);
         }
 
         /* ...and check we got what we expected */
-        if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
-            hdr.payload_length != __cpu_to_le32 (0)) {
-                CERROR ("Expecting a HELLO hdr with 0 payload,"
-                        " but got type %d with %d payload from "LPX64" %s\n",
-                        __le32_to_cpu (hdr.type),
-                        __le32_to_cpu (hdr.payload_length), *nid,
-                        portals_nid2str(SOCKNAL, *nid, ipbuf));
+        if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
+                CERROR ("Expecting a HELLO hdr,"
+                        " but got type %d from %u.%u.%u.%u\n",
+                        le32_to_cpu (hdr.type),
+                        HIPQUAD(conn->ksnc_ipaddr));
                 return (-EPROTO);
         }
 
-        if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
-                CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
+        if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
+                CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY"
+                       " from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
                 return (-EPROTO);
         }
 
         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
-                *nid = __le64_to_cpu(hdr.src_nid);
-        } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
-                CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
-                        __le64_to_cpu (hdr.src_nid),
-                        portals_nid2str(SOCKNAL,
-                                        __le64_to_cpu(hdr.src_nid),
-                                        ipbuf),
-                        *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
+                *nid = le64_to_cpu(hdr.src_nid);
+        } else if (*nid != le64_to_cpu (hdr.src_nid)) {
+                CERROR ("Connected to nid "LPX64"@%u.%u.%u.%u "
+                        "but expecting "LPX64"\n",
+                        le64_to_cpu (hdr.src_nid),
+                        HIPQUAD(conn->ksnc_ipaddr), *nid);
                 return (-EPROTO);
         }
 
-        if (*type == SOCKNAL_CONN_NONE) {
+        type = __le32_to_cpu(hdr.msg.hello.type);
+
+        if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
                 /* I've accepted this connection; peer determines type */
-                *type = __le32_to_cpu(hdr.msg.hello.type);
-                switch (*type) {
-                case SOCKNAL_CONN_ANY:
-                case SOCKNAL_CONN_CONTROL:
-                        break;
-                case SOCKNAL_CONN_BULK_IN:
-                        *type = SOCKNAL_CONN_BULK_OUT;
-                        break;
-                case SOCKNAL_CONN_BULK_OUT:
-                        *type = SOCKNAL_CONN_BULK_IN;
-                        break;
-                default:
-                        CERROR ("Unexpected type %d from "LPX64" %s\n",
-                                *type, *nid,
-                                portals_nid2str(SOCKNAL, *nid, ipbuf));
+                conn->ksnc_type = ksocknal_invert_type(type);
+                if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
+                        CERROR ("Unexpected type %d from "LPX64"@%u.%u.%u.%u\n",
+                                type, *nid, HIPQUAD(conn->ksnc_ipaddr));
                         return (-EPROTO);
                 }
-        } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
-                CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
-                        *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
-                        __le32_to_cpu(hdr.msg.hello.type));
+        } else if (ksocknal_invert_type(type) != conn->ksnc_type) {
+                CERROR ("Mismatched types: me %d, "LPX64"@%u.%u.%u.%u %d\n",
+                        conn->ksnc_type, *nid, HIPQUAD(conn->ksnc_ipaddr),
+                        le32_to_cpu(hdr.msg.hello.type));
                 return (-EPROTO);
         }
 
-        *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
+        *incarnation = le64_to_cpu(hdr.msg.hello.incarnation);
 
-        return (0);
+        nips = __le32_to_cpu (hdr.payload_length) / sizeof (__u32);
+
+        if (nips > SOCKNAL_MAX_INTERFACES ||
+            nips * sizeof(__u32) != __le32_to_cpu (hdr.payload_length)) {
+                CERROR("Bad payload length %d from "LPX64"@%u.%u.%u.%u\n",
+                       __le32_to_cpu (hdr.payload_length),
+                       *nid, HIPQUAD(conn->ksnc_ipaddr));
+                return (-EPROTO);
+        }
+
+        if (nips == 0)
+                return (0);
+        
+        rc = ksocknal_sock_read (sock, ipaddrs, nips * sizeof(*ipaddrs));
+        if (rc != 0) {
+                CERROR ("Error %d reading IPs from "LPX64"@%u.%u.%u.%u\n",
+                        rc, *nid, HIPQUAD(conn->ksnc_ipaddr));
+                return (rc);
+        }
+
+        for (i = 0; i < nips; i++) {
+                ipaddrs[i] = __le32_to_cpu(ipaddrs[i]);
+                
+                if (ipaddrs[i] == 0) {
+                        CERROR("Zero IP[%d] from "LPX64"@%u.%u.%u.%u\n",
+                               i, *nid, HIPQUAD(conn->ksnc_ipaddr));
+                        return (-EPROTO);
+                }
+        }
+
+        return (nips);
+}
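
Note: pieced together from ksocknal_send_hello() and ksocknal_recv_hello() above,
the version-1 HELLO exchanged at connection setup looks like this on the wire:

/* HELLO wire layout (as implied by the code in this patch):
 *
 *   ptl_hdr_t hdr;      dest_nid overlaid with ptl_magicversion_t
 *                       (PORTALS_PROTO_MAGIC + protocol major/minor),
 *                       src_nid = sender's NID, type = PTL_MSG_HELLO,
 *                       payload_length = nipaddrs * sizeof(__u32),
 *                       msg.hello = { connection type, incarnation };
 *   __u32 ipaddrs[];    little-endian interface addresses, at most
 *                       SOCKNAL_MAX_INTERFACES of them (possibly none).
 */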
+
+int
+ksocknal_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
+{
+        mm_segment_t   oldmm = get_fs ();
+        struct socket *sock = conn->ksnc_sock;
+        int            len;
+        int            rc;
+
+        rc = ksocknal_getconnsock (conn);
+        if (rc != 0) {
+                LASSERT (conn->ksnc_closing);
+                *txmem = *rxmem = *nagle = 0;
+                return (-ESHUTDOWN);
+        }
+        
+        set_fs (KERNEL_DS);
+
+        len = sizeof(*txmem);
+        rc = sock_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
+                             (char *)txmem, &len);
+        if (rc == 0) {
+                len = sizeof(*rxmem);
+                rc = sock_getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
+                                     (char *)rxmem, &len);
+        }
+        if (rc == 0) {
+                len = sizeof(*nagle);
+                rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
+                                           (char *)nagle, &len);
+        }
+
+        set_fs (oldmm);
+        ksocknal_putconnsock (conn);
+
+        if (rc == 0)
+                *nagle = !*nagle;
+        else
+                *txmem = *rxmem = *nagle = 0;
+                
+        return (rc);
 }
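
ksocknal_get_conn_tunables() above just samples three standard socket options under KERNEL_DS. The same values can be read from userspace with the ordinary sockets API; a minimal sketch (the name example_get_conn_tunables is illustrative, not part of the module), including the same inversion so that "nagle" means "Nagle enabled":

    #include <errno.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>

    /* Sketch: read the options the function above samples, on an ordinary
     * connected TCP file descriptor.  Returns 0 on success, -errno on failure. */
    static int
    example_get_conn_tunables (int fd, int *txmem, int *rxmem, int *nagle)
    {
            socklen_t len;
            int       nodelay;

            len = sizeof(*txmem);
            if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, txmem, &len) != 0)
                    return (-errno);

            len = sizeof(*rxmem);
            if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, rxmem, &len) != 0)
                    return (-errno);

            len = sizeof(nodelay);
            if (getsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, &len) != 0)
                    return (-errno);

            *nagle = !nodelay;      /* Nagle is on when TCP_NODELAY is off */
            return (0);
    }
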
 
 int
@@ -2148,13 +2292,13 @@ ksocknal_setup_sock (struct socket *sock)
         mm_segment_t    oldmm = get_fs ();
         int             rc;
         int             option;
+        int             keep_idle;
+        int             keep_intvl;
+        int             keep_count;
+        int             do_keepalive;
         struct linger   linger;
 
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
         sock->sk->sk_allocation = GFP_NOFS;
-#else
-        sock->sk->allocation = GFP_NOFS;
-#endif
 
         /* Ensure this socket aborts active sends immediately when we close
          * it. */
@@ -2181,65 +2325,121 @@ ksocknal_setup_sock (struct socket *sock)
                 return (rc);
         }
 
-#if SOCKNAL_USE_KEEPALIVES
-        /* Keepalives: If 3/4 of the timeout elapses, start probing every
-         * second until the timeout elapses. */
+        if (!ksocknal_tunables.ksnd_nagle) {
+                option = 1;
+                
+                set_fs (KERNEL_DS);
+                rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
+                                            (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't disable nagle: %d\n", rc);
+                        return (rc);
+                }
+        }
+        
+        if (ksocknal_tunables.ksnd_buffer_size > 0) {
+                option = ksocknal_tunables.ksnd_buffer_size;
+                
+                set_fs (KERNEL_DS);
+                rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
+                                      (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't set send buffer %d: %d\n",
+                                option, rc);
+                        return (rc);
+                }
+
+                set_fs (KERNEL_DS);
+                rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
+                                      (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't set receive buffer %d: %d\n",
+                                option, rc);
+                        return (rc);
+                }
+        }
+
+        /* snapshot tunables */
+        keep_idle  = ksocknal_tunables.ksnd_keepalive_idle;
+        keep_count = ksocknal_tunables.ksnd_keepalive_count;
+        keep_intvl = ksocknal_tunables.ksnd_keepalive_intvl;
+        
+        do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
 
-        option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4;
+        option = (do_keepalive ? 1 : 0);
         set_fs (KERNEL_DS);
-        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
-                                    (char *)&option, sizeof (option));
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
+                              (char *)&option, sizeof (option));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
+                CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
                 return (rc);
         }
-        
-        option = 1;
+
+        if (!do_keepalive)
+                return (0);
+
         set_fs (KERNEL_DS);
-        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
-                                    (char *)&option, sizeof (option));
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
+                                    (char *)&keep_idle, sizeof (keep_idle));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
                 return (rc);
         }
-        
-        option = ksocknal_tunables.ksnd_io_timeout / 4;
+
         set_fs (KERNEL_DS);
-        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
-                                    (char *)&option, sizeof (option));
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
+                                    (char *)&keep_intvl, sizeof (keep_intvl));
         set_fs (oldmm);
         if (rc != 0) {
                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
                 return (rc);
         }
 
-        option = 1;
         set_fs (KERNEL_DS);
-        rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
-                              (char *)&option, sizeof (option));
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
+                                    (char *)&keep_count, sizeof (keep_count));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
+                CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
                 return (rc);
         }
-#endif
+
         return (0);
 }
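
The keepalive block above maps the three ksnd_keepalive_* tunables straight onto the standard Linux TCP keepalive options. For comparison, a hedged userspace sketch of the equivalent setup (standard sockets API; the function name is illustrative):

    #include <errno.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <sys/socket.h>

    /* Sketch: probe an idle connection after 'idle' seconds, every 'intvl'
     * seconds, and give up after 'count' unanswered probes -- the same
     * semantics as the SO_KEEPALIVE/TCP_KEEP* sequence above.
     * Returns 0 on success, -errno on failure. */
    static int
    example_set_keepalive (int fd, int idle, int intvl, int count)
    {
            int on = 1;

            if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) != 0 ||
                setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)) != 0 ||
                setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl)) != 0 ||
                setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(count)) != 0)
                    return (-errno);

            return (0);
    }
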
 
-int
-ksocknal_connect_peer (ksock_route_t *route, int type)
+static int
+ksocknal_connect_sock(struct socket **sockp, int *may_retry, 
+                      ksock_route_t *route, int local_port)
 {
-        struct sockaddr_in  peer_addr;
-        mm_segment_t        oldmm = get_fs();
-        struct timeval      tv;
-        int                 fd;
+        struct sockaddr_in  locaddr;
+        struct sockaddr_in  srvaddr;
         struct socket      *sock;
         int                 rc;
-        char                ipbuf[PTL_NALFMT_SIZE];
+        int                 option;
+        mm_segment_t        oldmm = get_fs();
+        struct timeval      tv;
+
+        memset(&locaddr, 0, sizeof(locaddr)); 
+        locaddr.sin_family = AF_INET; 
+        locaddr.sin_port = htons(local_port);
+        locaddr.sin_addr.s_addr = 
+                (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr) 
+                                            : INADDR_ANY;
+        memset (&srvaddr, 0, sizeof (srvaddr));
+        srvaddr.sin_family = AF_INET;
+        srvaddr.sin_port = htons (route->ksnr_port);
+        srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
+
+        *may_retry = 0;
 
         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
+        *sockp = sock;
         if (rc != 0) {
                 CERROR ("Can't create autoconnect socket: %d\n", rc);
                 return (rc);
@@ -2249,17 +2449,23 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
          * from userspace.  And we actually need the sock->file refcounting
          * that this gives you :) */
 
-        fd = sock_map_fd (sock);
-        if (fd < 0) {
+        rc = sock_map_fd (sock);
+        if (rc < 0) {
                 sock_release (sock);
-                CERROR ("sock_map_fd error %d\n", fd);
-                return (fd);
+                CERROR ("sock_map_fd error %d\n", rc);
+                return (rc);
         }
 
-        /* NB the fd now owns the ref on sock->file */
+        /* NB the file descriptor (rc) now owns the ref on sock->file */
         LASSERT (sock->file != NULL);
         LASSERT (file_count(sock->file) == 1);
 
+        get_file(sock->file);                /* extra ref makes sock->file */
+        sys_close(rc);                       /* survive this close */
+
+        /* Still got a single ref on sock->file */
+        LASSERT (file_count(sock->file) == 1);
+
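+        /* Ref-count trace of the dance above: sock_map_fd() left
+         * file_count == 1 owned by the fd, get_file() raised it to 2,
+         * sys_close() dropped the fd's ref back to 1.  We now hold that
+         * single ref directly and release it with fput() on failure, or
+         * after ksocknal_create_conn() in the caller. */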
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
@@ -2272,7 +2478,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         if (rc != 0) {
                 CERROR ("Can't set send timeout %d: %d\n", 
                         ksocknal_tunables.ksnd_io_timeout, rc);
-                goto out;
+                goto failed;
         }
         
         set_fs (KERNEL_DS);
@@ -2282,72 +2488,83 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         if (rc != 0) {
                 CERROR ("Can't set receive timeout %d: %d\n",
                         ksocknal_tunables.ksnd_io_timeout, rc);
-                goto out;
+                goto failed;
         }
 
-        {
-                int  option = 1;
-                
-                set_fs (KERNEL_DS);
-                rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
-                                            (char *)&option, sizeof (option));
-                set_fs (oldmm);
-                if (rc != 0) {
-                        CERROR ("Can't disable nagle: %d\n", rc);
-                        goto out;
-                }
+        set_fs (KERNEL_DS);
+        option = 1;
+        rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
+                             (char *)&option, sizeof (option)); 
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
+                goto failed;
         }
-        
-        if (route->ksnr_buffer_size != 0) {
-                int option = route->ksnr_buffer_size;
-                
-                set_fs (KERNEL_DS);
-                rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
-                                      (char *)&option, sizeof (option));
-                set_fs (oldmm);
-                if (rc != 0) {
-                        CERROR ("Can't set send buffer %d: %d\n",
-                                route->ksnr_buffer_size, rc);
-                        goto out;
-                }
 
-                set_fs (KERNEL_DS);
-                rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
-                                      (char *)&option, sizeof (option));
-                set_fs (oldmm);
-                if (rc != 0) {
-                        CERROR ("Can't set receive buffer %d: %d\n",
-                                route->ksnr_buffer_size, rc);
-                        goto out;
-                }
+        rc = sock->ops->bind(sock, 
+                             (struct sockaddr *)&locaddr, sizeof(locaddr));
+        if (rc == -EADDRINUSE) {
+                CDEBUG(D_NET, "Port %d already in use\n", local_port);
+                *may_retry = 1;
+                goto failed;
         }
-        
-        memset (&peer_addr, 0, sizeof (peer_addr));
-        peer_addr.sin_family = AF_INET;
-        peer_addr.sin_port = htons (route->ksnr_port);
-        peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
-        
-        rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
-                                 sizeof (peer_addr), sock->file->f_flags);
         if (rc != 0) {
-                CERROR ("Error %d connecting to "LPX64" %s\n", rc,
-                        route->ksnr_peer->ksnp_nid,
-                        portals_nid2str(SOCKNAL,
-                                        route->ksnr_peer->ksnp_nid,
-                                        ipbuf));
-                goto out;
+                CERROR("Error trying to bind to reserved port %d: %d\n",
+                       local_port, rc);
+                goto failed;
         }
+
+        rc = sock->ops->connect(sock,
+                                (struct sockaddr *)&srvaddr, sizeof(srvaddr),
+                                sock->file->f_flags);
+        if (rc == 0)
+                return 0;
+
+        /* EADDRNOTAVAIL probably means we're already connected to the same
+         * peer/port on the same local port on a differently typed
+         * connection.  Let our caller retry with a different local
+         * port... */
+        *may_retry = (rc == -EADDRNOTAVAIL);
+
+        CDEBUG(*may_retry ? D_NET : D_ERROR,
+               "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
+               HIPQUAD(route->ksnr_myipaddr), local_port,
+               HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
+
+ failed:
+        fput(sock->file);
+        return rc;
+}
+
+int
+ksocknal_connect_peer (ksock_route_t *route, int type)
+{
+        struct socket      *sock;
+        int                 rc;
+        int                 port;
+        int                 may_retry;
         
-        rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
-        if (rc == 0) {
-                /* Take an extra ref on sock->file to compensate for the
-                 * upcoming close which will lose fd's ref on it. */
-                get_file (sock->file);
+        /* Iterate through reserved ports.  When typed connections are
+         * used we need a distinct local port for each connection to the
+         * same peer, but we only discover a clash at connect() time; by
+         * then the socket is already bound, so each retry needs a fresh
+         * socket. */
+
+        for (port = 1023; port > 512; --port) {
+
+                rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
+
+                if (rc == 0) {
+                        rc = ksocknal_create_conn(route, sock, type);
+                        fput(sock->file);
+                        return rc;
+                }
+
+                if (!may_retry)
+                        return rc;
         }
 
- out:
-        sys_close (fd);
-        return (rc);
+        CERROR("Out of ports trying to bind to a reserved port\n");
+        return (-EADDRINUSE);
 }
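
ksocknal_connect_peer() above walks the reserved ports 1023..513, creating a new socket per attempt because ksocknal_connect_sock() has already bound the previous one. A hedged userspace sketch of the same bind-then-connect retry pattern (illustrative name, standard sockets API; binding below port 1024 needs privilege):

    #include <arpa/inet.h>
    #include <errno.h>
    #include <netinet/in.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* Sketch: bind to a reserved local port and connect to the peer.
     * EADDRINUSE/EADDRNOTAVAIL mean "this local port won't do, try the
     * next"; anything else is a hard failure.
     * Returns a connected fd or -errno. */
    static int
    example_connect_reserved (struct in_addr local_ip, struct sockaddr_in *peer)
    {
            struct sockaddr_in local;
            int                port;
            int                fd;
            int                rc;

            for (port = 1023; port > 512; port--) {
                    fd = socket(PF_INET, SOCK_STREAM, 0);
                    if (fd < 0)
                            return (-errno);

                    memset(&local, 0, sizeof(local));
                    local.sin_family = AF_INET;
                    local.sin_port = htons(port);
                    local.sin_addr = local_ip;

                    if (bind(fd, (struct sockaddr *)&local, sizeof(local)) == 0 &&
                        connect(fd, (struct sockaddr *)peer, sizeof(*peer)) == 0)
                            return (fd);            /* connected */

                    rc = -errno;                    /* from bind() or connect() */
                    close(fd);
                    if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL)
                            return (rc);            /* hard failure */
                    /* local port busy or clashing: try the next one */
            }

            return (-EADDRINUSE);                   /* out of reserved ports */
    }
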
 
 void
@@ -2367,7 +2584,6 @@ ksocknal_autoconnect (ksock_route_t *route)
                 LASSERT (type < SOCKNAL_CONN_NTYPES);
 
                 rc = ksocknal_connect_peer (route, type);
-
                 if (rc != 0)
                         break;
                 
@@ -2407,12 +2623,13 @@ ksocknal_autoconnect (ksock_route_t *route)
                 } while (!list_empty (&peer->ksnp_tx_queue));
         }
 
-        /* make this route least-favourite for re-selection */
+#if 0           /* irrelevant with only eager routes */
         if (!route->ksnr_deleted) {
+                /* make this route least-favourite for re-selection */
                 list_del(&route->ksnr_list);
                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
         }
-        
+#endif        
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
         while (!list_empty (&zombies)) {
@@ -2421,15 +2638,15 @@ ksocknal_autoconnect (ksock_route_t *route)
                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
 
                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
-                        NTOH__u32 (tx->tx_hdr->type),
-                        NTOH__u32 (tx->tx_hdr->payload_length),
-                        NTOH__u64 (tx->tx_hdr->src_nid),
+                        le32_to_cpu (tx->tx_hdr->type),
+                        le32_to_cpu (tx->tx_hdr->payload_length),
+                        le64_to_cpu (tx->tx_hdr->src_nid),
                         portals_nid2str(SOCKNAL,
-                                        NTOH__u64(tx->tx_hdr->src_nid),
+                                        le64_to_cpu(tx->tx_hdr->src_nid),
                                         ipbuf),
-                        NTOH__u64 (tx->tx_hdr->dest_nid),
+                        le64_to_cpu (tx->tx_hdr->dest_nid),
                         portals_nid2str(SOCKNAL,
-                                        NTOH__u64(tx->tx_hdr->src_nid),
+                                        le64_to_cpu(tx->tx_hdr->src_nid),
                                         ipbuf2));
 
                 list_del (&tx->tx_list);
@@ -2458,24 +2675,26 @@ ksocknal_autoconnectd (void *arg)
                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
                                             ksock_route_t, ksnr_connect_list);
-                        
+
                         list_del (&route->ksnr_connect_list);
                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
 
                         ksocknal_autoconnect (route);
                         ksocknal_put_route (route);
 
-                        spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+                        spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock,
+                                          flags);
                         continue;
                 }
-                
-                spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
 
-                rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
-                                               ksocknal_data.ksnd_shuttingdown ||
-                                               !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
+                spin_unlock_irqrestore(&ksocknal_data.ksnd_autoconnectd_lock,
+                                       flags);
 
-                spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+                rc = wait_event_interruptible(ksocknal_data.ksnd_autoconnectd_waitq,
+                                              ksocknal_data.ksnd_shuttingdown ||
+                                              !list_empty(&ksocknal_data.ksnd_autoconnectd_routes));
+
+                spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock, flags);
         }
 
         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
@@ -2490,32 +2709,39 @@ ksocknal_find_timed_out_conn (ksock_peer_t *peer)
         /* We're called with a shared lock on ksnd_global_lock */
         ksock_conn_t      *conn;
         struct list_head  *ctmp;
-        ksock_sched_t     *sched;
 
         list_for_each (ctmp, &peer->ksnp_conns) {
                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
-                sched = conn->ksnc_scheduler;
 
                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
                 LASSERT (!conn->ksnc_closing);
-                
+
+                if (conn->ksnc_sock->sk->sk_err != 0) {
+                        /* Something (e.g. failed keepalive) set the socket error */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Socket error %d: "LPX64" %p %d.%d.%d.%d\n",
+                                conn->ksnc_sock->sk->sk_err, peer->ksnp_nid,
+                                conn, HIPQUAD(conn->ksnc_ipaddr));
+                        return (conn);
+                }
+
                 if (conn->ksnc_rx_started &&
                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
                         /* Timed out incomplete incoming message */
                         atomic_inc (&conn->ksnc_refcount);
                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
-                                peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
+                                peer->ksnp_nid,conn,HIPQUAD(conn->ksnc_ipaddr));
                         return (conn);
                 }
-                
+
                 if ((!list_empty (&conn->ksnc_tx_queue) ||
                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
-                        /* Timed out messages queued for sending, or
-                         * messages buffered in the socket's send buffer */
+                        /* Timed out messages queued for sending or
+                         * buffered in the socket's send buffer */
                         atomic_inc (&conn->ksnc_refcount);
-                        CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
-                                peer->ksnp_nid, 
+                        CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
+                                peer->ksnp_nid,
                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
                                 HIPQUAD(conn->ksnc_ipaddr));
@@ -2698,19 +2924,11 @@ ksocknal_reaper (void *arg)
         return (0);
 }
 
-nal_cb_t ksocknal_lib = {
-        nal_data:       &ksocknal_data,                /* NAL private data */
-        cb_send:         ksocknal_send,
-        cb_send_pages:   ksocknal_send_pages,
-        cb_recv:         ksocknal_recv,
-        cb_recv_pages:   ksocknal_recv_pages,
-        cb_read:         ksocknal_read,
-        cb_write:        ksocknal_write,
-        cb_malloc:       ksocknal_malloc,
-        cb_free:         ksocknal_free,
-        cb_printf:       ksocknal_printf,
-        cb_cli:          ksocknal_cli,
-        cb_sti:          ksocknal_sti,
-        cb_callback:     ksocknal_callback,
-        cb_dist:         ksocknal_dist
+lib_nal_t ksocknal_lib = {
+        libnal_data:       &ksocknal_data,      /* NAL private data */
+        libnal_send:        ksocknal_send,
+        libnal_send_pages:  ksocknal_send_pages,
+        libnal_recv:        ksocknal_recv,
+        libnal_recv_pages:  ksocknal_recv_pages,
+        libnal_dist:        ksocknal_dist
 };