* Added socknal multi-frag I/O
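
This change replaces the old one-fragment-at-a-time socket I/O with gather I/O over the whole fragment list: the remaining fragments are copied into a per-connection scratch iovec, handed to a single sendmsg/recvmsg, and the original list is then "consumed" by however many bytes the socket actually accepted, with MSG_MORE set whenever more payload (or another queued tx) will follow. A minimal userspace sketch of that gather-and-consume loop, using writev() on stdout in place of a socket (names such as frag_tx and consume() are illustrative, not socklnd identifiers):

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/uio.h>

#define MAX_FRAGS 16

struct frag_tx {
        struct iovec *tx_iov;           /* current fragment */
        int           tx_niov;          /* fragments remaining */
        size_t        tx_resid;         /* payload bytes still to send */
};

/* Gather all remaining fragments into 'scratch'; the socket may scribble
 * on the iovec it is given, so the original list is left untouched. */
static size_t gather(const struct frag_tx *tx, struct iovec *scratch)
{
        size_t nob = 0;

        for (int i = 0; i < tx->tx_niov; i++) {
                scratch[i] = tx->tx_iov[i];
                nob += scratch[i].iov_len;
        }
        return nob;
}

/* Advance the fragment list past 'nob' bytes the descriptor accepted. */
static void consume(struct frag_tx *tx, size_t nob)
{
        struct iovec *iov = tx->tx_iov;

        tx->tx_resid -= nob;
        while (nob != 0) {
                if (nob < iov->iov_len) {       /* fragment partially sent */
                        iov->iov_base = (char *)iov->iov_base + nob;
                        iov->iov_len -= nob;
                        return;
                }
                nob -= iov->iov_len;            /* fragment fully sent */
                tx->tx_iov = ++iov;
                tx->tx_niov--;
        }
}

int main(void)
{
        char a[] = "multi-", b[] = "frag ", c[] = "I/O\n";
        struct iovec frags[] = {
                { a, sizeof(a) - 1 }, { b, sizeof(b) - 1 }, { c, sizeof(c) - 1 },
        };
        struct frag_tx tx = { frags, 3, 15 };
        struct iovec scratch[MAX_FRAGS];

        while (tx.tx_resid > 0) {
                size_t  nob = gather(&tx, scratch);
                ssize_t rc  = writev(STDOUT_FILENO, scratch, tx.tx_niov);

                if (rc <= 0)                    /* the kernel path turns this into -EAGAIN/-ENOMEM */
                        break;
                consume(&tx, (size_t)rc);
                (void)nob;      /* the kernel passes nob to sock_sendmsg() and sets
                                 * MSG_MORE when nob < tx_resid */
        }
        return 0;
}

Under SOCKNAL_SINGLE_FRAG_TX/RX, or where multiple kmap()s would risk deadlock, the scratch iovec degenerates to a single entry and the behaviour matches the old single-fragment code.
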
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
index b22d501..ed91f94 100644 (file)
@@ -84,21 +84,17 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
-        int            fragsize = iov->iov_len;
-        unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = (tx->tx_niov > 1) || 
-                              (tx->tx_nkiov > 0) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
+        unsigned long  vaddr = (unsigned long)iov->iov_base;
         int            offset = vaddr & (PAGE_SIZE - 1);
-        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
+        int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
         struct page   *page;
 #endif
+        int            nob;
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
+         * or leave them alone. */
         LASSERT (tx->tx_niov > 0);
         
 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
@@ -106,52 +102,74 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
+                int msgflg = MSG_DONTWAIT;
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                if (fragsize > zcsize) {
-                        more = 1;
-                        fragsize = zcsize;
-                }
-
-                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
-                                       &tx->tx_zccd);
+                if (!list_empty (&conn->ksnc_tx_queue) ||
+                    zcsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
+                
+                rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
         } else
 #endif
         {
-                /* NB don't pass tx's iov; sendmsg may or may not update it */
-                struct iovec fragiov = { .iov_base = (void *)vaddr,
-                                         .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX
+                struct iovec    scratch;
+                struct iovec   *scratchiov = &scratch;
+                int             niov = 1;
+#else
+                struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
+                int             niov = tx->tx_niov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t oldmm = get_fs();
+                int  i;
 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i] = tx->tx_iov[i];
+                        nob += scratchiov[i].iov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+                
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
         } 
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
 
-                if (rc < iov->iov_len) {
-                        /* didn't send whole iov entry... */
-                        iov->iov_base = (void *)(vaddr + rc);
-                        iov->iov_len -= rc;
-                } else {
-                        tx->tx_iov++;
-                        tx->tx_niov--;
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        /* "consume" iov */
+        do {
+                LASSERT (tx->tx_niov > 0);
+                
+                if (nob < iov->iov_len) {
+                        iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
+                        iov->iov_len -= nob;
+                        return (rc);
                 }
-        }
+
+                nob -= iov->iov_len;
+                tx->tx_iov = ++iov;
+                tx->tx_niov--;
+        } while (nob != 0);
         
         return (rc);
 }
@@ -161,66 +179,94 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
         struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
-        int            fragsize = kiov->kiov_len;
-        struct page   *page = kiov->kiov_page;
-        int            offset = kiov->kiov_offset;
-        int            more = (tx->tx_nkiov > 1) ||
-                              (!list_empty (&conn->ksnc_tx_queue));
         int            rc;
-
+        int            nob;
+        
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_resid);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+         * or leave them alone. */
         LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
+        if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
+                struct page   *page = kiov->kiov_page;
+                int            offset = kiov->kiov_offset;
+                int            fragsize = kiov->kiov_len;
+                int            msgflg = MSG_DONTWAIT;
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
-                               page, offset, fragsize);
+                               page, offset, kiov->kiov_len);
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    fragsize < tx->tx_resid)
+                        msgflg |= MSG_MORE;
 
-                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
-                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
+                rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
                                        &tx->tx_zccd);
         } else
 #endif
         {
-                char *addr = ((char *)kmap (page)) + offset;
-                struct iovec fragiov = {.iov_base = addr,
-                                        .iov_len  = fragsize};
+#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
+                struct iovec  scratch;
+                struct iovec *scratchiov = &scratch;
+                int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+                struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
+                int           niov = tx->tx_nkiov;
+#endif
                 struct msghdr msg = {
                         .msg_name       = NULL,
                         .msg_namelen    = 0,
-                        .msg_iov        = &fragiov,
-                        .msg_iovlen     = 1,
+                        .msg_iov        = scratchiov,
+                        .msg_iovlen     = niov,
                         .msg_control    = NULL,
                         .msg_controllen = 0,
-                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
+                        .msg_flags      = MSG_DONTWAIT
                 };
                 mm_segment_t  oldmm = get_fs();
+                int           i;
                 
+                for (nob = i = 0; i < niov; i++) {
+                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
+                                                 kiov[i].kiov_offset;
+                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+                }
+
+                if (!list_empty(&conn->ksnc_tx_queue) ||
+                    nob < tx->tx_resid)
+                        msg.msg_flags |= MSG_MORE;
+
                 set_fs (KERNEL_DS);
-                rc = sock_sendmsg(sock, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, nob);
                 set_fs (oldmm);
 
-                kunmap (page);
+                for (i = 0; i < niov; i++)
+                        kunmap(kiov[i].kiov_page);
         }
 
-        if (rc > 0) {
-                tx->tx_resid -= rc;
-                if (rc < fragsize) {
-                        kiov->kiov_offset = offset + rc;
-                        kiov->kiov_len    = fragsize - rc;
-                } else {
-                        tx->tx_kiov++;
-                        tx->tx_nkiov--;
+        if (rc <= 0)                            /* sent nothing? */
+                return (rc);
+
+        nob = rc;
+        LASSERT (nob <= tx->tx_resid);
+        tx->tx_resid -= nob;
+
+        do {
+                LASSERT(tx->tx_nkiov > 0);
+                
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return rc;
                 }
-        }
+                
+                nob -= kiov->kiov_len;
+                tx->tx_kiov = ++kiov;
+                tx->tx_nkiov--;
+        } while (nob != 0);
 
         return (rc);
 }
@@ -269,35 +315,35 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                         mb();
                 }
 
-                if (rc <= 0) {
-                        /* Didn't write anything.
-                         *
-                         * NB: rc == 0 and rc == -EAGAIN both mean try
-                         * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0).
-                         *
-                         * Also, sends never fail with -ENOMEM, just
-                         * -EAGAIN, but with the added bonus that we can't
-                         * expect write_space() to call us back to tell us
-                         * when to try sending again.  We use the
-                         * SOCK_NOSPACE flag to diagnose...  */
-
-                        LASSERT(rc != -ENOMEM);
-
-                        if (rc == 0 || rc == -EAGAIN) {
-                                if (test_bit(SOCK_NOSPACE, 
-                                             &conn->ksnc_sock->flags)) {
-                                        rc = -EAGAIN;
-                                } else {
-                                        static int counter;
-                         
-                                        counter++;
-                                        if ((counter & (-counter)) == counter)
-                                                CWARN("%d ENOMEM tx %p\n", 
-                                                      counter, conn);
-                                        rc = -ENOMEM;
-                                }
+                if (rc <= 0) { /* Didn't write anything? */
+                        unsigned long  flags;
+                        ksock_sched_t *sched;
+
+                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
+                                rc = -EAGAIN;
+
+                        if (rc != -EAGAIN)
+                                break;
+
+                        /* Check if EAGAIN is due to memory pressure */
+
+                        sched = conn->ksnc_scheduler;
+                        spin_lock_irqsave(&sched->kss_lock, flags);
+                                
+                        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
+                            !conn->ksnc_tx_ready) {
+                                /* SOCK_NOSPACE is set when the socket fills
+                                 * and cleared in the write_space callback
+                                 * (which also sets ksnc_tx_ready).  If
+                                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
+                                 * zero, I didn't fill the socket and
+                                 * write_space won't reschedule me, so I
+                                 * return -ENOMEM to get my caller to retry
+                                 * after a timeout */
+                                rc = -ENOMEM;
                         }
+
+                        spin_unlock_irqrestore(&sched->kss_lock, flags);
                         break;
                 }
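
The rewritten error path above distinguishes a genuinely full socket from memory pressure: SOCK_NOSPACE and ksnc_tx_ready are examined under the scheduler lock, and only when both are clear (so write_space() will never reschedule this connection) is the -EAGAIN converted to -ENOMEM and left for the caller to retry after a timeout. A compressed restatement of that decision, with a hypothetical helper name:

#include <errno.h>

/* Hypothetical restatement of the -EAGAIN diagnosis above.  If neither
 * flag is set, this send did not fill the socket, so write_space() will
 * never call back; report memory pressure and let the caller retry from
 * a timeout instead of waiting for a wakeup that cannot come. */
static int classify_blocked_send(int sock_nospace, int tx_ready)
{
        if (sock_nospace || tx_ready)
                return -EAGAIN;         /* write_space() will reschedule us */
        return -ENOMEM;                 /* nobody will; retry after a timeout */
}
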
 
@@ -332,114 +378,151 @@ ksocknal_eager_ack (ksock_conn_t *conn)
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_niov;
+#endif
         struct iovec *iov = conn->ksnc_rx_iov;
-        int           fragsize  = iov->iov_len;
-        unsigned long vaddr = (unsigned long)iov->iov_base;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
         /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (conn->ksnc_rx_niov > 0);
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
+         * or leave them alone. */
+        LASSERT (niov > 0);
+
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i] = iov[i];
+                nob += scratchiov[i].iov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean..........................^ */
         set_fs (oldmm);
 
         if (rc <= 0)
                 return (rc);
 
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_niov > 0);
                 
-        if (rc < fragsize) {
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < iov->iov_len) {
+                        iov->iov_len -= nob;
+                        iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
+                        return (-EAGAIN);
+                }
+                
+                nob -= iov->iov_len;
+                conn->ksnc_rx_iov = ++iov;
+                conn->ksnc_rx_niov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_iov++;
-        conn->ksnc_rx_niov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_recv_kiov (ksock_conn_t *conn)
 {
+#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
+        struct iovec  scratch;
+        struct iovec *scratchiov = &scratch;
+        int           niov = 1;
+#else
+#warning "XXX risk of kmap deadlock on multiple frags..."
+        struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
+        int           niov = conn->ksnc_rx_nkiov;
+#endif   
         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
-        struct page  *page = kiov->kiov_page;
-        int           offset = kiov->kiov_offset;
-        int           fragsize = kiov->kiov_len;
-        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
-        struct iovec  fragiov = { .iov_base = (void *)vaddr,
-                                  .iov_len  = fragsize};
         struct msghdr msg = {
                 .msg_name       = NULL,
                 .msg_namelen    = 0,
-                .msg_iov        = &fragiov,
-                .msg_iovlen     = 1,
+                .msg_iov        = scratchiov,
+                .msg_iovlen     = niov,
                 .msg_control    = NULL,
                 .msg_controllen = 0,
                 .msg_flags      = 0
         };
         mm_segment_t oldmm = get_fs();
+        int          nob;
+        int          i;
         int          rc;
 
-        /* NB we can't trust socket ops to either consume our iovs
-         * or leave them alone, so we only receive 1 frag at a time. */
-        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
         LASSERT (conn->ksnc_rx_nkiov > 0);
-        LASSERT (offset + fragsize <= PAGE_SIZE);
+
+        /* NB we can't trust socket ops to either consume our iovs
+         * or leave them alone. */
+        for (nob = i = 0; i < niov; i++) {
+                scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
+                nob += scratchiov[i].iov_len = kiov[i].kiov_len;
+        }
+        LASSERT (nob <= conn->ksnc_rx_nob_wanted);
 
         set_fs (KERNEL_DS);
-        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
-        /* NB this is just a boolean............................^ */
+        rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
+        /* NB this is just a boolean.......................^ */
         set_fs (oldmm);
 
-        kunmap (page);
-        
+        for (i = 0; i < niov; i++)
+                kunmap(kiov[i].kiov_page);
+
         if (rc <= 0)
                 return (rc);
         
         /* received something... */
+        nob = rc;
+
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
                                  ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
-        conn->ksnc_rx_nob_wanted -= rc;
-        conn->ksnc_rx_nob_left -= rc;
+        conn->ksnc_rx_nob_wanted -= nob;
+        conn->ksnc_rx_nob_left -= nob;
+
+        do {
+                LASSERT (conn->ksnc_rx_nkiov > 0);
                 
-        if (rc < fragsize) {
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len = fragsize - rc;
-                return (-EAGAIN);
-        }
+                if (nob < kiov->kiov_len) {
+                        kiov->kiov_offset += nob;
+                        kiov->kiov_len -= nob;
+                        return -EAGAIN;
+                }
+                
+                nob -= kiov->kiov_len;
+                conn->ksnc_rx_kiov = ++kiov;
+                conn->ksnc_rx_nkiov--;
+        } while (nob != 0);
 
-        conn->ksnc_rx_kiov++;
-        conn->ksnc_rx_nkiov--;
-        return (1);
+        return 1;
 }
 
 int
@@ -599,6 +682,12 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                 return (rc);
 
         if (rc == -ENOMEM) {
+                static int counter;
+
+                counter++;   /* exponential backoff warnings */
+                if ((counter & (-counter)) == counter)
+                        CWARN("%d ENOMEM tx %p\n", counter, conn);
+
                 /* Queue on ksnd_enomem_conns for retry after a timeout */
                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
 
@@ -1853,12 +1942,11 @@ ksocknal_write_space (struct sock *sk)
         }
 
         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
-
                 sched = conn->ksnc_scheduler;
 
                 spin_lock_irqsave (&sched->kss_lock, flags);
 
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
                 conn->ksnc_tx_ready = 1;
 
                 if (!conn->ksnc_tx_scheduled && // not being progressed
@@ -2324,17 +2412,34 @@ ksocknal_setup_sock (struct socket *sock)
         return (0);
 }
 
-int
-ksocknal_connect_peer (ksock_route_t *route, int type)
+static int
+ksocknal_connect_sock(struct socket **sockp, int *may_retry, 
+                      ksock_route_t *route, int local_port)
 {
-        struct sockaddr_in  ipaddr;
-        mm_segment_t        oldmm = get_fs();
-        struct timeval      tv;
-        int                 fd;
+        struct sockaddr_in  locaddr;
+        struct sockaddr_in  srvaddr;
         struct socket      *sock;
         int                 rc;
-        
+        int                 option;
+        mm_segment_t        oldmm = get_fs();
+        struct timeval      tv;
+
+        memset(&locaddr, 0, sizeof(locaddr)); 
+        locaddr.sin_family = AF_INET; 
+        locaddr.sin_port = htons(local_port);
+        locaddr.sin_addr.s_addr = 
+                (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr) 
+                                            : INADDR_ANY;
+        memset (&srvaddr, 0, sizeof (srvaddr));
+        srvaddr.sin_family = AF_INET;
+        srvaddr.sin_port = htons (route->ksnr_port);
+        srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
+
+        *may_retry = 0;
+
         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
+        *sockp = sock;
         if (rc != 0) {
                 CERROR ("Can't create autoconnect socket: %d\n", rc);
                 return (rc);
@@ -2344,17 +2449,23 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
          * from userspace.  And we actually need the sock->file refcounting
          * that this gives you :) */
 
-        fd = sock_map_fd (sock);
-        if (fd < 0) {
+        rc = sock_map_fd (sock);
+        if (rc < 0) {
                 sock_release (sock);
-                CERROR ("sock_map_fd error %d\n", fd);
-                return (fd);
+                CERROR ("sock_map_fd error %d\n", rc);
+                return (rc);
         }
 
-        /* NB the fd now owns the ref on sock->file */
+        /* NB the file descriptor (rc) now owns the ref on sock->file */
         LASSERT (sock->file != NULL);
         LASSERT (file_count(sock->file) == 1);
 
+        get_file(sock->file);                /* extra ref makes sock->file */
+        sys_close(rc);                       /* survive this close */
+
+        /* Still got a single ref on sock->file */
+        LASSERT (file_count(sock->file) == 1);
+
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
@@ -2367,7 +2478,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         if (rc != 0) {
                 CERROR ("Can't set send timeout %d: %d\n", 
                         ksocknal_tunables.ksnd_io_timeout, rc);
-                goto out;
+                goto failed;
         }
         
         set_fs (KERNEL_DS);
@@ -2377,53 +2488,83 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         if (rc != 0) {
                 CERROR ("Can't set receive timeout %d: %d\n",
                         ksocknal_tunables.ksnd_io_timeout, rc);
-                goto out;
+                goto failed;
         }
 
-        if (route->ksnr_myipaddr != 0) {
-                /* Bind to the local IP address */
-                memset (&ipaddr, 0, sizeof (ipaddr));
-                ipaddr.sin_family = AF_INET;
-                ipaddr.sin_port = htons (0); /* ANY */
-                ipaddr.sin_addr.s_addr = htonl(route->ksnr_myipaddr);
+        set_fs (KERNEL_DS);
+        option = 1;
+        rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
+                             (char *)&option, sizeof (option)); 
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
+                goto failed;
+        }
 
-                rc = sock->ops->bind (sock, (struct sockaddr *)&ipaddr,
-                                      sizeof (ipaddr));
-                if (rc != 0) {
-                        CERROR ("Can't bind to local IP %u.%u.%u.%u: %d\n",
-                                HIPQUAD(route->ksnr_myipaddr), rc);
-                        goto out;
-                }
+        rc = sock->ops->bind(sock, 
+                             (struct sockaddr *)&locaddr, sizeof(locaddr));
+        if (rc == -EADDRINUSE) {
+                CDEBUG(D_NET, "Port %d already in use\n", local_port);
+                *may_retry = 1;
+                goto failed;
         }
-        
-        memset (&ipaddr, 0, sizeof (ipaddr));
-        ipaddr.sin_family = AF_INET;
-        ipaddr.sin_port = htons (route->ksnr_port);
-        ipaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
-        
-        rc = sock->ops->connect (sock, (struct sockaddr *)&ipaddr, 
-                                 sizeof (ipaddr), sock->file->f_flags);
         if (rc != 0) {
-                CERROR ("Can't connect to nid "LPX64
-                        " local IP: %u.%u.%u.%u,"
-                        " remote IP: %u.%u.%u.%u/%d: %d\n", 
-                        route->ksnr_peer->ksnp_nid,
-                        HIPQUAD(route->ksnr_myipaddr),
-                        HIPQUAD(route->ksnr_ipaddr),
-                        route->ksnr_port, rc);
-                goto out;
+                CERROR("Error trying to bind to reserved port %d: %d\n",
+                       local_port, rc);
+                goto failed;
         }
 
-        rc = ksocknal_create_conn (route, sock, type);
-        if (rc == 0) {
-                /* Take an extra ref on sock->file to compensate for the
-                 * upcoming close which will lose fd's ref on it. */
-                get_file (sock->file);
+        rc = sock->ops->connect(sock,
+                                (struct sockaddr *)&srvaddr, sizeof(srvaddr),
+                                sock->file->f_flags);
+        if (rc == 0)
+                return 0;
+
+        /* EADDRNOTAVAIL probably means we're already connected to the same
+         * peer/port on the same local port on a differently typed
+         * connection.  Let our caller retry with a different local
+         * port... */
+        *may_retry = (rc == -EADDRNOTAVAIL);
+
+        CDEBUG(*may_retry ? D_NET : D_ERROR,
+               "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
+               HIPQUAD(route->ksnr_myipaddr), local_port,
+               HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
+
+ failed:
+        fput(sock->file);
+        return rc;
+}
+
+int
+ksocknal_connect_peer (ksock_route_t *route, int type)
+{
+        struct socket      *sock;
+        int                 rc;
+        int                 port;
+        int                 may_retry;
+        
+        /* Iterate through reserved ports.  When typed connections are
+         * used, we will need to bind to multiple ports, but we only know
+         * this at connect time.  But, by that time we've already called
+         * bind() so we need a new socket. */
+
+        for (port = 1023; port > 512; --port) {
+
+                rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
+
+                if (rc == 0) {
+                        rc = ksocknal_create_conn(route, sock, type);
+                        fput(sock->file);
+                        return rc;
+                }
+
+                if (!may_retry)
+                        return rc;
         }
 
- out:
-        sys_close (fd);
-        return (rc);
+        CERROR("Out of ports trying to bind to a reserved port\n");
+        return (-EADDRINUSE);
 }
 
 void
@@ -2443,7 +2584,6 @@ ksocknal_autoconnect (ksock_route_t *route)
                 LASSERT (type < SOCKNAL_CONN_NTYPES);
 
                 rc = ksocknal_connect_peer (route, type);
-
                 if (rc != 0)
                         break;