Whamcloud - gitweb
merge b_multinet into HEAD
authorphil <phil>
Tue, 19 Aug 2003 17:38:30 +0000 (17:38 +0000)
committerphil <phil>
Tue, 19 Aug 2003 17:38:30 +0000 (17:38 +0000)
23 files changed:
lnet/include/linux/kp30.h
lnet/include/lnet/lnetctl.h
lnet/include/lnet/ptlctl.h
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/utils/portals.c
lnet/utils/ptlctl.c
lustre/portals/include/linux/kp30.h
lustre/portals/include/portals/ptlctl.h
lustre/portals/knals/qswnal/qswnal_cb.c
lustre/portals/knals/socknal/socknal.c
lustre/portals/knals/socknal/socknal.h
lustre/portals/knals/socknal/socknal_cb.c
lustre/portals/portals/lib-move.c
lustre/portals/portals/lib-msg.c
lustre/portals/utils/portals.c
lustre/portals/utils/ptlctl.c
lustre/utils/lconf
lustre/utils/lctl.c

index 2133391..85fe8e7 100644 (file)
@@ -883,6 +883,10 @@ extern ptl_handle_ni_t  kscimacnal_ni;
 #define NAL_CMD_CLOSE_CONNECTION     101
 #define NAL_CMD_REGISTER_MYNID       102
 #define NAL_CMD_PUSH_CONNECTION      103
+#define NAL_CMD_GET_CONN             104
+#define NAL_CMD_DEL_AUTOCONN         105
+#define NAL_CMD_ADD_AUTOCONN         106
+#define NAL_CMD_GET_AUTOCONN         107
 
 enum {
         DEBUG_DAEMON_START       =  1,
index dc02780..ffe7e5b 100644 (file)
@@ -34,6 +34,10 @@ char * ptl_nid2str (char *buffer, ptl_nid_t nid);
 
 int ptl_initialize(int argc, char **argv);
 int jt_ptl_network(int argc, char **argv);
+int jt_ptl_print_autoconnects (int argc, char **argv);
+int jt_ptl_add_autoconnect (int argc, char **argv);
+int jt_ptl_del_autoconnect (int argc, char **argv);
+int jt_ptl_print_connections (int argc, char **argv);
 int jt_ptl_connect(int argc, char **argv);
 int jt_ptl_disconnect(int argc, char **argv);
 int jt_ptl_push_connection(int argc, char **argv);
index dc02780..ffe7e5b 100644 (file)
@@ -34,6 +34,10 @@ char * ptl_nid2str (char *buffer, ptl_nid_t nid);
 
 int ptl_initialize(int argc, char **argv);
 int jt_ptl_network(int argc, char **argv);
+int jt_ptl_print_autoconnects (int argc, char **argv);
+int jt_ptl_add_autoconnect (int argc, char **argv);
+int jt_ptl_del_autoconnect (int argc, char **argv);
+int jt_ptl_print_connections (int argc, char **argv);
 int jt_ptl_connect(int argc, char **argv);
 int jt_ptl_disconnect(int argc, char **argv);
 int jt_ptl_push_connection(int argc, char **argv);
index c03d592..a6b4b93 100644 (file)
@@ -569,11 +569,6 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         int                sumnob;
 #endif
         
-        /* NB, the return code from this procedure is ignored.
-         * If we can't send, we must still complete with lib_finalize().
-         * We'll have to wait for 3.2 to return an error event.
-         */
-
         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
                " pid %u\n", payload_nob, payload_niov, nid, pid);
 
@@ -588,8 +583,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_FAIL);
         }
 
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
@@ -597,14 +591,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
-                        lib_finalize (&kqswnal_lib, private, cookie);
-                        return (-1);
+                        return (PTL_FAIL);
                 }
                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
                                gatewaynid, nid);
-                        lib_finalize (&kqswnal_lib, private, cookie);
-                        return (-1);
+                        return (PTL_FAIL);
                 }
                 nid = gatewaynid;
         }
@@ -616,8 +608,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           in_interrupt()));
         if (ktx == NULL) {
                 kqswnal_cerror_hdr (hdr);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_NOSPACE);
         }
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
@@ -670,8 +661,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                                          payload_niov, payload_iov);
                         if (rc != 0) {
                                 kqswnal_put_idle_tx (ktx);
-                                lib_finalize (&kqswnal_lib, private, cookie);
-                                return (-1);
+                                return (PTL_FAIL);
                         }
                 } 
         }
@@ -686,12 +676,11 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_FAIL);
         }
 
         CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
-        return (0);
+        return (PTL_OK);
 }
 
 static int
index 91d971c..e7232a0 100644 (file)
@@ -59,7 +59,9 @@ ksocknal_api_shutdown(nal_t *nal, int ni)
 {
         CDEBUG (D_NET, "closing all connections\n");
 
-        return ksocknal_close_sock(0);          /* close all sockets */
+        ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
+        ksocknal_close_conn (PTL_NID_ANY, 0);
+        return PTL_OK;
 }
 
 void
@@ -104,15 +106,6 @@ ksocknal_init(int interface, ptl_pt_index_t ptl_size,
  *  EXTRA functions follow
  */
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#define SOCKET_I(inode) (&(inode)->u.socket_i)
-#endif
-static __inline__ struct socket *
-socki_lookup(struct inode *inode)
-{
-        return SOCKET_I(inode);
-}
-
 int
 ksocknal_set_mynid(ptl_nid_t nid)
 {
@@ -132,23 +125,43 @@ ksocknal_set_mynid(ptl_nid_t nid)
 }
 
 void
-ksocknal_bind_irq (unsigned int irq, int cpu)
+ksocknal_bind_irq (unsigned int irq)
 {
 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
-        char  cmdline[64];
-        char *argv[] = {"/bin/sh",
-                        "-c",
-                        cmdline,
-                        NULL};
-        char *envp[] = {"HOME=/",
-                        "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
-                        NULL};
+        int              bind;
+        unsigned long    flags;
+        char             cmdline[64];
+        ksock_irqinfo_t *info;
+        char            *argv[] = {"/bin/sh",
+                                   "-c",
+                                   cmdline,
+                                   NULL};
+        char            *envp[] = {"HOME=/",
+                                   "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
+                                   NULL};
+
+        LASSERT (irq < NR_IRQS);
+        if (irq == 0)                           /* software NIC */
+                return;
+
+        info = &ksocknal_data.ksnd_irqinfo[irq];
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        LASSERT (info->ksni_valid);
+        bind = !info->ksni_bound;
+        info->ksni_bound = 1;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (!bind)                              /* bound already */
+                return;
 
         snprintf (cmdline, sizeof (cmdline),
-                  "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
+                  "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
 
         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
-                irq, cpu, cmdline);
+                irq, info->ksni_sched, cmdline);
 
         /* FIXME: Find a better method of setting IRQ affinity...
          */
@@ -157,201 +170,854 @@ ksocknal_bind_irq (unsigned int irq, int cpu)
 #endif
 }
 
+ksock_route_t *
+ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
+                       int irq_affinity, int xchange_nids, int nonagel)
+{
+        ksock_route_t *route;
+
+        PORTAL_ALLOC (route, sizeof (*route));
+        if (route == NULL)
+                return (NULL);
+
+        atomic_set (&route->ksnr_refcount, 1);
+        route->ksnr_sharecount = 0;
+        route->ksnr_peer = NULL;
+        route->ksnr_timeout = jiffies_64;
+        route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
+        route->ksnr_ipaddr = ipaddr;
+        route->ksnr_port = port;
+        route->ksnr_buffer_size = buffer_size;
+        route->ksnr_irq_affinity = irq_affinity;
+        route->ksnr_xchange_nids = xchange_nids;
+        route->ksnr_nonagel = nonagel;
+        route->ksnr_connecting = 0;
+        route->ksnr_deleted = 0;
+        route->ksnr_generation = 0;
+        route->ksnr_conn = NULL;
+
+        return (route);
+}
+
+void
+ksocknal_destroy_route (ksock_route_t *route)
+{
+        LASSERT (route->ksnr_sharecount == 0);
+        LASSERT (route->ksnr_conn == NULL);
+
+        if (route->ksnr_peer != NULL)
+                ksocknal_put_peer (route->ksnr_peer);
+
+        PORTAL_FREE (route, sizeof (*route));
+}
+
+void
+ksocknal_put_route (ksock_route_t *route)
+{
+        CDEBUG (D_OTHER, "putting route[%p] -> "LPX64" (%d)\n",
+                route, route->ksnr_peer->ksnp_nid,
+                atomic_read (&route->ksnr_refcount));
+
+        LASSERT (atomic_read (&route->ksnr_refcount) > 0);
+        if (!atomic_dec_and_test (&route->ksnr_refcount))
+             return;
+
+        ksocknal_destroy_route (route);
+}
+
+ksock_peer_t *
+ksocknal_create_peer (ptl_nid_t nid)
+{
+        ksock_peer_t *peer;
+
+        LASSERT (nid != PTL_NID_ANY);
+
+        PORTAL_ALLOC (peer, sizeof (*peer));
+        if (peer == NULL)
+                return (NULL);
+
+        memset (peer, 0, sizeof (*peer));
+
+        peer->ksnp_nid = nid;
+        atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
+        peer->ksnp_closing = 0;
+        INIT_LIST_HEAD (&peer->ksnp_conns);
+        INIT_LIST_HEAD (&peer->ksnp_routes);
+        INIT_LIST_HEAD (&peer->ksnp_tx_queue);
+
+        /* Can't unload while peers exist; ensures all I/O has terminated
+         * before unload attempts */
+        PORTAL_MODULE_USE;
+        atomic_inc (&ksocknal_data.ksnd_npeers);
+        return (peer);
+}
+
+void
+ksocknal_destroy_peer (ksock_peer_t *peer)
+{
+        CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
+
+        LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
+        LASSERT (list_empty (&peer->ksnp_conns));
+        LASSERT (list_empty (&peer->ksnp_routes));
+        LASSERT (list_empty (&peer->ksnp_tx_queue));
+
+        PORTAL_FREE (peer, sizeof (*peer));
+
+        /* NB a peer's connections and autoconnect routes keep a reference
+         * on their peer until they are destroyed, so we can be assured
+         * that _all_ state to do with this peer has been cleaned up when
+         * its refcount drops to zero. */
+        atomic_dec (&ksocknal_data.ksnd_npeers);
+        PORTAL_MODULE_UNUSE;
+}
+
+void
+ksocknal_put_peer (ksock_peer_t *peer)
+{
+        CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
+                peer, peer->ksnp_nid,
+                atomic_read (&peer->ksnp_refcount));
+
+        LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
+        if (!atomic_dec_and_test (&peer->ksnp_refcount))
+                return;
+
+        ksocknal_destroy_peer (peer);
+}
+
+ksock_peer_t *
+ksocknal_find_peer_locked (ptl_nid_t nid)
+{
+        struct list_head *peer_list = ksocknal_nid2peerlist (nid);
+        struct list_head *tmp;
+        ksock_peer_t     *peer;
+
+        list_for_each (tmp, peer_list) {
+
+                peer = list_entry (tmp, ksock_peer_t, ksnp_list);
+
+                LASSERT (!peer->ksnp_closing);
+                LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                           list_empty (&peer->ksnp_conns)));
+
+                if (peer->ksnp_nid != nid)
+                        continue;
+
+                CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
+                       peer, nid, atomic_read (&peer->ksnp_refcount));
+                return (peer);
+        }
+        return (NULL);
+}
+
+ksock_peer_t *
+ksocknal_get_peer (ptl_nid_t nid)
+{
+        ksock_peer_t     *peer;
+
+        read_lock (&ksocknal_data.ksnd_global_lock);
+        peer = ksocknal_find_peer_locked (nid);
+        if (peer != NULL)                       /* +1 ref for caller? */
+                atomic_inc (&peer->ksnp_refcount);
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+
+        return (peer);
+}
+
+void
+ksocknal_unlink_peer_locked (ksock_peer_t *peer)
+{
+        LASSERT (!peer->ksnp_closing);
+        peer->ksnp_closing = 1;
+        list_del (&peer->ksnp_list);
+        /* lose peerlist's ref */
+        ksocknal_put_peer (peer);
+}
+
+ksock_route_t *
+ksocknal_get_route_by_idx (int index)
+{
+        ksock_peer_t      *peer;
+        struct list_head  *ptmp;
+        ksock_route_t     *route;
+        struct list_head  *rtmp;
+        int                i;
+
+        read_lock (&ksocknal_data.ksnd_global_lock);
+
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                                   list_empty (&peer->ksnp_conns)));
+
+                        list_for_each (rtmp, &peer->ksnp_routes) {
+                                if (index-- > 0)
+                                        continue;
+
+                                route = list_entry (rtmp, ksock_route_t, ksnr_list);
+                                atomic_inc (&route->ksnr_refcount);
+                                read_unlock (&ksocknal_data.ksnd_global_lock);
+                                return (route);
+                        }
+                }
+        }
+
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+        return (NULL);
+}
+
 int
-ksocknal_add_sock (ptl_nid_t nid, int fd, int bind_irq)
+ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
+                    int nonagle, int xchange_nids, int bind_irq, int share)
 {
         unsigned long      flags;
+        ksock_peer_t      *peer;
+        ksock_peer_t      *peer2;
+        ksock_route_t     *route;
+        struct list_head  *rtmp;
+        ksock_route_t     *route2;
+        
+        if (nid == PTL_NID_ANY)
+                return (-EINVAL);
+
+        /* Have a brand new peer ready... */
+        peer = ksocknal_create_peer (nid);
+        if (peer == NULL)
+                return (-ENOMEM);
+
+        route = ksocknal_create_route (ipaddr, port, bufnob,
+                                       nonagle, xchange_nids, bind_irq);
+        if (route == NULL) {
+                ksocknal_put_peer (peer);
+                return (-ENOMEM);
+        }
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        peer2 = ksocknal_find_peer_locked (nid);
+        if (peer2 != NULL) {
+                ksocknal_put_peer (peer);
+                peer = peer2;
+        } else {
+                /* peer table takes existing ref on peer */
+                list_add (&peer->ksnp_list,
+                          ksocknal_nid2peerlist (nid));
+        }
+
+        route2 = NULL;
+        if (share) {
+                /* check for existing route to this NID via this ipaddr */
+                list_for_each (rtmp, &peer->ksnp_routes) {
+                        route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
+                        
+                        if (route2->ksnr_ipaddr == ipaddr)
+                                break;
+
+                        route2 = NULL;
+                }
+        }
+
+        if (route2 != NULL) {
+                ksocknal_put_route (route);
+                route = route2;
+        } else {
+                /* route takes a ref on peer */
+                route->ksnr_peer = peer;
+                atomic_inc (&peer->ksnp_refcount);
+                /* peer's route list takes existing ref on route */
+                list_add (&route->ksnr_list, &peer->ksnp_routes);
+        }
+        
+        route->ksnr_sharecount++;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (0);
+}
+
+void
+ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
+{
+        ksock_peer_t *peer = route->ksnr_peer;
+        ksock_conn_t *conn = route->ksnr_conn;
+
+        if (!share)
+                route->ksnr_sharecount = 0;
+        else {
+                route->ksnr_sharecount--;
+                if (route->ksnr_sharecount != 0)
+                        return;
+        }
+
+        if (conn != NULL) {
+                if (!keep_conn)
+                        ksocknal_close_conn_locked (conn);
+                else {
+                        /* keeping the conn; just dissociate it and route... */
+                        conn->ksnc_route = NULL;
+                        route->ksnr_conn = NULL;
+                        ksocknal_put_route (route); /* drop conn's ref on route */
+                        ksocknal_put_conn (conn); /* drop route's ref on conn */
+                }
+        }
+
+        route->ksnr_deleted = 1;
+        list_del (&route->ksnr_list);
+        ksocknal_put_route (route);             /* drop peer's ref */
+
+        if (list_empty (&peer->ksnp_routes) &&
+            list_empty (&peer->ksnp_conns)) {
+                /* I've just removed the last autoconnect route of a peer
+                 * with no active connections */
+                ksocknal_unlink_peer_locked (peer);
+        }
+}
+
+int
+ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
+{
+        unsigned long      flags;
+        struct list_head  *ptmp;
+        struct list_head  *pnxt;
+        ksock_peer_t      *peer;
+        struct list_head  *rtmp;
+        struct list_head  *rnxt;
+        ksock_route_t     *route;
+        int                lo;
+        int                hi;
+        int                i;
+        int                rc = -ENOENT;
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
+        else {
+                lo = 0;
+                hi = ksocknal_data.ksnd_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
+                                continue;
+
+                        list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
+                                route = list_entry (rtmp, ksock_route_t,
+                                                    ksnr_list);
+
+                                if (!(ipaddr == 0 ||
+                                      route->ksnr_ipaddr == ipaddr))
+                                        continue;
+
+                                ksocknal_del_route_locked (route, share, keep_conn);
+                                rc = 0;         /* matched something */
+                                if (share)
+                                        goto out;
+                        }
+                }
+        }
+ out:
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (rc);
+}
+
+ksock_conn_t *
+ksocknal_get_conn_by_idx (int index)
+{
+        ksock_peer_t      *peer;
+        struct list_head  *ptmp;
         ksock_conn_t      *conn;
-        struct file       *file = NULL;
-        struct socket     *sock = NULL;
-        ksock_sched_t     *sched = NULL;
-        unsigned int       irq = 0;
-        struct net_device *dev = NULL;
-        int                ret;
-        int                idx;
-        ENTRY;
-
-        LASSERT (!in_interrupt());
-
-        file = fget(fd);
-        if (file == NULL)
-                RETURN(-EINVAL);
-
-        ret = -EINVAL;
-        sock = socki_lookup(file->f_dentry->d_inode);
-        if (sock == NULL)
-                GOTO(error, ret);
-
-        ret = -ENOMEM;
-        PORTAL_ALLOC(conn, sizeof(*conn));
-        if (!conn)
-                GOTO(error, ret);
+        struct list_head  *ctmp;
+        int                i;
 
-        sock->sk->allocation = GFP_NOFS;    /* don't call info fs for alloc */
+        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        conn->ksnc_file = file;
-        conn->ksnc_sock = sock;
-        conn->ksnc_saved_data_ready = sock->sk->data_ready;
-        conn->ksnc_saved_write_space = sock->sk->write_space;
-        conn->ksnc_peernid = nid;
-        atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for socklist */
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
 
-        conn->ksnc_rx_ready = 0;
-        conn->ksnc_rx_scheduled = 0;
-        ksocknal_new_packet (conn, 0);
+                        LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                                   list_empty (&peer->ksnp_conns)));
 
-        INIT_LIST_HEAD (&conn->ksnc_tx_queue);
-        conn->ksnc_tx_ready = 0;
-        conn->ksnc_tx_scheduled = 0;
+                        list_for_each (ctmp, &peer->ksnp_conns) {
+                                if (index-- > 0)
+                                        continue;
+
+                                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
+                                atomic_inc (&conn->ksnc_refcount);
+                                read_unlock (&ksocknal_data.ksnd_global_lock);
+                                return (conn);
+                        }
+                }
+        }
+
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+        return (NULL);
+}
+
+void
+ksocknal_get_peer_addr (ksock_conn_t *conn)
+{
+        struct sockaddr_in sin;
+        int                len = sizeof (sin);
+        int                rc;
 
-#warning check it is OK to derefence sk->dst_cache->dev like this...
-        lock_sock (conn->ksnc_sock->sk);
+        rc = ksocknal_getconnsock (conn);
+        LASSERT (rc == 0);
+        
+        rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
+                                            (struct sockaddr *)&sin, &len, 2);
+        LASSERT (len <= sizeof (sin));
+        ksocknal_putconnsock (conn);
+
+        if (rc != 0) {
+                CERROR ("Error %d getting sock peer IP\n", rc);
+                return;
+        }
 
-        if (conn->ksnc_sock->sk->dst_cache != NULL) {
-                dev = conn->ksnc_sock->sk->dst_cache->dev;
-                if (dev != NULL) {
-                        irq = dev->irq;
+        conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
+        conn->ksnc_port   = ntohs (sin.sin_port);
+}
+
+unsigned int
+ksocknal_conn_irq (ksock_conn_t *conn)
+{
+        int                irq = 0;
+        int                rc;
+        struct dst_entry  *dst;
+
+        rc = ksocknal_getconnsock (conn);
+        LASSERT (rc == 0);
+        
+        dst = sk_dst_get (conn->ksnc_sock->sk);
+        if (dst != NULL) {
+                if (dst->dev != NULL) {
+                        irq = dst->dev->irq;
                         if (irq >= NR_IRQS) {
                                 CERROR ("Unexpected IRQ %x\n", irq);
                                 irq = 0;
                         }
                 }
+                dst_release (dst);
         }
+        
+        ksocknal_putconnsock (conn);
+        return (irq);
+}
 
-        release_sock (conn->ksnc_sock->sk);
+ksock_sched_t *
+ksocknal_choose_scheduler_locked (unsigned int irq)
+{
+        ksock_sched_t    *sched;
+        ksock_irqinfo_t  *info;
+        int               i;
 
-        write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        LASSERT (irq < NR_IRQS);
+        info = &ksocknal_data.ksnd_irqinfo[irq];
 
-        if (irq == 0 ||
-            ksocknal_data.ksnd_irq_info[irq] == SOCKNAL_IRQ_UNASSIGNED) {
-                /* This is a software NIC, or we haven't associated it with
-                 * a CPU yet */
+        if (irq != 0 &&                         /* hardware NIC */
+            info->ksni_valid) {                 /* already set up */
+                return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
+        }
 
-                /* Choose the CPU with the fewest connections */
-                sched = ksocknal_data.ksnd_schedulers;
-                for (idx = 1; idx < SOCKNAL_N_SCHED; idx++)
-                        if (sched->kss_nconns >
-                            ksocknal_data.ksnd_schedulers[idx].kss_nconns)
-                                sched = &ksocknal_data.ksnd_schedulers[idx];
+        /* software NIC (irq == 0) || not associated with a scheduler yet.
+         * Choose the CPU with the fewest connections... */
+        sched = &ksocknal_data.ksnd_schedulers[0];
+        for (i = 1; i < SOCKNAL_N_SCHED; i++)
+                if (sched->kss_nconns >
+                    ksocknal_data.ksnd_schedulers[i].kss_nconns)
+                        sched = &ksocknal_data.ksnd_schedulers[i];
 
-                if (irq != 0) {                 /* Hardware NIC */
-                        /* Remember which scheduler we chose */
-                        idx = sched - ksocknal_data.ksnd_schedulers;
+        if (irq != 0) {                         /* Hardware NIC */
+                info->ksni_valid = 1;
+                info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
 
-                        LASSERT (idx < SOCKNAL_IRQ_SCHED_MASK);
+                /* no overflow... */
+                LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
+        }
 
-                        if (bind_irq)       /* remember if we will bind below */
-                                idx |= SOCKNAL_IRQ_BOUND;
+        return (sched);
+}
 
-                        ksocknal_data.ksnd_irq_info[irq] = idx;
+int
+ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
+                      struct socket *sock, int bind_irq)
+{
+        unsigned long      flags;
+        ksock_conn_t      *conn;
+        ksock_peer_t      *peer;
+        ksock_peer_t      *peer2;
+        ksock_sched_t     *sched;
+        unsigned int       irq;
+        ksock_tx_t        *tx;
+        int                rc;
+
+        /* NB, sock has an associated file since (a) this connection might
+         * have been created in userland and (b) we need the refcounting so
+         * that we don't close the socket while I/O is being done on it. */
+        LASSERT (sock->file != NULL);
+
+        rc = ksocknal_set_linger (sock);
+        if (rc != 0)
+                return (rc);
+
+        peer = NULL;
+        if (route == NULL) {                    /* not autoconnect */
+                /* Assume this socket connects to a brand new peer */
+                peer = ksocknal_create_peer (nid);
+                if (peer == NULL)
+                        return (-ENOMEM);
+        }
+
+        PORTAL_ALLOC(conn, sizeof(*conn));
+        if (conn == NULL) {
+                if (peer != NULL)
+                        ksocknal_put_peer (peer);
+                return (-ENOMEM);
+        }
+
+        memset (conn, 0, sizeof (*conn));
+        conn->ksnc_peer = NULL;
+        conn->ksnc_route = NULL;
+        conn->ksnc_sock = sock;
+        conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
+        conn->ksnc_saved_write_space = sock->sk->sk_write_space;
+        atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
+
+        conn->ksnc_rx_ready = 0;
+        conn->ksnc_rx_scheduled = 0;
+        ksocknal_new_packet (conn, 0);
+
+        INIT_LIST_HEAD (&conn->ksnc_tx_queue);
+#if SOCKNAL_ZC
+        INIT_LIST_HEAD (&conn->ksnc_tx_pending);
+#endif
+        conn->ksnc_tx_ready = 0;
+        conn->ksnc_tx_scheduled = 0;
+        atomic_set (&conn->ksnc_tx_nob, 0);
+
+        ksocknal_get_peer_addr (conn);
+
+        irq = ksocknal_conn_irq (conn);
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (route != NULL) {
+                /* Autoconnected! */
+                LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
+
+                if (route->ksnr_deleted) {
+                        /* This conn was autoconnected, but the autoconnect
+                         * route got deleted while it was being
+                         * established! */
+                        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
+                                                 flags);
+                        PORTAL_FREE (conn, sizeof (*conn));
+                        return (-ESTALE);
                 }
-        } else { 
-                /* This is a hardware NIC, associated with a CPU */
-                idx = ksocknal_data.ksnd_irq_info[irq];
 
-                /* Don't bind again if we've bound already */
-                if ((idx & SOCKNAL_IRQ_BOUND) != 0)
-                        bind_irq = 0;
-                
-                sched = &ksocknal_data.ksnd_schedulers[idx & SOCKNAL_IRQ_SCHED_MASK];
+
+                /* associate conn/route for auto-reconnect */
+                route->ksnr_conn = conn;
+                atomic_inc (&conn->ksnc_refcount);
+                conn->ksnc_route = route;
+                atomic_inc (&route->ksnr_refcount);
+                route->ksnr_connecting = 0;
+
+                route->ksnr_generation++;
+                route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
+
+                peer = route->ksnr_peer;
+        } else {
+                /* Not an autoconnected connection; see if there is an
+                 * existing peer for this NID */
+                peer2 = ksocknal_find_peer_locked (nid);
+                if (peer2 != NULL) {
+                        ksocknal_put_peer (peer);
+                        peer = peer2;
+                } else {
+                        list_add (&peer->ksnp_list,
+                                  ksocknal_nid2peerlist (nid));
+                        /* peer list takes over existing ref */
+                }
         }
 
+        LASSERT (!peer->ksnp_closing);
+
+        conn->ksnc_peer = peer;
+        atomic_inc (&peer->ksnp_refcount);
+
+        list_add (&conn->ksnc_list, &peer->ksnp_conns);
+        atomic_inc (&conn->ksnc_refcount);
+
+        sched = ksocknal_choose_scheduler_locked (irq);
         sched->kss_nconns++;
         conn->ksnc_scheduler = sched;
 
-        list_add(&conn->ksnc_list, &ksocknal_data.ksnd_socklist);
-
-        write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
+        /* NB my callbacks block while I hold ksnd_global_lock */
+        sock->sk->sk_user_data = conn;
+        sock->sk->sk_data_ready = ksocknal_data_ready;
+        sock->sk->sk_write_space = ksocknal_write_space;
+
+        /* Take all the packets blocking for a connection.
+         * NB, it might be nicer to share these blocked packets among any
+         * other connections that are becoming established, however that
+         * confuses the normal packet launching operation, which selects a
+         * connection and queues the packet on it without needing an
+         * exclusive lock on ksnd_global_lock. */
+        while (!list_empty (&peer->ksnp_tx_queue)) {
+                tx = list_entry (peer->ksnp_tx_queue.next,
+                                 ksock_tx_t, tx_list);
+
+                list_del (&tx->tx_list);
+                ksocknal_queue_tx_locked (tx, conn);
+        }
 
-        if (bind_irq &&                         /* irq binding required */
-            irq != 0)                           /* hardware NIC */
-                ksocknal_bind_irq (irq, sched - ksocknal_data.ksnd_schedulers);
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
-        /* NOW it's safe to get called back when socket is ready... */
-        sock->sk->user_data = conn;
-        sock->sk->data_ready = ksocknal_data_ready;
-        sock->sk->write_space = ksocknal_write_space;
+        if (bind_irq)                           /* irq binding required */
+                ksocknal_bind_irq (irq);
 
-        /* ...which I call right now to get things going */
+        /* Call the callbacks right now to get things going. */
         ksocknal_data_ready (sock->sk, 0);
         ksocknal_write_space (sock->sk);
 
         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
-               conn, conn->ksnc_peernid);
+               conn, conn->ksnc_peer->ksnp_nid);
 
-        /* Can't unload while connection active */
-        PORTAL_MODULE_USE;
-        RETURN(0);
+        ksocknal_put_conn (conn);
+        return (0);
+}
+
+void
+ksocknal_close_conn_locked (ksock_conn_t *conn)
+{
+        /* This just does the immmediate housekeeping, and queues the
+         * connection for the reaper to terminate. 
+         * Caller holds ksnd_global_lock exclusively in irq context */
+        ksock_peer_t   *peer = conn->ksnc_peer;
+        ksock_route_t  *route;
+
+        LASSERT (!conn->ksnc_closing);
+        conn->ksnc_closing = 1;
+        atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
+        
+        route = conn->ksnc_route;
+        if (route != NULL) {
+                /* dissociate conn from route... */
+                LASSERT (!route->ksnr_connecting &&
+                         !route->ksnr_deleted);
+
+                route->ksnr_conn = NULL;
+                conn->ksnc_route = NULL;
+
+                ksocknal_put_route (route);     /* drop conn's ref on route */
+                ksocknal_put_conn (conn);       /* drop route's ref on conn */
+        }
+
+        /* ksnd_deathrow_conns takes over peer's ref */
+        list_del (&conn->ksnc_list);
 
-error:
-        fput(file);
-        return (ret);
+        if (list_empty (&peer->ksnp_conns) &&
+            list_empty (&peer->ksnp_routes)) {
+                /* I've just closed last conn belonging to a
+                 * non-autoconnecting peer */
+                ksocknal_unlink_peer_locked (peer);
+        }
+
+        spin_lock (&ksocknal_data.ksnd_reaper_lock);
+
+        list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
+        if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
+                wake_up (&ksocknal_data.ksnd_reaper_waitq);
+                
+        spin_unlock (&ksocknal_data.ksnd_reaper_lock);
 }
 
-/* Passing in a zero nid will close all connections */
 int
-ksocknal_close_sock(ptl_nid_t nid)
+ksocknal_close_conn_unlocked (ksock_conn_t *conn) 
 {
-        long               flags;
-        ksock_conn_t      *conn;
-        LIST_HEAD         (death_row);
-        struct list_head  *tmp;
+        unsigned long flags;
+        int           did_it = 0;
+        
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+                
+        if (!conn->ksnc_closing) {
+                did_it = 1;
+                ksocknal_close_conn_locked (conn);
+        }
+        
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
-        LASSERT (!in_interrupt());
-        write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        return (did_it);
+}
 
-        if (nid == 0) {                         /* close ALL connections */
-                /* insert 'death row' into the socket list... */
-                list_add (&death_row, &ksocknal_data.ksnd_socklist);
-                /* ...extract and reinitialise the socket list itself... */
-                list_del_init (&ksocknal_data.ksnd_socklist);
-                /* ...and voila, death row is the proud owner of all conns */
-        } else list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
+void
+ksocknal_terminate_conn (ksock_conn_t *conn)
+{
+        /* This gets called by the reaper (guaranteed thread context) to
+         * disengage the socket from its callbacks and close it.
+         * ksnc_refcount will eventually hit zero, and then the reaper will
+         * destroy it. */
+        unsigned long   flags;
+
+        /* serialise with callbacks */
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        LASSERT (conn->ksnc_closing);
+        
+        /* Remove conn's network callbacks.
+         * NB I _have_ to restore the callback, rather than storing a noop,
+         * since the socket could survive past this module being unloaded!! */
+        conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
+        conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
+
+        /* A callback could be in progress already; they hold a read lock
+         * on ksnd_global_lock (to serialise with me) and NOOP if
+         * sk_user_data is NULL. */
+        conn->ksnc_sock->sk->sk_user_data = NULL;
+
+        conn->ksnc_scheduler->kss_nconns--;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        /* The socket is closed on the final put; either here, or in
+         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
+         * when the connection was established, this will close the socket
+         * immediately, aborting anything buffered in it. Any hung
+         * zero-copy transmits will therefore complete in finite time. */
+        ksocknal_putconnsock (conn);
+}
 
-                conn = list_entry (tmp, ksock_conn_t, ksnc_list);
+void
+ksocknal_destroy_conn (ksock_conn_t *conn)
+{
+        /* Final coup-de-grace of the reaper */
+        CDEBUG (D_NET, "connection %p\n", conn);
 
-                if (conn->ksnc_peernid == nid) {
-                        list_del (&conn->ksnc_list);
-                        list_add (&conn->ksnc_list, &death_row);
-                        break;
-                }
+        LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
+        LASSERT (conn->ksnc_route == NULL);
+        LASSERT (!conn->ksnc_tx_scheduled);
+        LASSERT (!conn->ksnc_rx_scheduled);
+#if SOCKNAL_ZC
+        LASSERT (list_empty (&conn->ksnc_tx_pending));
+#endif
+        /* complete queued packets */
+        while (!list_empty (&conn->ksnc_tx_queue)) {
+                ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
+                                             ksock_tx_t, tx_list);
+
+                CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
+                        NTOH__u32 (tx->tx_hdr->type),
+                        NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
+                        NTOH__u64 (tx->tx_hdr->src_nid),
+                        NTOH__u64 (tx->tx_hdr->dest_nid));
+
+                list_del (&tx->tx_list);
+                ksocknal_tx_done (tx, 0);
+        }
+
+        /* complete current receive if any */
+        switch (conn->ksnc_rx_state) {
+        case SOCKNAL_RX_BODY:
+                lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
+                break;
+        case SOCKNAL_RX_BODY_FWD:
+                ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
+                break;
+        case SOCKNAL_RX_HEADER:
+        case SOCKNAL_RX_SLOP:
+                break;
+        default:
+                LBUG ();
+                break;
         }
 
-        write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
+        ksocknal_put_peer (conn->ksnc_peer);
 
-        if (nid && list_empty (&death_row))
-                return (-ENOENT);
+        PORTAL_FREE (conn, sizeof (*conn));
+        atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
+}
 
-        while (!list_empty (&death_row)) {
-                conn = list_entry (death_row.next, ksock_conn_t, ksnc_list);
-                list_del (&conn->ksnc_list);
+void
+ksocknal_put_conn (ksock_conn_t *conn)
+{
+        unsigned long flags;
 
-                /* NB I _have_ to restore the callback, rather than storing
-                 * a noop, since the socket could survive past this module
-                 * being unloaded!! */
-                conn->ksnc_sock->sk->data_ready = conn->ksnc_saved_data_ready;
-                conn->ksnc_sock->sk->write_space = conn->ksnc_saved_write_space;
+        CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
+                conn, conn->ksnc_peer->ksnp_nid,
+                atomic_read (&conn->ksnc_refcount));
 
-                /* OK; no more callbacks, but they could be in progress now,
-                 * so wait for them to complete... */
-                write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
+        if (!atomic_dec_and_test (&conn->ksnc_refcount))
+                return;
 
-                /* ...however if I get the lock before a callback gets it,
-                 * this will make them noop
-                 */
-                conn->ksnc_sock->sk->user_data = NULL;
+        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                /* And drop the scheduler's connection count while I've got
-                 * the exclusive lock */
-                conn->ksnc_scheduler->kss_nconns--;
+        list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
+        if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
+                wake_up (&ksocknal_data.ksnd_reaper_waitq);
 
-                write_unlock_irqrestore(&ksocknal_data.ksnd_socklist_lock,
-                                        flags);
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+}
 
-                ksocknal_put_conn (conn);       /* drop ref for ksnd_socklist */
+int
+ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
+{
+        unsigned long       flags;
+        ksock_conn_t       *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        ksock_peer_t       *peer;
+        struct list_head   *ptmp;
+        struct list_head   *pnxt;
+        int                 lo;
+        int                 hi;
+        int                 i;
+        int                 rc = -ENOENT;
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
+        else {
+                lo = 0;
+                hi = ksocknal_data.ksnd_peer_hash_size - 1;
         }
 
-        return (0);
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
+
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
+                                continue;
+
+                        list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
+
+                                conn = list_entry (ctmp, ksock_conn_t,
+                                                   ksnc_list);
+
+                                if (!(ipaddr == 0 ||
+                                      conn->ksnc_ipaddr == ipaddr))
+                                        continue;
+
+                                rc = 0;
+                                ksocknal_close_conn_locked (conn);
+                        }
+                }
+        }
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (rc);
 }
 
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -370,13 +1036,20 @@ struct tcp_opt *sock2tcp_opt(struct sock *sk)
 void
 ksocknal_push_conn (ksock_conn_t *conn)
 {
-        struct sock    *sk = conn->ksnc_sock->sk;
-        struct tcp_opt *tp = sock2tcp_opt(sk);
+        struct sock    *sk;
+        struct tcp_opt *tp;
         int             nonagle;
         int             val = 1;
         int             rc;
         mm_segment_t    oldmm;
 
+        rc = ksocknal_getconnsock (conn);
+        if (rc != 0)                            /* being shut down */
+                return;
+        
+        sk = conn->ksnc_sock->sk;
+        tp = sock2tcp_opt(sk);
+        
         lock_sock (sk);
         nonagle = tp->nonagle;
         tp->nonagle = 1;
@@ -385,8 +1058,8 @@ ksocknal_push_conn (ksock_conn_t *conn)
         oldmm = get_fs ();
         set_fs (KERNEL_DS);
 
-        rc = sk->prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
-                                   (char *)&val, sizeof (val));
+        rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
+                                      (char *)&val, sizeof (val));
         LASSERT (rc == 0);
 
         set_fs (oldmm);
@@ -394,47 +1067,33 @@ ksocknal_push_conn (ksock_conn_t *conn)
         lock_sock (sk);
         tp->nonagle = nonagle;
         release_sock (sk);
+
+        ksocknal_putconnsock (conn);
 }
 
-/* Passing in a zero nid pushes all connections */
-int
-ksocknal_push_sock (ptl_nid_t nid)
+void
+ksocknal_push_peer (ksock_peer_t *peer)
 {
-        ksock_conn_t      *conn;
-        struct list_head  *tmp;
-        int                index;
-        int                i;
-
-        if (nid != 0) {
-                conn = ksocknal_get_conn (nid);
-
-                if (conn == NULL)
-                        return (-ENOENT);
-
-                ksocknal_push_conn (conn);
-                ksocknal_put_conn (conn);
-
-                return (0);
-        }
+        int               index;
+        int               i;
+        struct list_head *tmp;
+        ksock_conn_t     *conn;
 
-        /* NB we can't remove connections from the socket list so we have to
-         * cope with them being removed from under us...
-         */
         for (index = 0; ; index++) {
-                read_lock (&ksocknal_data.ksnd_socklist_lock);
+                read_lock (&ksocknal_data.ksnd_global_lock);
 
                 i = 0;
                 conn = NULL;
 
-                list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
+                list_for_each (tmp, &peer->ksnp_conns) {
                         if (i++ == index) {
-                                conn = list_entry(tmp, ksock_conn_t, ksnc_list);
-                                atomic_inc (&conn->ksnc_refcount); // take a ref
+                                conn = list_entry (tmp, ksock_conn_t, ksnc_list);
+                                atomic_inc (&conn->ksnc_refcount);
                                 break;
                         }
                 }
 
-                read_unlock (&ksocknal_data.ksnd_socklist_lock);
+                read_unlock (&ksocknal_data.ksnd_global_lock);
 
                 if (conn == NULL)
                         break;
@@ -442,85 +1101,57 @@ ksocknal_push_sock (ptl_nid_t nid)
                 ksocknal_push_conn (conn);
                 ksocknal_put_conn (conn);
         }
-
-        return (0);
 }
 
-ksock_conn_t *
-ksocknal_get_conn (ptl_nid_t nid)
+int
+ksocknal_push (ptl_nid_t nid)
 {
-        struct list_head *tmp;
-        ksock_conn_t     *conn;
-
-        PROF_START(conn_list_walk);
-
-        read_lock (&ksocknal_data.ksnd_socklist_lock);
-
-        list_for_each(tmp, &ksocknal_data.ksnd_socklist) {
-
-                conn = list_entry(tmp, ksock_conn_t, ksnc_list);
-
-                if (conn->ksnc_peernid == nid) {
-                        /* caller is referencing */
-                        atomic_inc (&conn->ksnc_refcount);
-
-                        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        ksock_peer_t      *peer;
+        struct list_head  *tmp;
+        int                index;
+        int                i;
+        int                j;
+        int                rc = -ENOENT;
 
-                        CDEBUG(D_NET, "got conn [%p] -> "LPX64" (%d)\n",
-                               conn, nid, atomic_read (&conn->ksnc_refcount));
+        if (nid != PTL_NID_ANY) {
+                peer = ksocknal_get_peer (nid);
 
-                        PROF_FINISH(conn_list_walk);
-                        return (conn);
+                if (peer != NULL) {
+                        rc = 0;
+                        ksocknal_push_peer (peer);
+                        ksocknal_put_peer (peer);
                 }
+                return (rc);
         }
 
-        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                for (j = 0; ; j++) {
+                        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        CDEBUG(D_NET, "No connection found when looking for nid "LPX64"\n",
-               nid);
-        PROF_FINISH(conn_list_walk);
-        return (NULL);
-}
+                        index = 0;
+                        peer = NULL;
 
-void
-ksocknal_close_conn (ksock_conn_t *conn)
-{
-        CDEBUG (D_NET, "connection [%p] closed \n", conn);
-
-        fput (conn->ksnc_file);
-        PORTAL_FREE (conn, sizeof (*conn));
-
-        /* One less connection keeping us hanging on */
-        PORTAL_MODULE_UNUSE;
-}
-
-void
-_ksocknal_put_conn (ksock_conn_t *conn)
-{
-        unsigned long flags;
-
-        CDEBUG (D_NET, "connection [%p] handed the black spot\n", conn);
+                        list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
+                                if (index++ == j) {
+                                        peer = list_entry(tmp, ksock_peer_t,
+                                                          ksnp_list);
+                                        atomic_inc (&peer->ksnp_refcount);
+                                        break;
+                                }
+                        }
 
-        /* "But what is the black spot, captain?" I asked.
-         * "That's a summons, mate..." */
+                        read_unlock (&ksocknal_data.ksnd_global_lock);
 
-        LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
-        LASSERT (conn->ksnc_sock->sk->data_ready != ksocknal_data_ready);
-        LASSERT (conn->ksnc_sock->sk->write_space != ksocknal_write_space);
-        LASSERT (conn->ksnc_sock->sk->user_data == NULL);
-        LASSERT (!conn->ksnc_rx_scheduled);
+                        if (peer != NULL) {
+                                rc = 0;
+                                ksocknal_push_peer (peer);
+                                ksocknal_put_peer (peer);
+                        }
+                }
 
-        if (!in_interrupt()) {
-                ksocknal_close_conn (conn);
-                return;
         }
 
-        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
-
-        list_add (&conn->ksnc_list, &ksocknal_data.ksnd_reaper_list);
-        wake_up (&ksocknal_data.ksnd_reaper_waitq);
-
-        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+        return (rc);
 }
 
 int
@@ -531,13 +1162,68 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
         LASSERT (data != NULL);
 
         switch(data->ioc_nal_cmd) {
+        case NAL_CMD_GET_AUTOCONN: {
+                ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
+
+                if (route == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        data->ioc_nid   = route->ksnr_peer->ksnp_nid;
+                        data->ioc_id    = route->ksnr_ipaddr;
+                        data->ioc_misc  = route->ksnr_port;
+                        data->ioc_count = route->ksnr_generation;
+                        data->ioc_size  = route->ksnr_buffer_size;
+                        data->ioc_wait  = route->ksnr_sharecount;
+                        data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
+                                          (route->ksnr_xchange_nids ? 2 : 0) |
+                                          (route->ksnr_irq_affinity ? 4 : 0);
+                        ksocknal_put_route (route);
+                }
+                break;
+        }
+        case NAL_CMD_ADD_AUTOCONN: {
+                rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
+                                         data->ioc_misc, data->ioc_size,
+                                         (data->ioc_flags & 1) != 0,
+                                         (data->ioc_flags & 2) != 0,
+                                         (data->ioc_flags & 4) != 0,
+                                         (data->ioc_flags & 8) != 0);
+                break;
+        }
+        case NAL_CMD_DEL_AUTOCONN: {
+                rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
+                                         (data->ioc_flags & 1) != 0,
+                                         (data->ioc_flags & 2) != 0);
+                break;
+        }
+        case NAL_CMD_GET_CONN: {
+                ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
+
+                if (conn == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
+                        data->ioc_id   = conn->ksnc_ipaddr;
+                        data->ioc_misc = conn->ksnc_port;
+                        ksocknal_put_conn (conn);
+                }
+                break;
+        }
         case NAL_CMD_REGISTER_PEER_FD: {
-                rc = ksocknal_add_sock(data->ioc_nid, data->ioc_fd,
-                                       data->ioc_flags);
+                struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
+
+                if (sock != NULL) {
+                        rc = ksocknal_create_conn (data->ioc_nid, NULL,
+                                                   sock, data->ioc_flags);
+                        if (rc != 0)
+                                fput (sock->file);
+                }
                 break;
         }
         case NAL_CMD_CLOSE_CONNECTION: {
-                rc = ksocknal_close_sock(data->ioc_nid);
+                rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
                 break;
         }
         case NAL_CMD_REGISTER_MYNID: {
@@ -545,7 +1231,7 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
                 break;
         }
         case NAL_CMD_PUSH_CONNECTION: {
-                rc = ksocknal_push_sock (data->ioc_nid);
+                rc = ksocknal_push (data->ioc_nid);
                 break;
         }
         }
@@ -573,6 +1259,7 @@ ksocknal_free_buffers (void)
                                                      SOCKNAL_LARGE_FWD_NMSGS));
         }
 
+        LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
         if (ksocknal_data.ksnd_ltxs != NULL)
                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
@@ -581,9 +1268,13 @@ ksocknal_free_buffers (void)
         if (ksocknal_data.ksnd_schedulers != NULL)
                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
+
+        PORTAL_FREE (ksocknal_data.ksnd_peers,
+                     sizeof (struct list_head) * 
+                     ksocknal_data.ksnd_peer_hash_size);
 }
 
-void __exit
+void /*__exit*/
 ksocknal_module_fini (void)
 {
         int   i;
@@ -606,10 +1297,15 @@ ksocknal_module_fini (void)
                 /* fall through */
 
         case SOCKNAL_INIT_DATA:
-                /* Module refcount only gets to zero when all connections
+                /* Module refcount only gets to zero when all peers
                  * have been closed so all lists must be empty */
-                LASSERT (list_empty (&ksocknal_data.ksnd_socklist));
-                LASSERT (list_empty (&ksocknal_data.ksnd_reaper_list));
+                LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
+                LASSERT (ksocknal_data.ksnd_peers != NULL);
+                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                        LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
+                }
+                LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
+                LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
 
@@ -628,6 +1324,7 @@ ksocknal_module_fini (void)
 
                 /* flag threads to terminate; wake and wait for them to die */
                 ksocknal_data.ksnd_shuttingdown = 1;
+                wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
 
                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
@@ -682,8 +1379,16 @@ ksocknal_module_init (void)
 
         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
 
-        INIT_LIST_HEAD(&ksocknal_data.ksnd_socklist);
-        rwlock_init(&ksocknal_data.ksnd_socklist_lock);
+        ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC (ksocknal_data.ksnd_peers,
+                      sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
+        if (ksocknal_data.ksnd_peers == NULL)
+                RETURN (-ENOMEM);
+
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
+                INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
+
+        rwlock_init(&ksocknal_data.ksnd_global_lock);
 
         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
@@ -702,19 +1407,26 @@ ksocknal_module_init (void)
         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
 
         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
-        INIT_LIST_HEAD (&ksocknal_data.ksnd_reaper_list);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
 
-        memset (&ksocknal_data.ksnd_irq_info, SOCKNAL_IRQ_UNASSIGNED,
-                sizeof (ksocknal_data.ksnd_irq_info));
+        spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
+        init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
+
+        /* NB memset above zeros whole of ksocknal_data, including
+         * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
 
         /* flag lists/ptrs/locks initialised */
         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
 
         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
-        if (ksocknal_data.ksnd_schedulers == NULL)
+        if (ksocknal_data.ksnd_schedulers == NULL) {
+                ksocknal_module_fini ();
                 RETURN(-ENOMEM);
+        }
 
         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
@@ -728,7 +1440,7 @@ ksocknal_module_init (void)
                 init_waitqueue_head (&kss->kss_waitq);
         }
 
-        CERROR ("ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
+        CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
 
         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
@@ -745,6 +1457,7 @@ ksocknal_module_init (void)
         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
 
+                ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
                                 &ksocknal_data.ksnd_idle_ltx_list :
                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
@@ -772,9 +1485,18 @@ ksocknal_module_init (void)
                 }
         }
 
+        for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
+                rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
+                if (rc != 0) {
+                        CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
+                        ksocknal_module_fini ();
+                        RETURN (rc);
+                }
+        }
+
         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
         if (rc != 0) {
-                CERROR("Can't spawn socknal reaper: %d\n", rc);
+                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                 ksocknal_module_fini ();
                 RETURN (rc);
         }
index 86cdeb0..69daa02 100644 (file)
@@ -50,6 +50,7 @@
 #include <linux/kmod.h>
 #include <asm/uaccess.h>
 #include <asm/segment.h>
+#include <asm/div64.h>
 
 #define DEBUG_SUBSYSTEM S_SOCKNAL
 
 #include <portals/p30.h>
 #include <portals/lib-p30.h>
 
-#define SOCKNAL_N_SCHED num_online_cpus()       /* # socknal schedulers */
+#if CONFIG_SMP
+# define SOCKNAL_N_SCHED        smp_num_cpus    /* # socknal schedulers */
+#else
+# define SOCKNAL_N_SCHED        1               /* # socknal schedulers */
+#endif
+#define SOCKNAL_N_AUTOCONNECTD  4               /* # socknal autoconnect daemons */
+
+#define SOCKNAL_MIN_RECONNECT_INTERVAL HZ      /* first failed connection retry... */
+#define SOCKNAL_MAX_RECONNECT_INTERVAL (60*HZ) /* ...exponentially increasing to this */
+
+#define SOCKNAL_IO_TIMEOUT      (60*HZ)         /* default comms timeout */
+
+#define SOCKNAL_PEER_HASH_SIZE   101            /* # peer lists */
 
 #if PTL_LARGE_MTU
 # define SOCKNAL_MAX_FWD_PAYLOAD (256<<10)      /* biggest payload I can forward */
@@ -65,6 +78,8 @@
 # define SOCKNAL_MAX_FWD_PAYLOAD (64<<10)       /* biggest payload I can forward */
 #endif
 
+#define SOCKNAL_ZC_MIN_FRAG    (2<<10)          /* default smallest zerocopy fragment */
+
 #define SOCKNAL_NLTXS           128             /* # normal transmit messages */
 #define SOCKNAL_NNBLK_LTXS     128             /* # transmit messages reserved if can't block */
 
 
 #define SOCKNAL_RESCHED         100             /* # scheduler loops before reschedule */
 
-#define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sndbuf*8)/10)
+#define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+# define jiffies_64  jiffies
+#endif
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
+# define sk_data_ready data_ready
+# define sk_write_space write_space
+# define sk_user_data   user_data
+# define sk_prot        prot
+# define sk_sndbuf      sndbuf
+# define sk_socket      socket
+#endif
 
 typedef struct                                  /* pool of forwarding buffers */
 {
@@ -101,10 +129,17 @@ typedef struct                                  /* per scheduler state */
 } ksock_sched_t;
 
 typedef struct {
+        int               ksni_valid:1;         /* been set yet? */
+        int               ksni_bound:1;         /* bound to a cpu yet? */
+        int               ksni_sched:6;         /* which scheduler (assumes < 64) */
+} ksock_irqinfo_t;
+
+typedef struct {
         int               ksnd_init;            /* initialisation state */
         
-        struct list_head  ksnd_socklist;        /* all my connections */
-        rwlock_t          ksnd_socklist_lock;   /* stabilise add/find/remove */
+        rwlock_t          ksnd_global_lock;     /* stabilize peer/conn ops */
+        struct list_head *ksnd_peers;           /* hash table of all my known peers */
+        int               ksnd_peer_hash_size;  /* size of ksnd_peers */
 
         nal_cb_t         *ksnd_nal_cb;
         spinlock_t        ksnd_nal_cb_lock;     /* lib cli/sti lock */
@@ -112,7 +147,10 @@ typedef struct {
         atomic_t          ksnd_nthreads;        /* # live threads */
         int               ksnd_shuttingdown;    /* tell threads to exit */
         ksock_sched_t    *ksnd_schedulers;      /* scheduler state */
-        
+
+        atomic_t          ksnd_npeers;          /* total # peers extant */
+        atomic_t          ksnd_nclosing_conns;  /* # closed conns extant */
+
         kpr_router_t      ksnd_router;          /* THE router */
 
         void             *ksnd_fmbs;            /* all the pre-allocated FMBs */
@@ -124,11 +162,21 @@ typedef struct {
         struct list_head  ksnd_idle_ltx_list;   /* where to get an idle LTX */
         struct list_head  ksnd_idle_nblk_ltx_list; /* where to get an idle LTX if you can't block */
         wait_queue_head_t ksnd_idle_ltx_waitq;  /* where to block for an idle LTX */
+        int               ksnd_active_ltxs;     /* #active ltxs */
 
-        struct list_head  ksnd_reaper_list;     /* conn waiting to be reaped */
-        wait_queue_head_t ksnd_reaper_waitq;    /* reaper sleeps here */
+        struct list_head  ksnd_deathrow_conns;  /* conns to be closed */
+        struct list_head  ksnd_zombie_conns;    /* conns to be freed */
+        wait_queue_head_t ksnd_reaper_waitq;    /* reaper sleep here */
         spinlock_t        ksnd_reaper_lock;     /* serialise */
-        unsigned char     ksnd_irq_info[NR_IRQS]; /* irq->scheduler lookup */
+
+        int               ksnd_stall_tx;        /* test sluggish sender */
+        int               ksnd_stall_rx;        /* test sluggish receiver */
+
+        struct list_head  ksnd_autoconnectd_routes; /* routes waiting to be connected */
+        wait_queue_head_t ksnd_autoconnectd_waitq; /* autoconnectds sleep here */
+        spinlock_t        ksnd_autoconnectd_lock; /* serialise */
+
+        ksock_irqinfo_t   ksnd_irqinfo[NR_IRQS];/* irq->scheduler lookup */
 } ksock_nal_data_t;
 
 #define SOCKNAL_INIT_NOTHING    0
@@ -136,10 +184,6 @@ typedef struct {
 #define SOCKNAL_INIT_PTL        2
 #define SOCKNAL_INIT_ALL        3
 
-#define SOCKNAL_IRQ_BOUND       0x80            /* flag we _did_ bind already */
-#define SOCKNAL_IRQ_SCHED_MASK 0x7f            /* we assume < 127 CPUs */
-#define SOCKNAL_IRQ_UNASSIGNED  0xff            /* flag unassigned */
-
 /* A packet just assembled for transmission is represented by 1 or more
  * struct iovec fragments and 0 or more ptl_kiov_t fragments.  Forwarded
  * messages, or messages from an MD with PTL_MD_KIOV _not_ set have 0
@@ -154,17 +198,24 @@ typedef struct {
  * Otherwise up to PTL_MD_MAX_IOV ptl_kiov_t fragments are used.
  */
 
+struct ksock_conn;                              /* forward ref */
+struct ksock_peer;                              /* forward ref */
+struct ksock_route;                             /* forward ref */
+
 typedef struct                                  /* transmit packet */
 {
         struct list_head        tx_list;        /* queue on conn for transmission etc */
+        __u64                   tx_deadline;    /* when (in jiffies) tx times out */
         char                    tx_isfwd;       /* forwarding / sourced here */
         int                     tx_nob;         /* # packet bytes */
+        int                     tx_resid;       /* residual bytes */
         int                     tx_niov;        /* # packet iovec frags */
         struct iovec           *tx_iov;         /* packet iovec frags */
         int                     tx_nkiov;       /* # packet page frags */
         ptl_kiov_t             *tx_kiov;        /* packet page frags */
+        struct ksock_conn      *tx_conn;        /* owning conn */
+        ptl_hdr_t              *tx_hdr;         /* packet header (for debug only) */
 #if SOCKNAL_ZC        
-        ksock_sched_t          *tx_sched;       /* who to wake on callback */
         zccd_t                  tx_zccd;        /* zero copy callback descriptor */
 #endif
 } ksock_tx_t;
@@ -200,8 +251,7 @@ typedef struct                                  /* locally transmitted packet */
 /* local packets (lib->socknal) embedded in ksock_ltx_t::ltx_tx */
 
 /* NB list_entry() is used here as convenient macro for calculating a
- * pointer to a struct from the address of a member.
- */
+ * pointer to a struct from the address of a member. */
 
 typedef struct                                  /* Kernel portals Socket Forwarding message buffer */
 {                                               /* (socknal->router) */
@@ -209,6 +259,7 @@ typedef struct                                  /* Kernel portals Socket Forward
         kpr_fwd_desc_t          fmb_fwd;        /* router's descriptor */
         int                     fmb_npages;     /* # pages allocated */
         ksock_fmb_pool_t       *fmb_pool;       /* owning pool */
+        struct ksock_peer      *fmb_peer;       /* peer received from */
         struct page            *fmb_pages[SOCKNAL_LARGE_FWD_PAGES];
         struct iovec            fmb_iov[SOCKNAL_LARGE_FWD_PAGES];
 } ksock_fmb_t;
@@ -227,20 +278,24 @@ typedef union {
 #define SOCKNAL_RX_GET_FMB      5               /* scheduled for forwarding */
 #define SOCKNAL_RX_FMB_SLEEP    6               /* blocked waiting for a fwd desc */
 
-typedef struct 
+typedef struct ksock_conn
 { 
-        struct list_head    ksnc_list;          /* stash on global socket list */
-        struct file        *ksnc_file;          /* socket filp */
+        struct ksock_peer  *ksnc_peer;          /* owning peer */
+        struct ksock_route *ksnc_route;         /* owning route */
+        struct list_head    ksnc_list;          /* stash on peer's conn list */
         struct socket      *ksnc_sock;          /* actual socket */
         void               *ksnc_saved_data_ready; /* socket's original data_ready() callback */
         void               *ksnc_saved_write_space; /* socket's original write_space() callback */
-        ptl_nid_t           ksnc_peernid;       /* who's on the other end */
         atomic_t            ksnc_refcount;      /* # users */
         ksock_sched_t     *ksnc_scheduler;     /* who schedules this connection */
-        
+        __u32               ksnc_ipaddr;        /* peer's IP */
+        int                 ksnc_port;          /* peer's port */
+        int                 ksnc_closing;       /* being shut down */
+
         /* READER */
         struct list_head    ksnc_rx_list;       /* where I enq waiting input or a forwarding descriptor */
-        volatile int        ksnc_rx_ready;      /* data ready to read */
+        __u64               ksnc_rx_deadline;   /* when receive times out */
+        int                 ksnc_rx_ready;      /* data ready to read */
         int                 ksnc_rx_scheduled;  /* being progressed */
         int                 ksnc_rx_state;      /* what is being read */
         int                 ksnc_rx_nob_left;   /* # bytes to next hdr/body  */
@@ -256,37 +311,104 @@ typedef struct
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
-        volatile int        ksnc_tx_ready;      /* write space */
+#if SOCKNAL_ZC
+        struct list_head    ksnc_tx_pending;    /* zc packets pending callback */
+#endif
+        atomic_t            ksnc_tx_nob;        /* # bytes queued */
+        int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
-
 } ksock_conn_t;
 
-extern int ksocknal_add_sock (ptl_nid_t nid, int fd, int client);
-extern int ksocknal_close_sock(ptl_nid_t nid);
-extern int ksocknal_set_mynid(ptl_nid_t nid);
-extern int ksocknal_push_sock(ptl_nid_t nid);
-extern ksock_conn_t *ksocknal_get_conn (ptl_nid_t nid);
-extern void _ksocknal_put_conn (ksock_conn_t *conn);
-extern void ksocknal_close_conn (ksock_conn_t *conn);
+typedef struct ksock_route
+{
+        struct list_head    ksnr_list;          /* chain on peer route list */
+        struct list_head    ksnr_connect_list;  /* chain on autoconnect list */
+        struct ksock_peer  *ksnr_peer;          /* owning peer */
+        atomic_t            ksnr_refcount;      /* # users */
+        int                 ksnr_sharecount;    /* lconf usage counter */
+        __u64               ksnr_timeout;       /* when reconnection can happen next */
+        unsigned int        ksnr_retry_interval; /* how long between retries */
+        __u32               ksnr_ipaddr;        /* an IP address for this peer */
+        int                 ksnr_port;          /* port to connect to */
+        int                 ksnr_buffer_size;   /* size of socket buffers */
+        unsigned int        ksnr_irq_affinity:1; /* set affinity? */
+        unsigned int        ksnr_xchange_nids:1; /* do hello protocol? */
+        unsigned int        ksnr_nonagel:1;     /* disable nagle? */
+        unsigned int        ksnr_connecting;    /* autoconnect in progress? */
+        unsigned int        ksnr_deleted;       /* been removed from peer? */
+        int                 ksnr_generation;    /* connection incarnation # */
+        ksock_conn_t       *ksnr_conn;          /* NULL/active connection */
+} ksock_route_t;
+
+typedef struct ksock_peer
+{
+        struct list_head    ksnp_list;          /* stash on global peer list */
+        ptl_nid_t           ksnp_nid;           /* who's on the other end(s) */
+        atomic_t            ksnp_refcount;      /* # users */
+        int                 ksnp_closing;       /* being closed */
+        struct list_head    ksnp_conns;         /* all active connections */
+        struct list_head    ksnp_routes;        /* routes */
+        struct list_head    ksnp_tx_queue;      /* waiting packets */
+} ksock_peer_t;
 
-static inline void
-ksocknal_put_conn (ksock_conn_t *conn)
+
+
+extern nal_cb_t         ksocknal_lib;
+extern ksock_nal_data_t ksocknal_data;
+
+static inline struct list_head *
+ksocknal_nid2peerlist (ptl_nid_t nid) 
 {
-        CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
-                conn, conn->ksnc_peernid, atomic_read (&conn->ksnc_refcount));
+        unsigned int hash = ((unsigned int)nid) % ksocknal_data.ksnd_peer_hash_size;
+        
+        return (&ksocknal_data.ksnd_peers [hash]);
+}
 
-        if (atomic_dec_and_test (&conn->ksnc_refcount))
-                _ksocknal_put_conn (conn);
+static inline int
+ksocknal_getconnsock (ksock_conn_t *conn) 
+{
+        int   rc = -ESHUTDOWN;
+        
+        read_lock (&ksocknal_data.ksnd_global_lock);
+        if (!conn->ksnc_closing) {
+                rc = 0;
+                get_file (conn->ksnc_sock->file);
+        }
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+
+        return (rc);
 }
 
+static inline void
+ksocknal_putconnsock (ksock_conn_t *conn)
+{
+        fput (conn->ksnc_sock->file);
+}
+
+extern void ksocknal_put_route (ksock_route_t *route);
+extern void ksocknal_put_peer (ksock_peer_t *peer);
+extern ksock_peer_t *ksocknal_find_peer_locked (ptl_nid_t nid);
+extern ksock_peer_t *ksocknal_get_peer (ptl_nid_t nid);
+extern int ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr,
+                               int single, int keep_conn);
+extern int ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
+                                 struct socket *sock, int bind_irq);
+extern void ksocknal_close_conn_locked (ksock_conn_t *conn);
+extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn);
+extern void ksocknal_terminate_conn (ksock_conn_t *conn);
+extern void ksocknal_destroy_conn (ksock_conn_t *conn);
+extern void ksocknal_put_conn (ksock_conn_t *conn);
+extern int ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr);
+
+extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
+extern void ksocknal_tx_done (ksock_tx_t *tx, int asynch);
+extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
+extern void ksocknal_fmb_callback (void *arg, int error);
 extern int ksocknal_thread_start (int (*fn)(void *arg), void *arg);
 extern int ksocknal_new_packet (ksock_conn_t *conn, int skip);
-extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
 extern int ksocknal_scheduler (void *arg);
-extern int ksocknal_reaper (void *arg);
 extern void ksocknal_data_ready(struct sock *sk, int n);
 extern void ksocknal_write_space(struct sock *sk);
-
-
-extern nal_cb_t         ksocknal_lib;
-extern ksock_nal_data_t ksocknal_data;
+extern int ksocknal_autoconnectd (void *arg);
+extern int ksocknal_reaper (void *arg);
+extern int ksocknal_set_linger (struct socket *sock);
index 6147d8a..3341596 100644 (file)
 
 #include "socknal.h"
 
-atomic_t   ksocknal_packets_received;
-atomic_t   ksocknal_packets_launched;
-atomic_t   ksocknal_packets_being_sent;
-
+int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
 #if SOCKNAL_ZC
 int        ksocknal_do_zc = 1;
-int        ksocknal_zc_min_frag = 2048;
+int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
 #endif
 
 /*
@@ -127,7 +124,7 @@ ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
 int
 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
-        /* I would guess that if ksocknal_get_conn(nid) == NULL,
+        /* I would guess that if ksocknal_get_peer (nid) == NULL,
            and we're not routing, then 'nid' is very distant :) */
         if ( nal->ni.nid == nid ) {
                 *dist = 0;
@@ -141,7 +138,7 @@ ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 ksock_ltx_t *
 ksocknal_get_ltx (int may_block)
 {
-        long             flags;
+        unsigned long flags;
         ksock_ltx_t *ltx = NULL;
 
         for (;;) {
@@ -151,6 +148,7 @@ ksocknal_get_ltx (int may_block)
                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
                                          ksock_ltx_t, ltx_tx.tx_list);
                         list_del (&ltx->ltx_tx.tx_list);
+                        ksocknal_data.ksnd_active_ltxs++;
                         break;
                 }
 
@@ -159,6 +157,7 @@ ksocknal_get_ltx (int may_block)
                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
                                                  ksock_ltx_t, ltx_tx.tx_list);
                                 list_del (&ltx->ltx_tx.tx_list);
+                                ksocknal_data.ksnd_active_ltxs++;
                         }
                         break;
                 }
@@ -175,6 +174,24 @@ ksocknal_get_ltx (int may_block)
         return (ltx);
 }
 
+void
+ksocknal_put_ltx (ksock_ltx_t *ltx)
+{
+        unsigned long   flags;
+        
+        spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
+
+        ksocknal_data.ksnd_active_ltxs--;
+        list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
+
+        /* normal tx desc => wakeup anyone blocking for one */
+        if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
+            waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
+                wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
+
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
+}
+
 #if SOCKNAL_ZC
 struct page *
 ksocknal_kvaddr_to_page (unsigned long vaddr)
@@ -202,11 +219,15 @@ ksocknal_kvaddr_to_page (unsigned long vaddr)
 #endif
 
 int
-ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
+ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
+        struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
         int            fragsize = iov->iov_len;
         unsigned long  vaddr = (unsigned long)iov->iov_base;
+        int            more = !list_empty (&conn->ksnc_tx_queue) |
+                              (tx->tx_niov > 1) |
+                              (tx->tx_nkiov > 1);
 #if SOCKNAL_ZC
         int            offset = vaddr & (PAGE_SIZE - 1);
         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
@@ -216,9 +237,8 @@ ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
 
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_nob);
+        LASSERT (fragsize <= tx->tx_resid);
         LASSERT (tx->tx_niov > 0);
-        more |= (tx->tx_niov > 1);
         
 #if SOCKNAL_ZC
         if (ksocknal_do_zc &&
@@ -230,7 +250,10 @@ ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                        (void *)vaddr, page, page_address(page), offset, zcsize);
 
-                more |= (zcsize < fragsize);
+                if (fragsize > zcsize) {
+                        more = 1;
+                        fragsize = zcsize;
+                }
 
                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
@@ -253,44 +276,46 @@ ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
                 mm_segment_t oldmm = get_fs();
                 
                 set_fs (KERNEL_DS);
-                rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, fragsize);
                 set_fs (oldmm);
         } 
 
         if (rc <= 0)
                 return (rc);
 
-        tx->tx_nob -= rc;
+        tx->tx_resid -= rc;
 
-        if (rc < fragsize) {
-                /* didn't send whole frag */
+        if (rc < iov->iov_len) {
+                /* didn't send whole iov entry... */
                 iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len  = fragsize - rc;
-                return (-EAGAIN);
+                iov->iov_len -= rc;
+                /* ...but did we send everything we tried to send? */
+                return ((rc == fragsize) ? 1 : -EAGAIN);
         }
 
-        /* everything went */
-        LASSERT (rc == fragsize);
         tx->tx_iov++;
         tx->tx_niov--;
         return (1);
 }
 
 int
-ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
+ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
+        struct socket *sock = conn->ksnc_sock;
         ptl_kiov_t    *kiov = tx->tx_kiov;
         int            fragsize = kiov->kiov_len;
         struct page   *page = kiov->kiov_page;
         int            offset = kiov->kiov_offset;
+        int            more = !list_empty (&conn->ksnc_tx_queue) |
+                              (tx->tx_nkiov > 1);
         int            rc;
 
         /* NB we can't trust socket ops to either consume our iovs
          * or leave them alone, so we only send 1 frag at a time. */
-        LASSERT (fragsize <= tx->tx_nob);
+        LASSERT (fragsize <= tx->tx_resid);
         LASSERT (offset + fragsize <= PAGE_SIZE);
+        LASSERT (tx->tx_niov == 0);
         LASSERT (tx->tx_nkiov > 0);
-        more |= (tx->tx_nkiov > 1);
 
 #if SOCKNAL_ZC
         if (ksocknal_do_zc &&
@@ -322,7 +347,7 @@ ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
                 mm_segment_t  oldmm = get_fs();
                 
                 set_fs (KERNEL_DS);
-                rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
+                rc = sock_sendmsg(sock, &msg, fragsize);
                 set_fs (oldmm);
                 kunmap (page);
         }
@@ -330,8 +355,8 @@ ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
         if (rc <= 0)
                 return (rc);
 
-        tx->tx_nob -= rc;
-
+        tx->tx_resid -= rc;
         if (rc < fragsize) {
                 /* didn't send whole frag */
                 kiov->kiov_offset = offset + rc;
@@ -347,29 +372,52 @@ ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
 }
 
 int
-ksocknal_sendmsg (struct socket *sock, ksock_tx_t *tx, int more)
+ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
 {
-        int    rc;
-        int    sent_some = 0;
+        /* Return 0 on success, < 0 on error.
+         * caller checks tx_resid to determine progress/completion */
+        int      rc;
         ENTRY;
         
-        LASSERT (!in_interrupt());
+        if (ksocknal_data.ksnd_stall_tx != 0) {
+                set_current_state (TASK_UNINTERRUPTIBLE);
+                schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
+        }
+
+        rc = ksocknal_getconnsock (conn);
+        if (rc != 0)
+                return (rc);
 
         for (;;) {
+                LASSERT (tx->tx_resid != 0);
+
+                if (conn->ksnc_closing) {
+                        rc = -ESHUTDOWN;
+                        break;
+                }
+
                 if (tx->tx_niov != 0)
-                        rc = ksocknal_send_iov (sock, tx, more || tx->tx_nkiov != 0);
+                        rc = ksocknal_send_iov (conn, tx);
                 else
-                        rc = ksocknal_send_kiov (sock, tx, more);
-
-                /* Interpret a zero rc the same as -EAGAIN (Adaptech TOE) */
-                if (rc <= 0)                    /* error or partial send */
-                        RETURN ((sent_some || rc == -EAGAIN) ? 0 : rc);
-                
-                if (tx->tx_nob == 0)            /* sent everything */
-                        RETURN (0);
+                        rc = ksocknal_send_kiov (conn, tx);
+
+                if (rc <= 0) {                  /* error or socket full? */
+                        /* NB: rc == 0 and rc == -EAGAIN both mean try
+                         * again later (linux stack returns -EAGAIN for
+                         * this, but Adaptech TOE returns 0) */
+                        if (rc == -EAGAIN)
+                                rc = 0;
+                        break;
+                }
 
-                sent_some = 1;
+                if (tx->tx_resid == 0) {        /* sent everything */
+                        rc = 0;
+                        break;
+                }
         }
+
+        ksocknal_putconnsock (conn);
+        RETURN (rc);
 }
 
 int
@@ -414,7 +462,6 @@ ksocknal_recv_iov (ksock_conn_t *conn)
                 return (-EAGAIN);
         }
 
-        LASSERT (rc == fragsize);
         conn->ksnc_rx_iov++;
         conn->ksnc_rx_niov--;
         return (1);
@@ -466,7 +513,6 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
                 return (-EAGAIN);
         }
 
-        LASSERT (rc == fragsize);
         conn->ksnc_rx_kiov++;
         conn->ksnc_rx_nkiov--;
         return (1);
@@ -475,31 +521,47 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
 int
 ksocknal_recvmsg (ksock_conn_t *conn) 
 {
-        int    rc;
-        int    got_some = 0;
+        /* Return 1 on success, 0 on EOF, < 0 on error.
+         * Caller checks ksnc_rx_nob_wanted to determine
+         * progress/completion. */
+        int     rc;
         ENTRY;
         
-        LASSERT (!in_interrupt ());
+        if (ksocknal_data.ksnd_stall_rx != 0) {
+                set_current_state (TASK_UNINTERRUPTIBLE);
+                schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
+        }
+
+        rc = ksocknal_getconnsock (conn);
+        if (rc != 0)
+                return (rc);
 
         for (;;) {
-                LASSERT (conn->ksnc_rx_nob_wanted > 0);
+                if (conn->ksnc_closing) {
+                        rc = -ESHUTDOWN;
+                        break;
+                }
                 
                 if (conn->ksnc_rx_niov != 0)
                         rc = ksocknal_recv_iov (conn);
                 else
                         rc = ksocknal_recv_kiov (conn);
-
-                /* CAVEAT EMPTOR: we return...
-                 * <= 0 for error (0 == EOF) and > 0 for success (unlike sendmsg()) */
-
-                if (rc <= 0)                    /* error/EOF or partial receive */
-                        RETURN ((got_some || rc == -EAGAIN) ? 1 : rc);
                 
-                if (conn->ksnc_rx_nob_wanted == 0)
-                        RETURN (1);
+                if (rc <= 0) {
+                        /* error/EOF or partial receive */
+                        if (rc == -EAGAIN)
+                                rc = 1;
+                        break;
+                }
 
-                got_some = 0;
+                if (conn->ksnc_rx_nob_wanted == 0) {
+                        rc = 1;
+                        break;
+                }
         }
+
+        ksocknal_putconnsock (conn);
+        RETURN (rc);
 }
 
 #if SOCKNAL_ZC
@@ -507,7 +569,7 @@ void
 ksocknal_zc_callback (zccd_t *zcd)
 {
         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
-        ksock_sched_t *sched = tx->tx_sched;
+        ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
         unsigned long  flags;
         ENTRY;
 
@@ -515,6 +577,7 @@ ksocknal_zc_callback (zccd_t *zcd)
 
         spin_lock_irqsave (&sched->kss_lock, flags);
 
+        list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
         if (waitqueue_active (&sched->kss_waitq))
                 wake_up (&sched->kss_waitq);
@@ -525,13 +588,24 @@ ksocknal_zc_callback (zccd_t *zcd)
 #endif
 
 void
-ksocknal_tx_done (ksock_tx_t *tx)
+ksocknal_tx_done (ksock_tx_t *tx, int asynch)
 {
-        long           flags;
         ksock_ltx_t   *ltx;
         ENTRY;
 
-        atomic_dec (&ksocknal_packets_being_sent);
+        if (tx->tx_conn != NULL) {
+                /* This tx got queued on a conn; do the accounting... */
+                atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
+#if SOCKNAL_ZC
+                /* zero copy completion isn't always from
+                 * process_transmit() so it needs to keep a ref on
+                 * tx_conn... */
+                if (asynch)
+                        ksocknal_put_conn (tx->tx_conn);
+#else
+                LASSERT (!asynch);
+#endif
+        }
 
         if (tx->tx_isfwd) {             /* was a forwarded packet? */
                 kpr_fwd_done (&ksocknal_data.ksnd_router,
@@ -545,21 +619,45 @@ ksocknal_tx_done (ksock_tx_t *tx)
 
         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
 
-        spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
-
-        list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
-
-        /* normal tx desc => wakeup anyone blocking for one */
-        if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
-            waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
-                wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
-
-        spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
+        ksocknal_put_ltx (ltx);
         EXIT;
 }
 
 void
-ksocknal_process_transmit (ksock_sched_t *sched, long *irq_flags)
+ksocknal_tx_launched (ksock_tx_t *tx) 
+{
+#if SOCKNAL_ZC
+        if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
+                unsigned long  flags;
+                ksock_conn_t  *conn = tx->tx_conn;
+                ksock_sched_t *sched = conn->ksnc_scheduler;
+                
+                /* zccd skbufs are still in-flight.  First take a ref on
+                 * conn, so it hangs about for ksocknal_tx_done... */
+                atomic_inc (&conn->ksnc_refcount);
+
+                /* Stash it for timeout...
+                 * NB We have to hold a lock to stash the tx, and we have
+                 * stash it before we zcc_put(), but we have to _not_ hold
+                 * this lock when we zcc_put(), otherwise we could deadlock
+                 * if it turns out to be the last put. Aaaaarrrrggghhh! */
+                spin_lock_irqsave (&sched->kss_lock, flags);
+                list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
+                spin_unlock_irqrestore (&sched->kss_lock, flags);
+                
+                /* ...then drop the initial ref on zccd, so the zero copy
+                 * callback can occur */
+                zccd_put (&tx->tx_zccd);
+                return;
+        }
+#endif
+        /* Any zero-copy-ness (if any) has completed; I can complete the
+         * transmit now, avoiding an extra schedule */
+        ksocknal_tx_done (tx, 0);
+}
+
+void
+ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
 {
         ksock_conn_t *conn;
         ksock_tx_t *tx;
@@ -578,42 +676,32 @@ ksocknal_process_transmit (ksock_sched_t *sched, long *irq_flags)
 
         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
 
-        LASSERT (tx->tx_nob > 0);
+        LASSERT (tx->tx_resid > 0);
 
         conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
         mb();                   /* => clear BEFORE trying to write */
 
-        rc = ksocknal_sendmsg (conn->ksnc_sock, tx, 
-                               !list_empty (&conn->ksnc_tx_queue)); /* more to come? */
+        rc = ksocknal_sendmsg (conn, tx);
 
-        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
+        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
 
         if (rc != 0) {
-#warning FIXME: handle socket errors properly
-                CERROR("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
-                /* kid on for now the whole packet went.
-                 * NB when we handle the error better, we'll still need to
-                 * block for zccd completion.
-                 */
-                tx->tx_nob = 0;
-        }
+                if (ksocknal_close_conn_unlocked (conn)) {
+                        /* I'm the first to close */
+                        CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
+                                conn, rc, conn->ksnc_peer->ksnp_nid,
+                                conn->ksnc_ipaddr, conn->ksnc_port);
+                }
+                ksocknal_tx_launched (tx);
+                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
 
-        if (tx->tx_nob == 0)                    /* nothing left to send */
-        {
-                /* everything went; assume more can go, so prevent write_space locking */
-                conn->ksnc_tx_ready = 1;
+        } else if (tx->tx_resid == 0) {  
 
-                ksocknal_put_conn (conn);       /* release packet's ref */
-                atomic_inc (&ksocknal_packets_being_sent);
-#if SOCKNAL_ZC
-                if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
-                        /* zccd skbufs are still in-flight.  Release my
-                         * initial ref on zccd, so callback can occur */
-                        zccd_put (&tx->tx_zccd);
-                } else
-#endif
-                        ksocknal_tx_done (tx);
+                /* everything went; assume more can go, and avoid
+                 * write_space locking */
+                conn->ksnc_tx_ready = 1;
 
+                ksocknal_tx_launched (tx);
                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
         } else {
                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
@@ -622,84 +710,247 @@ ksocknal_process_transmit (ksock_sched_t *sched, long *irq_flags)
                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
         }
 
-        if (!conn->ksnc_tx_ready ||             /* no space to write now */
-            list_empty (&conn->ksnc_tx_queue)) {/* nothing to write */
-                conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
-                ksocknal_put_conn (conn);       /* release scheduler's ref */
-        } else                                 /* let scheduler call me again */
+        /* no space to write, or nothing to write? */
+        if (!conn->ksnc_tx_ready ||
+            list_empty (&conn->ksnc_tx_queue)) {
+                /* mark not scheduled */
+                conn->ksnc_tx_scheduled = 0;
+                /* drop scheduler's ref */
+                ksocknal_put_conn (conn);
+        } else {
+                /* stay scheduled */
                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
+        }
+}
+
+void
+ksocknal_launch_autoconnect_locked (ksock_route_t *route)
+{
+        unsigned long     flags;
+
+        /* called holding write lock on ksnd_global_lock */
+
+        LASSERT (route->ksnr_conn == NULL);
+        LASSERT (!route->ksnr_deleted && !route->ksnr_connecting);
+        
+        route->ksnr_connecting = 1;
+        atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
+        
+        spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+        
+        list_add_tail (&route->ksnr_connect_list,
+                       &ksocknal_data.ksnd_autoconnectd_routes);
+
+        if (waitqueue_active (&ksocknal_data.ksnd_autoconnectd_waitq))
+                wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
+        
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+}
+
+ksock_peer_t *
+ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
+{
+        ptl_nid_t     target_nid;
+        int           rc;
+        ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
+        
+        if (peer != NULL)
+                return (peer);
+        
+        if (tx->tx_isfwd) {
+                CERROR ("Can't send packet to "LPX64
+                        ": routed target is not a peer\n", nid);
+                return (NULL);
+        }
+        
+        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
+        if (rc != 0) {
+                CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
+                return (NULL);
+        }
+
+        peer = ksocknal_find_peer_locked (target_nid);
+        if (peer != NULL)
+                return (peer);
+
+        CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
+        return (NULL);
+}
+
+ksock_conn_t *
+ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
+{
+        struct list_head *tmp;
+        ksock_conn_t     *conn = NULL;
+
+        /* Find the conn with the shortest tx queue */
+        list_for_each (tmp, &peer->ksnp_conns) {
+                ksock_conn_t *c = list_entry (tmp, ksock_conn_t, ksnc_list);
+
+                LASSERT (!c->ksnc_closing);
+                
+                if (conn == NULL ||
+                    atomic_read (&conn->ksnc_tx_nob) >
+                    atomic_read (&c->ksnc_tx_nob))
+                        conn = c;
+        }
+
+        return (conn);
 }
 
 void
-ksocknal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
+ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 {
         unsigned long  flags;
         ksock_sched_t *sched = conn->ksnc_scheduler;
 
-        /* Ensure the frags we've been given EXACTLY match the number of
-         * bytes we want to send.  Many TCP/IP stacks disregard any total
-         * size parameters passed to them and just look at the frags. 
-         *
-         * We always expect at least 1 mapped fragment containing the
-         * complete portals header.
-         */
-        LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
-                 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
-        LASSERT (tx->tx_niov >= 1);
-        LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
-        
-        CDEBUG (D_NET, "type %d, nob %d niov %d nkiov %d\n",
-                ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, tx->tx_nob, 
-                tx->tx_niov, tx->tx_nkiov);
+        /* called holding global lock (read or irq-write) */
+
+        CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
+                conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
+
+        atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
+        tx->tx_resid = tx->tx_nob;
+        tx->tx_conn = conn;
 
 #if SOCKNAL_ZC
         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
-        /* NB this sets 1 ref on zccd, so the callback can only occur
-         * after I've released this ref */
-        tx->tx_sched = sched;
+        /* NB this sets 1 ref on zccd, so the callback can only occur after
+         * I've released this ref. */
 #endif
-        spin_lock_irqsave (&sched->kss_lock, flags);
 
+        spin_lock_irqsave (&sched->kss_lock, flags);
+                
+        tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
-
-        if (conn->ksnc_tx_ready &&              /* able to send */
-            !conn->ksnc_tx_scheduled) {          /* not scheduled to send */
-                list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
+                
+        if (conn->ksnc_tx_ready &&      /* able to send */
+            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
+                /* +1 ref for scheduler */
+                atomic_inc (&conn->ksnc_refcount);
+                list_add_tail (&conn->ksnc_tx_list, 
+                               &sched->kss_tx_conns);
                 conn->ksnc_tx_scheduled = 1;
-                atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
                 if (waitqueue_active (&sched->kss_waitq))
                         wake_up (&sched->kss_waitq);
         }
 
         spin_unlock_irqrestore (&sched->kss_lock, flags);
+}
 
-        atomic_inc (&ksocknal_packets_launched);
+ksock_route_t *
+ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
+{
+        struct list_head  *tmp;
+        ksock_route_t     *route;
+        
+        list_for_each (tmp, &peer->ksnp_routes) {
+                route = list_entry (tmp, ksock_route_t, ksnr_list);
+                
+                if (route->ksnr_conn == NULL && /* not connected */
+                    !route->ksnr_connecting &&  /* not connecting */
+                    route->ksnr_timeout <= jiffies_64) /* OK to retry */
+                        return (route);
+        }
+        
+        return (NULL);
 }
 
-ksock_conn_t *
-ksocknal_send_target (ptl_nid_t nid) 
+ksock_route_t *
+ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
 {
-        ptl_nid_t     gatewaynid;
-        ksock_conn_t *conn;
-        int           rc;
+        struct list_head  *tmp;
+        ksock_route_t     *route;
 
-        if ((conn = ksocknal_get_conn (nid)) == NULL) {
-                /* It's not a peer; try to find a gateway */
-                rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &gatewaynid);
-                if (rc != 0) {
-                        CERROR("Can't route to "LPX64": router error %d\n",
-                               nid, rc);
-                        return (NULL);
-                }
+        list_for_each (tmp, &peer->ksnp_routes) {
+                route = list_entry (tmp, ksock_route_t, ksnr_list);
+                
+                if (route->ksnr_connecting)
+                        return (route);
+        }
+        
+        return (NULL);
+}
 
-                if ((conn = ksocknal_get_conn (gatewaynid)) == NULL) {
-                        CERROR ("Can't route to "LPX64": gateway "LPX64
-                                " is not a peer\n", nid, gatewaynid);
-                        return (NULL);
+int
+ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
+{
+        unsigned long     flags;
+        ksock_peer_t     *peer;
+        ksock_conn_t     *conn;
+        ksock_route_t    *route;
+        rwlock_t         *g_lock;
+        
+        /* Ensure the frags we've been given EXACTLY match the number of
+         * bytes we want to send.  Many TCP/IP stacks disregard any total
+         * size parameters passed to them and just look at the frags. 
+         *
+         * We always expect at least 1 mapped fragment containing the
+         * complete portals header. */
+        LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
+                 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
+        LASSERT (tx->tx_niov >= 1);
+        LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
+
+        CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
+                tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
+                tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
+
+        tx->tx_conn = NULL;                     /* only set when assigned a conn */
+
+        g_lock = &ksocknal_data.ksnd_global_lock;
+        read_lock (g_lock);
+        
+        peer = ksocknal_find_target_peer_locked (tx, nid);
+        if (peer == NULL) {
+                read_unlock (g_lock);
+                return (PTL_FAIL);
+        }
+
+        /* Any routes need to be connected? (need write lock if so) */
+        if (ksocknal_find_connectable_route_locked (peer) == NULL) {
+                conn = ksocknal_find_conn_locked (tx, peer);
+                if (conn != NULL) {
+                        ksocknal_queue_tx_locked (tx, conn);
+                        read_unlock (g_lock);
+                        return (PTL_OK);
                 }
         }
+        
+        /* need a write lock now to change peer state... */
 
-        return (conn);
+        atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
+        read_unlock (g_lock);
+        write_lock_irqsave (g_lock, flags);
+        
+        if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
+                write_unlock_irqrestore (g_lock, flags);
+                ksocknal_put_peer (peer);
+                return (PTL_FAIL);
+        }
+        ksocknal_put_peer (peer);               /* drop ref I got above */
+
+        /* I may launch autoconnects, now we're write locked... */
+        while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
+                ksocknal_launch_autoconnect_locked (route);
+
+        conn = ksocknal_find_conn_locked (tx, peer);
+        if (conn != NULL) {
+                ksocknal_queue_tx_locked (tx, conn);
+                write_unlock_irqrestore (g_lock, flags);
+                return (PTL_OK);
+        }
+                
+        if (ksocknal_find_connecting_route_locked (peer) == NULL) {
+                /* no routes actually connecting now */
+                write_unlock_irqrestore (g_lock, flags);
+                return (PTL_FAIL);
+        }
+
+        list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
+
+        write_unlock_irqrestore (g_lock, flags);
+        return (PTL_OK);
 }
 
 ksock_ltx_t *
@@ -746,32 +997,19 @@ ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                size_t payload_len)
 {
         ksock_ltx_t  *ltx;
-        ksock_conn_t *conn;
+        int           rc;
 
         /* NB 'private' is different depending on what we're sending.
          * Just ignore it until we can rely on it
-         *
-         * Also, the return code from this procedure is ignored.
-         * If we can't send, we must still complete with lib_finalize().
-         * We'll have to wait for 3.2 to return an error event.
          */
 
         CDEBUG(D_NET,
                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
                " pid %d\n", payload_len, payload_niov, nid, pid);
 
-        conn = ksocknal_send_target (nid);
-        if (conn == NULL) {
-                lib_finalize (&ksocknal_lib, private, cookie);
-                return (-1);
-        }
-
         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
-        if (ltx == NULL) {
-                ksocknal_put_conn (conn);
-                lib_finalize (&ksocknal_lib, private, cookie);
-                return (-1);
-        }
+        if (ltx == NULL)
+                return (PTL_FAIL);
 
         /* append the payload_iovs to the one pointing at the header */
         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
@@ -782,8 +1020,11 @@ ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
         ltx->ltx_tx.tx_niov = 1 + payload_niov;
         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
 
-        ksocknal_launch_packet (conn, &ltx->ltx_tx);
-        return (0);
+        rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
+        if (rc != PTL_OK)
+                ksocknal_put_ltx (ltx);
+        
+        return (rc);
 }
 
 int
@@ -792,8 +1033,8 @@ ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
 {
         ksock_ltx_t *ltx;
-        ksock_conn_t *conn;
-        
+        int          rc;
+
         /* NB 'private' is different depending on what we're sending.
          * Just ignore it until we can rely on it */
 
@@ -801,15 +1042,9 @@ ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
                payload_len, payload_niov, nid, pid);
 
-        conn = ksocknal_send_target (nid);
-        if (conn == NULL)
-                return (-1);
-
         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
-        if (ltx == NULL) {
-                ksocknal_put_conn (conn);
-                return (-1);
-        }
+        if (ltx == NULL)
+                return (PTL_FAIL);
 
         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
@@ -820,17 +1055,20 @@ ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
         ltx->ltx_tx.tx_nkiov = payload_niov;
         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
 
-        ksocknal_launch_packet (conn, &ltx->ltx_tx);
-        return (0);
+        rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
+        if (rc != PTL_OK) 
+                ksocknal_put_ltx (ltx);
+                
+        return (rc);
 }
 
 void
 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
-        ksock_conn_t *conn;
         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
-
+        int           rc;
+        
         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
 
@@ -838,23 +1076,19 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (nid == ksocknal_lib.ni.nid)
                 nid = fwd->kprfd_target_nid;
 
-        conn = ksocknal_get_conn (nid);
-        if (conn == NULL) {
-                CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
-                kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
-                return;
-        }
-
-        /* This forward has now got a ref on conn */
-
         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
         tx->tx_nob   = fwd->kprfd_nob;
         tx->tx_niov  = fwd->kprfd_niov;
         tx->tx_iov   = fwd->kprfd_iov;
         tx->tx_nkiov = 0;
         tx->tx_kiov  = NULL;
-        
-        ksocknal_launch_packet (conn, tx);
+        tx->tx_hdr   = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
+
+        rc = ksocknal_launch_packet (tx, nid);
+        if (rc != 0) {
+                /* FIXME, could pass a better completion error */
+                kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
+        }
 }
 
 int
@@ -883,7 +1117,7 @@ ksocknal_fmb_callback (void *arg, int error)
         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
         ksock_conn_t      *conn = NULL;
         ksock_sched_t     *sched;
-        long               flags;
+        unsigned long      flags;
 
         if (error != 0)
                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
@@ -905,6 +1139,9 @@ ksocknal_fmb_callback (void *arg, int error)
 
         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
 
+        /* drop peer ref taken on init */
+        ksocknal_put_peer (fmb->fmb_peer);
+        
         if (conn == NULL)
                 return;
 
@@ -931,7 +1168,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
 {
         int               payload_nob = conn->ksnc_rx_nob_left;
         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
-        long              flags;
+        unsigned long     flags;
         ksock_fmb_pool_t *pool;
         ksock_fmb_t      *fmb;
 
@@ -965,7 +1202,6 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
         return (NULL);
 }
 
-
 int
 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 {
@@ -983,22 +1219,26 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
 
         /* Got a forwarding buffer; copy the header we just read into the
-         * forwarding buffer.  If there's payload start reading reading it
+         * forwarding buffer.  If there's payload, start reading reading it
          * into the buffer, otherwise the forwarding buffer can be kicked
          * off immediately.
          *
          * NB fmb->fmb_iov spans the WHOLE packet.
          *    conn->ksnc_rx_iov spans just the payload.
          */
-
         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
 
         /* copy header */
         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
 
-        if (payload_nob == 0) {         /* got complete packet already */
-                atomic_inc (&ksocknal_packets_received);
+        /* Take a ref on the conn's peer to prevent module unload before
+         * forwarding completes.  NB we ref peer and not conn since because
+         * all refs on conn after it has been closed must remove themselves
+         * in finite time */
+        fmb->fmb_peer = conn->ksnc_peer;
+        atomic_inc (&conn->ksnc_peer->ksnp_refcount);
 
+        if (payload_nob == 0) {         /* got complete packet already */
                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
                         dest_nid, packet_nob);
@@ -1037,11 +1277,10 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
                       packet_nob, niov, fmb->fmb_iov,
                       ksocknal_fmb_callback, fmb);
 
-        /* stash router's descriptor ready for call to kpr_fwd_start */
-        conn->ksnc_cookie = &fmb->fmb_fwd;
-
+        conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
-
+        conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
+        
         /* payload is desc's iov-ed buffer, but skipping the hdr */
         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
                  sizeof (struct iovec));
@@ -1067,7 +1306,7 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 void
 ksocknal_fwd_parse (ksock_conn_t *conn)
 {
-        ksock_conn_t *conn2;
+        ksock_peer_t *peer;
         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
         int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
 
@@ -1082,7 +1321,9 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                 CERROR("dropping packet from "LPX64" for "LPX64": packet "
                        "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
                        dest_nid, body_len);
-                ksocknal_new_packet (conn, 0);          /* on to new packet */
+
+                ksocknal_new_packet (conn, 0);  /* on to new packet */
+                ksocknal_close_conn_unlocked (conn); /* give up on conn */
                 return;
         }
 
@@ -1105,12 +1346,12 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
         }
 
         /* should have gone direct */
-        conn2 = ksocknal_get_conn (conn->ksnc_hdr.dest_nid);
-        if (conn2 != NULL) {
+        peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
+        if (peer != NULL) {
                 CERROR ("dropping packet from "LPX64" for "LPX64
                         ": target is a peer\n", conn->ksnc_hdr.src_nid,
                         conn->ksnc_hdr.dest_nid);
-                ksocknal_put_conn (conn2);  /* drop ref from get above */
+                ksocknal_put_peer (peer);  /* drop ref from get above */
 
                 /* on to next packet (skip this one's body) */
                 ksocknal_new_packet (conn, body_len);
@@ -1175,7 +1416,7 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
 }
 
 void
-ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
+ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 {
         ksock_conn_t *conn;
         ksock_fmb_t  *fmb;
@@ -1222,11 +1463,18 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
 
         rc = ksocknal_recvmsg(conn);
 
-        if (rc == 0)
-                goto out;
-        if (rc < 0) {
-#warning FIXME: handle socket errors properly
-                CERROR ("Error socknal read %p: %d\n", conn, rc);
+        if (rc <= 0) {
+                if (ksocknal_close_conn_unlocked (conn)) {
+                        /* I'm the first to close */
+                        if (rc < 0)
+                                CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
+                                        conn, rc, conn->ksnc_peer->ksnp_nid,
+                                        conn->ksnc_ipaddr, conn->ksnc_port);
+                        else
+                                CERROR ("[%p] EOF from "LPX64" ip %08x:%d\n",
+                                        conn, conn->ksnc_peer->ksnp_nid,
+                                        conn->ksnc_ipaddr, conn->ksnc_port);
+                }
                 goto out;
         }
 
@@ -1238,9 +1486,9 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
 
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_HEADER:
-                /* It's not for me */
-                if (conn->ksnc_hdr.type != PTL_MSG_HELLO &&
+                if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
+                        /* This packet isn't for me */
                         ksocknal_fwd_parse (conn);
                         switch (conn->ksnc_rx_state) {
                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
@@ -1255,10 +1503,11 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
                         /* Not Reached */
                 }
 
-                PROF_START(lib_parse);
                 /* sets wanted_len, iovs etc */
                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
-                PROF_FINISH(lib_parse);
+
+                /* start timeout (lib is waiting for finalize) */
+                conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
 
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
@@ -1267,8 +1516,8 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
                 /* Fall through (completed packet for me) */
 
         case SOCKNAL_RX_BODY:
-                atomic_inc (&ksocknal_packets_received);
-                /* packet is done now */
+                /* payload all received */
+                conn->ksnc_rx_deadline = 0;     /* cancel timeout */
                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
                 /* Fall through */
 
@@ -1279,16 +1528,19 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
                 goto try_read;          /* try to finish reading slop now */
 
         case SOCKNAL_RX_BODY_FWD:
+                /* payload all received */
                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
                         conn->ksnc_rx_nob_left);
 
-                atomic_inc (&ksocknal_packets_received);
+                /* cancel timeout (only needed it while fmb allocated) */
+                conn->ksnc_rx_deadline = 0;
 
-                /* ksocknal_init_fmb() put router desc. in conn->ksnc_cookie */
-                kpr_fwd_start (&ksocknal_data.ksnd_router,
-                               (kpr_fwd_desc_t *)conn->ksnc_cookie);
+                /* forward the packet. NB ksocknal_init_fmb() put fmb into
+                 * conn->ksnc_cookie */
+                fmb = (ksock_fmb_t *)conn->ksnc_cookie;
+                kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
 
                 /* no slop in forwarded packets */
                 LASSERT (conn->ksnc_rx_nob_left == 0);
@@ -1297,6 +1549,7 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
                 goto out;                       /* (later) */
 
         default:
+                break;
         }
 
         /* Not Reached */
@@ -1309,9 +1562,12 @@ ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
         if (!conn->ksnc_rx_ready) {
                 /* let socket callback schedule again */
                 conn->ksnc_rx_scheduled = 0;
-                ksocknal_put_conn (conn);       /* release scheduler's ref */
-        } else                              /* let scheduler call me again */
+                /* drop scheduler's ref */
+                ksocknal_put_conn (conn);   
+        } else {
+                /* stay scheduled */
                 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
+        }
 }
 
 int
@@ -1374,21 +1630,17 @@ int ksocknal_scheduler (void *arg)
         int                nloops = 0;
         int                id = sched - ksocknal_data.ksnd_schedulers;
         char               name[16];
-#if (CONFIG_SMP && CPU_AFFINITY)
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        int                cpu = cpu_logical_map(id % num_online_cpus());
-#else
-#warning "Take care of architecure specific logical APIC map"
-        int cpu = 1;    /* Have to change later. */
-#endif /* LINUX_VERSION_CODE */
-        
-        set_cpus_allowed (current, 1 << cpu);
-        id = cpu;
-#endif /* CONFIG_SMP && CPU_AFFINITY */
 
         snprintf (name, sizeof (name),"ksocknald[%d]", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
+
+#if (CONFIG_SMP && CPU_AFFINITY)
+        if ((cpu_online_map & (1 << id)) != 0)
+                current->cpus_allowed = (1 << id);
+        else
+                CERROR ("Can't set CPU affinity for %s\n", name);
+#endif /* CONFIG_SMP && CPU_AFFINITY */
         
         spin_lock_irqsave (&sched->kss_lock, flags);
 
@@ -1418,7 +1670,7 @@ int ksocknal_scheduler (void *arg)
                         list_del (&tx->tx_list);
                         spin_unlock_irqrestore (&sched->kss_lock, flags);
 
-                        ksocknal_tx_done (tx);
+                        ksocknal_tx_done (tx, 1);
 
                         spin_lock_irqsave (&sched->kss_lock, flags);
                 }
@@ -1464,12 +1716,12 @@ ksocknal_data_ready (struct sock *sk, int n)
         ENTRY;
 
         /* interleave correctly with closing sockets... */
-        read_lock (&ksocknal_data.ksnd_socklist_lock);
+        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        conn = sk->user_data;
+        conn = sk->sk_user_data;
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
-                LASSERT (sk->data_ready != &ksocknal_data_ready);
-                sk->data_ready (sk, n);
+                LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
+                sk->sk_data_ready (sk, n);
         } else if (!conn->ksnc_rx_ready) {        /* new news */
                 /* Set ASAP in case of concurrent calls to me */
                 conn->ksnc_rx_ready = 1;
@@ -1495,7 +1747,7 @@ ksocknal_data_ready (struct sock *sk, int n)
                 spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
-        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
 }
@@ -1508,13 +1760,13 @@ ksocknal_write_space (struct sock *sk)
         ksock_sched_t *sched;
 
         /* interleave correctly with closing sockets... */
-        read_lock (&ksocknal_data.ksnd_socklist_lock);
+        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        conn = sk->user_data;
+        conn = sk->sk_user_data;
 
         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
-               (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ?
+               (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
                                       " ready" : " blocked"),
                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
                                       " scheduled" : " idle"),
@@ -1522,10 +1774,10 @@ ksocknal_write_space (struct sock *sk)
                                       " empty" : " queued"));
 
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
-                LASSERT (sk->write_space != &ksocknal_write_space);
-                sk->write_space (sk);
+                LASSERT (sk->sk_write_space != &ksocknal_write_space);
+                sk->sk_write_space (sk);
         } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
-                clear_bit (SOCK_NOSPACE, &sk->socket->flags);
+                clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
 
                 if (!conn->ksnc_tx_ready) {      /* new news */
                         /* Set ASAP in case of concurrent calls to me */
@@ -1555,42 +1807,587 @@ ksocknal_write_space (struct sock *sk)
                 }
         }
 
-        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+}
+
+int
+ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
+{
+        int           rc;
+        mm_segment_t  oldmm = get_fs();
+
+        while (nob > 0) {
+                struct iovec  iov = {
+                        .iov_base = buffer,
+                        .iov_len  = nob
+                };
+                struct msghdr msg = {
+                        .msg_name       = NULL,
+                        .msg_namelen    = 0,
+                        .msg_iov        = &iov,
+                        .msg_iovlen     = 1,
+                        .msg_control    = NULL,
+                        .msg_controllen = 0,
+                        .msg_flags      = 0
+                };
+
+                set_fs (KERNEL_DS);
+                rc = sock_sendmsg (sock, &msg, iov.iov_len);
+                set_fs (oldmm);
+                
+                if (rc < 0)
+                        return (rc);
+
+                if (rc == 0) {
+                        CERROR ("Unexpected zero rc\n");
+                        return (-ECONNABORTED);
+                }
+
+                buffer = ((char *)buffer) + rc;
+                nob -= rc;
+        }
+        
+        return (0);
+}
+
+int
+ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
+{
+        int           rc;
+        mm_segment_t  oldmm = get_fs();
+        
+        while (nob > 0) {
+                struct iovec  iov = {
+                        .iov_base = buffer,
+                        .iov_len  = nob
+                };
+                struct msghdr msg = {
+                        .msg_name       = NULL,
+                        .msg_namelen    = 0,
+                        .msg_iov        = &iov,
+                        .msg_iovlen     = 1,
+                        .msg_control    = NULL,
+                        .msg_controllen = 0,
+                        .msg_flags      = 0
+                };
+
+                set_fs (KERNEL_DS);
+                rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
+                set_fs (oldmm);
+                
+                if (rc < 0)
+                        return (rc);
+
+                if (rc == 0)
+                        return (-ECONNABORTED);
+
+                buffer = ((char *)buffer) + rc;
+                nob -= rc;
+        }
+        
+        return (0);
+}
+
+int
+ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
+{
+        int                 rc;
+        ptl_hdr_t           hdr;
+        ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+
+        LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
+
+        memset (&hdr, 0, sizeof (hdr));
+        hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
+        hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
+        hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
+        
+        hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
+        hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
+        
+        /* Assume sufficient socket buffering for this message */
+        rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
+        if (rc != 0) {
+                CERROR ("Error %d sending HELLO to "LPX64"\n", rc, nid);
+                return (rc);
+        }
+
+        rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
+        if (rc != 0) {
+                CERROR ("Error %d reading HELLO from "LPX64"\n", rc, nid);
+                return (rc);
+        }
+        
+        if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
+                CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
+                        __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, nid);
+                return (-EINVAL);
+        }
+
+        if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+            hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
+                CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
+                        " from "LPX64"\n",
+                        __le16_to_cpu (hmv->version_major),
+                        __le16_to_cpu (hmv->version_minor),
+                        PORTALS_PROTO_VERSION_MAJOR,
+                        PORTALS_PROTO_VERSION_MINOR,
+                        nid);
+                return (-EINVAL);
+        }
+
+        LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
+        /* version 0 sends magic/version as the dest_nid of a 'hello' header,
+         * so read the rest of it in now... */
+
+        rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
+        if (rc != 0) {
+                CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
+                        rc, nid);
+                return (rc);
+        }
+
+        /* ...and check we got what we expected */
+        if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
+            PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
+                CERROR ("Expecting a HELLO hdr with 0 payload,"
+                        " but got type %d with %d payload from "LPX64"\n",
+                        __le32_to_cpu (hdr.type),
+                        __le32_to_cpu (PTL_HDR_LENGTH (&hdr)), nid);
+                return (-EINVAL);
+        }
+        
+        if (__le64_to_cpu (hdr.src_nid) != nid) {
+                CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
+                        __le64_to_cpu (hdr.src_nid), nid);
+                return (-EINVAL);
+        }
+
+        return (0);
+}
+
+int
+ksocknal_set_linger (struct socket *sock) 
+{
+        mm_segment_t    oldmm = get_fs ();
+        int             rc;
+        int             option;
+        struct linger   linger;
+
+        /* Ensure this socket aborts active sends immediately when we close
+         * it. */
+        
+        linger.l_onoff = 0;
+        linger.l_linger = 0;
+
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER, 
+                              (char *)&linger, sizeof (linger));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set SO_LINGER: %d\n", rc);
+                return (rc);
+        }
+        
+        option = -1;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set SO_LINGER2: %d\n", rc);
+                return (rc);
+        }
+        
+        return (0);
+}
+
+int
+ksocknal_connect_peer (ksock_route_t *route)
+{
+        struct sockaddr_in  peer_addr;
+        mm_segment_t        oldmm = get_fs();
+        __u64               n;
+        struct timeval      tv;
+        int                 fd;
+        struct socket      *sock;
+        int                 rc;
+
+        rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
+        if (rc != 0) {
+                CERROR ("Can't create autoconnect socket: %d\n", rc);
+                return (rc);
+        }
+
+        /* Ugh; have to map_fd for compatibility with sockets passed in
+         * from userspace.  And we actually need the refcounting that
+         * this gives you :) */
+
+        fd = sock_map_fd (sock);
+        if (fd < 0) {
+                sock_release (sock);
+                CERROR ("sock_map_fd error %d\n", fd);
+                return (fd);
+        }
+        
+        /* Set the socket timeouts, so our connection attempt completes in
+         * finite time */
+        tv.tv_sec = ksocknal_io_timeout / HZ;
+        n = ksocknal_io_timeout % HZ;
+        n = n * 1000000 + HZ - 1;
+        do_div (n, HZ);
+        tv.tv_usec = n;
+
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
+                              (char *)&tv, sizeof (tv));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set send timeout %d (in HZ): %d\n", 
+                        ksocknal_io_timeout, rc);
+                goto failed;
+        }
+        
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
+                              (char *)&tv, sizeof (tv));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set receive timeout %d (in HZ): %d\n",
+                        ksocknal_io_timeout, rc);
+                goto failed;
+        }
+
+        if (route->ksnr_nonagel) {
+                int  option = 1;
+                
+                set_fs (KERNEL_DS);
+                rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
+                                            (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't disable nagel: %d\n", rc);
+                        goto failed;
+                }
+        }
+        
+        if (route->ksnr_buffer_size != 0) {
+                int option = route->ksnr_buffer_size;
+                
+                set_fs (KERNEL_DS);
+                rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
+                                      (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't set send buffer %d: %d\n",
+                                route->ksnr_buffer_size, rc);
+                        goto failed;
+                }
+
+                set_fs (KERNEL_DS);
+                rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
+                                      (char *)&option, sizeof (option));
+                set_fs (oldmm);
+                if (rc != 0) {
+                        CERROR ("Can't set receive buffer %d: %d\n",
+                                route->ksnr_buffer_size, rc);
+                        goto failed;
+                }
+        }
+        
+        memset (&peer_addr, 0, sizeof (peer_addr));
+        peer_addr.sin_family = AF_INET;
+        peer_addr.sin_port = htons (route->ksnr_port);
+        peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
+        
+        rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
+                                 sizeof (peer_addr), sock->file->f_flags);
+        if (rc != 0) {
+                CERROR ("Error %d connecting to "LPX64"\n", rc,
+                        route->ksnr_peer->ksnp_nid);
+                goto failed;
+        }
+        
+        if (route->ksnr_xchange_nids) {
+                rc = ksocknal_exchange_nids (sock, route->ksnr_peer->ksnp_nid);
+                if (rc != 0)
+                        goto failed;
+        }
+
+        rc = ksocknal_create_conn (route->ksnr_peer->ksnp_nid,
+                                   route, sock, route->ksnr_irq_affinity);
+        if (rc == 0)
+                return (0);
+
+ failed:
+        fput (sock->file);
+        return (rc);
+}
+
+void
+ksocknal_autoconnect (ksock_route_t *route)
+{
+        LIST_HEAD        (zombies);
+        ksock_tx_t       *tx;
+        ksock_peer_t     *peer;
+        unsigned long     flags;
+        int               rc;
+        
+        rc = ksocknal_connect_peer (route);
+        if (rc == 0) {
+                /* successfully autoconnected: create_conn did the
+                 * route/conn binding and scheduled any blocked packets, 
+                 * so there's nothing left to do now. */
+                return;
+        }
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        peer = route->ksnr_peer;
+        route->ksnr_connecting = 0;
+
+        LASSERT (route->ksnr_retry_interval != 0);
+        route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
+        route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
+                                          SOCKNAL_MAX_RECONNECT_INTERVAL);
+
+        if (!list_empty (&peer->ksnp_tx_queue) &&
+            ksocknal_find_connecting_route_locked (peer) == NULL) {
+                LASSERT (list_empty (&peer->ksnp_conns));
+
+                /* None of the connections that the blocked packets are
+                 * waiting for have been successful.  Complete them now... */
+                do {
+                        tx = list_entry (peer->ksnp_tx_queue.next,
+                                         ksock_tx_t, tx_list);
+                        list_del (&tx->tx_list);
+                        list_add_tail (&tx->tx_list, &zombies);
+                } while (!list_empty (&peer->ksnp_tx_queue));
+        }
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        while (!list_empty (&zombies)) {
+                tx = list_entry (zombies.next, ksock_tx_t, tx_list);
+                
+                CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
+                        NTOH__u32 (tx->tx_hdr->type),
+                        NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
+                        NTOH__u64 (tx->tx_hdr->src_nid),
+                        NTOH__u64 (tx->tx_hdr->dest_nid));
+
+                list_del (&tx->tx_list);
+                /* complete now */
+                ksocknal_tx_done (tx, 0);
+        }
+}
+
+int
+ksocknal_autoconnectd (void *arg)
+{
+        long               id = (long)arg;
+        char               name[16];
+        unsigned long      flags;
+        ksock_route_t     *route;
+        int                rc;
+
+        snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
+        kportal_daemonize (name);
+        kportal_blockallsigs ();
+
+        spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+
+        while (!ksocknal_data.ksnd_shuttingdown) {
+
+                if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
+                        route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
+                                            ksock_route_t, ksnr_connect_list);
+                        
+                        list_del (&route->ksnr_connect_list);
+                        spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+
+                        ksocknal_autoconnect (route);
+                        ksocknal_put_route (route);
+
+                        spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+                        continue;
+                }
+                
+                spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+
+                rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
+                                               ksocknal_data.ksnd_shuttingdown ||
+                                               !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
+
+                spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+        }
+
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
+
+        ksocknal_thread_fini ();
+        return (0);
+}
+
+ksock_conn_t *
+ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
+{
+        /* We're called with a shared lock on ksnd_global_lock */
+        unsigned long      flags;
+        ksock_conn_t      *conn;
+        struct list_head  *ctmp;
+        ksock_tx_t        *tx;
+        struct list_head  *ttmp;
+        ksock_sched_t     *sched;
+
+        list_for_each (ctmp, &peer->ksnp_conns) {
+                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
+                sched = conn->ksnc_scheduler;
+                
+                if (conn->ksnc_rx_deadline != 0 &&
+                    conn->ksnc_rx_deadline <= jiffies_64)
+                        goto timed_out;
+
+                spin_lock_irqsave (&sched->kss_lock, flags);
+                
+                list_for_each (ttmp, &conn->ksnc_tx_queue) {
+                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
+                        LASSERT (tx->tx_deadline != 0);
+                        
+                        if (tx->tx_deadline <= jiffies_64)
+                                goto timed_out_locked;
+                }
+#if SOCKNAL_ZC
+                list_for_each (ttmp, &conn->ksnc_tx_pending) {
+                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
+                        LASSERT (tx->tx_deadline != 0);
+
+                        if (tx->tx_deadline <= jiffies_64)
+                                goto timed_out_locked;
+                }
+#endif                
+                spin_unlock_irqrestore (&sched->kss_lock, flags);
+                continue;
+
+        timed_out_locked:
+                spin_unlock_irqrestore (&sched->kss_lock, flags);
+        timed_out:
+                atomic_inc (&conn->ksnc_refcount);
+                return (conn);
+        }
+
+        return (NULL);
+}
+
+void
+ksocknal_check_peer_timeouts (struct list_head *peers)
+{
+        struct list_head *ptmp;
+        ksock_peer_t     *peer;
+        ksock_conn_t     *conn;
+
+ again:
+        /* NB. We expect to have a look at all the peers and not find any
+         * connections to time out, so we just use a shared lock while we
+         * take a look... */
+        read_lock (&ksocknal_data.ksnd_global_lock);
+
+        list_for_each (ptmp, peers) {
+                peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+                conn = ksocknal_find_timed_out_conn (peer);
+                
+                if (conn != NULL) {
+                        read_unlock (&ksocknal_data.ksnd_global_lock);
+
+                        if (ksocknal_close_conn_unlocked (conn)) {
+                                /* I actually closed... */
+                                CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
+                                        peer->ksnp_nid, conn->ksnc_ipaddr,
+                                        conn->ksnc_port);
+                        }
+                
+                        /* NB we won't find this one again, but we can't
+                         * just proceed with the next peer, since we dropped
+                         * ksnd_global_lock and it might be dead already! */
+                        ksocknal_put_conn (conn);
+                        goto again;
+                }
+        }
+
+        read_unlock (&ksocknal_data.ksnd_global_lock);
 }
 
 int
 ksocknal_reaper (void *arg)
 {
+        wait_queue_t       wait;
         unsigned long      flags;
         ksock_conn_t      *conn;
-        int                rc;
+        int                timeout;
+        int                peer_index = 0;
+        __u64              deadline = jiffies_64;
         
         kportal_daemonize ("ksocknal_reaper");
         kportal_blockallsigs ();
 
+        init_waitqueue_entry (&wait, current);
+
+        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
+
         while (!ksocknal_data.ksnd_shuttingdown) {
-                spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                if (list_empty (&ksocknal_data.ksnd_reaper_list)) {
-                        conn = NULL;
-                } else {
-                        conn = list_entry (ksocknal_data.ksnd_reaper_list.next,
+                if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
+                        conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
                                            ksock_conn_t, ksnc_list);
                         list_del (&conn->ksnc_list);
+                        
+                        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+
+                        ksocknal_terminate_conn (conn);
+                        ksocknal_put_conn (conn);
+
+                        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
+                        continue;
                 }
 
+                if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
+                        conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
+                                           ksock_conn_t, ksnc_list);
+                        list_del (&conn->ksnc_list);
+                        
+                        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+
+                        ksocknal_destroy_conn (conn);
+
+                        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
+                        continue;
+                }
+                
                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                if (conn != NULL)
-                        ksocknal_close_conn (conn);
-                else {
-                        rc = wait_event_interruptible (ksocknal_data.ksnd_reaper_waitq,
-                                                       ksocknal_data.ksnd_shuttingdown ||
-                                                       !list_empty(&ksocknal_data.ksnd_reaper_list));
-                        LASSERT (rc == 0);
+                while ((timeout = deadline - jiffies_64) <= 0) {
+                        /* Time to check for timeouts on a few more peers */
+                        ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
+
+                        peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
+                        deadline += HZ;
                 }
+
+                add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
+                set_current_state (TASK_INTERRUPTIBLE);
+
+                if (!ksocknal_data.ksnd_shuttingdown &&
+                    list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
+                    list_empty (&ksocknal_data.ksnd_zombie_conns))
+                        schedule_timeout (timeout);
+
+                set_current_state (TASK_RUNNING);
+                remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
+
+                spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
         }
 
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+
         ksocknal_thread_fini ();
         return (0);
 }
index 02f8b60..6915885 100644 (file)
@@ -560,7 +560,6 @@ get_new_msg (nal_cb_t *nal, lib_md_t *md)
         return (msg);
 }
 
-
 /*
  * Incoming messages have a ptl_msg_t object associated with them
  * by the library.  This object encapsulates the state of the
@@ -756,9 +755,13 @@ static int parse_get(nal_cb_t * nal, ptl_hdr_t * hdr, void *private)
 
         rc = lib_send (nal, private, msg, &reply, PTL_MSG_REPLY, 
                        hdr->src_nid, hdr->src_pid, md, offset, mlength);
-        if (rc != 0) {
+        if (rc != PTL_OK) {
                 CERROR(LPU64": Dropping GET from "LPU64": send REPLY failed\n",
                        ni->nid, hdr->src_nid);
+                /* Hmm, this will create a GET event and make believe
+                 * the reply completed, which it kind of did, only the
+                 * source won't get her reply */
+                lib_finalize (nal, private, msg);
                 state_lock (nal, &flags);
                 goto drop;
         }
@@ -1099,7 +1102,8 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         lib_msg_t *msg = NULL;
         ptl_process_id_t *id = &args->target_in;
         unsigned long flags;
-
+        int           rc;
+        
         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
             fail_peer (nal, id->nid, 1))           /* shall we now? */
         {
@@ -1177,9 +1181,15 @@ int do_PtlPut(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 
         state_unlock(nal, &flags);
         
-        lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
-                  id->nid, id->pid, md, 0, md->length);
-
+        rc = lib_send (nal, private, msg, &hdr, PTL_MSG_PUT,
+                       id->nid, id->pid, md, 0, md->length);
+        if (rc != PTL_OK) {
+                /* get_new_msg() committed us to sending by decrementing
+                 * md->threshold, so we have to act like we did send, but
+                 * the network dropped it. */
+                lib_finalize (nal, private, msg);
+        }
+        
         return ret->rc = PTL_OK;
 }
 
@@ -1206,7 +1216,8 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         ptl_process_id_t *id = &args->target_in;
         lib_md_t *md;
         unsigned long flags;
-
+        int           rc;
+        
         if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
             fail_peer (nal, id->nid, 1))           /* shall we now? */
         {
@@ -1280,9 +1291,15 @@ int do_PtlGet(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 
         state_unlock(nal, &flags);
 
-        lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
-                  id->nid, id->pid, NULL, 0, 0);
-
+        rc = lib_send (nal, private, msg, &hdr, PTL_MSG_GET,
+                       id->nid, id->pid, NULL, 0, 0);
+        if (rc != PTL_OK) {
+                /* get_new_msg() committed us to sending by decrementing
+                 * md->threshold, so we have to act like we did send, but
+                 * the network dropped it. */
+                lib_finalize (nal, private, msg);
+        }
+        
         return ret->rc = PTL_OK;
 }
 
index f10892c..9363251 100644 (file)
@@ -67,6 +67,7 @@ int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
 
                 rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
                                msg->nid, msg->pid, NULL, 0, 0);
+                /* If this send fails, there's nothing else to clean up */
         }
 
         md = msg->md;
index a89f4f7..6a9030c 100644 (file)
@@ -33,6 +33,7 @@
 #include <errno.h>
 #include <unistd.h>
 #include <time.h>
+#include <stdarg.h>
 #include <asm/byteorder.h>
 
 #include <portals/api-support.h>
@@ -46,9 +47,7 @@ unsigned int portal_printk;
 unsigned int portal_stack;
 
 
-static ptl_nid_t g_nid = 0;
 static unsigned int g_nal = 0;
-static unsigned short g_port = 0;
 
 static int g_socket_txmem = 0;
 static int g_socket_rxmem = 0;
@@ -129,21 +128,25 @@ ptl_gethostbyname(char * hname) {
 }
 
 int
-ptl_parse_nid (ptl_nid_t *nidp, char *str)
+ptl_parse_ipaddr (__u32 *ipaddrp, char *str)
 {
         struct hostent *he;
         int             a;
         int             b;
         int             c;
         int             d;
-        
+
+        if (!strcmp (str, "_all_")) 
+        {
+                *ipaddrp = 0;
+                return (0);
+        }
+
         if (sscanf (str, "%d.%d.%d.%d", &a, &b, &c, &d) == 4 &&
             (a & ~0xff) == 0 && (b & ~0xff) == 0 &&
             (c & ~0xff) == 0 && (d & ~0xff) == 0)
         {
-                __u32 addr = (a<<24)|(b<<16)|(c<<8)|d;
-
-                *nidp = (ptl_nid_t)addr;
+                *ipaddrp = (a<<24)|(b<<16)|(c<<8)|d;
                 return (0);
         }
         
@@ -153,19 +156,55 @@ ptl_parse_nid (ptl_nid_t *nidp, char *str)
         {
                 __u32 addr = *(__u32 *)he->h_addr;
 
-                *nidp = (ptl_nid_t)ntohl(addr);  /* HOST byte order */
+                *ipaddrp = ntohl(addr);         /* HOST byte order */
+                return (0);
+        }
+
+        return (-1);
+}
+
+char *
+ptl_ipaddr_2_str (__u32 ipaddr, char *str)
+{
+        __u32           net_ip;
+        struct hostent *he;
+        
+        net_ip = htonl (ipaddr);
+        he = gethostbyaddr (&net_ip, sizeof (net_ip), AF_INET);
+        if (he != NULL)
+                return (he->h_name);
+        
+        sprintf (str, "%d.%d.%d.%d",
+                 (ipaddr >> 24) & 0xff, (ipaddr >> 16) & 0xff,
+                 (ipaddr >> 8) & 0xff, ipaddr & 0xff);
+        return (str);
+}
+
+int
+ptl_parse_nid (ptl_nid_t *nidp, char *str)
+{
+        __u32 ipaddr;
+        long  lval;
+        
+        if (!strcmp (str, "_all_")) {
+                *nidp = PTL_NID_ANY;
                 return (0);
         }
 
-        if (sscanf (str, "%i", &a) == 1)
+        if (ptl_parse_ipaddr (&ipaddr, str) == 0) {
+                *nidp = (ptl_nid_t)ipaddr;
+                return (0);
+        }
+
+        if (sscanf (str, "%li", &lval) == 1)
         {
-                *nidp = (ptl_nid_t)a;
+                *nidp = (ptl_nid_t)lval;
                 return (0);
         }
 
-        if (sscanf (str, "%x", &a) == 1)
+        if (sscanf (str, "%lx", &lval) == 1)
         {
-                *nidp = (ptl_nid_t) a;
+                *nidp = (ptl_nid_t)lval;
                 return (0);
         }
 
@@ -186,6 +225,32 @@ ptl_nid2str (char *buffer, ptl_nid_t nid)
         return (buffer);
 }
 
+int g_nal_is_compatible (char *cmd, ...)
+{
+        va_list       ap;
+        int           nal;
+        
+        if (g_nal == 0) {
+                fprintf (stderr, "Error: you must run the 'network' command first.\n");
+                return (0);
+        }
+        
+        va_start (ap, cmd);
+
+        do {
+                nal = va_arg (ap, int);
+        } while (nal != 0 && nal != g_nal);
+        
+        va_end (ap);
+        
+        if (g_nal == nal)
+                return (1);
+        
+        fprintf (stderr, "Command %s not compatible with nal %s\n",
+                 cmd, nal2name (g_nal));
+        return (0);
+}
+
 int
 sock_write (int cfd, void *buffer, int nob)
 {
@@ -251,22 +316,231 @@ int ptl_initialize(int argc, char **argv)
 
 int jt_ptl_network(int argc, char **argv)
 {
-        int  nal;
+        name2num_t *entry;
+        int         nal;
         
-        if (argc != 2 ||
-            (nal = ptl_name2nal (argv[1])) == 0)
-        {
-                name2num_t *entry;
+        if (argc == 2 &&
+            (nal = ptl_name2nal (argv[1])) != 0) {
+                g_nal = nal;
+                return (0);
+        }
                 
-                fprintf(stderr, "usage: %s \n", argv[0]);
-                for (entry = nalnames; entry->name != NULL; entry++)
-                        fprintf (stderr, "%s%s", entry == nalnames ? "<" : "|", entry->name);
-                fprintf(stderr, ">\n");
+        fprintf(stderr, "usage: %s \n", argv[0]);
+        for (entry = nalnames; entry->name != NULL; entry++)
+                fprintf (stderr, "%s%s", entry == nalnames ? "<" : "|", entry->name);
+        fprintf(stderr, ">\n");
+        return (-1);
+}
+
+int 
+jt_ptl_print_autoconnects (int argc, char **argv)
+{
+        struct portal_ioctl_data data;
+        char                     buffer[64];
+        int                      index;
+        int                      rc;
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, 0))
+                return -1;
+
+        for (index = 0;;index++) {
+                PORTAL_IOC_INIT (data);
+                data.ioc_nal     = g_nal;
+                data.ioc_nal_cmd = NAL_CMD_GET_AUTOCONN;
+                data.ioc_count   = index;
+                
+                rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+                if (rc != 0)
+                        break;
+
+                printf (LPX64"@%s:%d #%d buffer %d nonagle %s xchg %s affinity %s share %d\n",
+                        data.ioc_nid, ptl_ipaddr_2_str (data.ioc_id, buffer),
+                        data.ioc_misc, data.ioc_count, data.ioc_size, 
+                        (data.ioc_flags & 1) ? "on" : "off",
+                        (data.ioc_flags & 2) ? "on" : "off",
+                        (data.ioc_flags & 4) ? "on" : "off",
+                        data.ioc_wait);
         }
-        else
-                g_nal = nal;
 
-        return (0);
+        if (index == 0)
+                printf ("<no autoconnect routes>\n");
+        return 0;
+}
+
+int 
+jt_ptl_add_autoconnect (int argc, char **argv)
+{
+        struct portal_ioctl_data data;
+        ptl_nid_t                nid;
+        __u32                    ip;
+        int                      port;
+        int                      xchange_nids = 0;
+        int                      irq_affinity = 0;
+        int                      share = 0;
+        int                      rc;
+
+        if (argc < 4 || argc > 5) {
+                fprintf (stderr, "usage: %s nid ipaddr port [ixs]\n", argv[0]);
+                return 0;
+        }
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, 0))
+                return -1;
+
+        if (ptl_parse_nid (&nid, argv[1]) != 0 ||
+                nid == PTL_NID_ANY) {
+                fprintf (stderr, "Can't parse NID: %s\n", argv[1]);
+                return -1;
+        }
+
+        if (ptl_parse_ipaddr (&ip, argv[2]) != 0) {
+                fprintf (stderr, "Can't parse ip addr: %s\n", argv[2]);
+                return -1;
+        }
+
+        port = atol (argv[3]);
+        
+        if (argc > 4) {
+                char *opts = argv[4];
+                
+                while (*opts != 0)
+                        switch (*opts++) {
+                        case 'x':
+                                xchange_nids = 1;
+                                break;
+                        case 'i':
+                                irq_affinity = 1;
+                                break;
+                        case 's':
+                                share = 1;
+                                break;
+                        default:
+                                fprintf (stderr, "Can't parse options: %s\n",
+                                         argv[4]);
+                                return -1;
+                        }
+        }
+
+        PORTAL_IOC_INIT (data);
+        data.ioc_nal     = g_nal;
+        data.ioc_nal_cmd = NAL_CMD_ADD_AUTOCONN;
+        data.ioc_nid     = nid;
+        data.ioc_id      = ip;
+        data.ioc_misc    = port;
+        /* only passing one buffer size! */
+        data.ioc_size    = MAX (g_socket_rxmem, g_socket_txmem);
+        data.ioc_flags   = (g_socket_nonagle ? 1 : 0) |
+                           (xchange_nids     ? 2 : 0) |
+                           (irq_affinity     ? 4 : 0) |
+                           (share            ? 8 : 0);
+
+        rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+        if (rc != 0) {
+                fprintf (stderr, "failed to enable autoconnect: %s\n",
+                         strerror (errno));
+                return -1;
+        }
+        
+        return 0;
+}
+
+int 
+jt_ptl_del_autoconnect (int argc, char **argv)
+{
+        struct portal_ioctl_data data;
+        ptl_nid_t                nid = PTL_NID_ANY;
+        __u32                    ip  = 0;
+        int                      share = 0;
+        int                      keep_conn = 0;
+        int                      rc;
+
+        if (argc > 4) {
+                fprintf (stderr, "usage: %s [nid] [ipaddr] [sk]\n",
+                         argv[0]);
+                return 0;
+        }
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, 0))
+                return -1;
+
+        if (argc > 1 &&
+            ptl_parse_nid (&nid, argv[1]) != 0) {
+                fprintf (stderr, "Can't parse nid: %s\n", argv[1]);
+                return -1;
+        }
+
+        if (argc > 2 &&
+            ptl_parse_ipaddr (&ip, argv[2]) != 0) {
+                fprintf (stderr, "Can't parse ip addr: %s\n", argv[2]);
+                return -1;
+        }
+
+        if (argc > 3) {
+                char *opts = argv[3];
+                
+                while (*opts != 0)
+                        switch (*opts++) {
+                        case 's':
+                                share = 1;
+                                break;
+                        case 'k':
+                                keep_conn = 1;
+                                break;
+                        default:
+                                fprintf (stderr, "Can't parse flags: %s\n", 
+                                         argv[3]);
+                                return -1;
+                        }
+        }
+
+        PORTAL_IOC_INIT (data);
+        data.ioc_nal     = g_nal;
+        data.ioc_nal_cmd = NAL_CMD_DEL_AUTOCONN;
+        data.ioc_nid     = nid;
+        data.ioc_id      = ip;
+        data.ioc_flags   = (share     ? 1 : 0) |
+                           (keep_conn ? 2 : 0);
+        
+        rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+        if (rc != 0) {
+                fprintf (stderr, "failed to remove autoconnect route: %s\n",
+                         strerror (errno));
+                return -1;
+        }
+        
+        return 0;
+}
+
+int 
+jt_ptl_print_connections (int argc, char **argv)
+{
+        struct portal_ioctl_data data;
+        char                     buffer[64];
+        int                      index;
+        int                      rc;
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, 0))
+                return -1;
+
+        for (index = 0;;index++) {
+                PORTAL_IOC_INIT (data);
+                data.ioc_nal     = g_nal;
+                data.ioc_nal_cmd = NAL_CMD_GET_CONN;
+                data.ioc_count   = index;
+                
+                rc = l_ioctl (PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+                if (rc != 0)
+                        break;
+
+                printf (LPD64"@%s:%d\n",
+                        data.ioc_nid, 
+                        ptl_ipaddr_2_str (data.ioc_id, buffer),
+                        data.ioc_misc);
+        }
+
+        if (index == 0)
+                printf ("<no connections>\n");
+        return 0;
 }
 
 int
@@ -303,17 +577,17 @@ exchange_nids (int cfd, ptl_nid_t my_nid, ptl_nid_t *peer_nid)
                 return (-1);
         }
 
-        if (__cpu_to_le32 (hmv->magic) != PORTALS_PROTO_MAGIC) {
+        if (hmv->magic != __cpu_to_le32 (PORTALS_PROTO_MAGIC)) {
                 fprintf (stderr, "Bad magic %#08x (%#08x expected)\n", 
-                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC);
+                         __le32_to_cpu (hmv->magic), PORTALS_PROTO_MAGIC);
                 return (-1);
         }
 
-        if (__cpu_to_le16 (hmv->version_major) != PORTALS_PROTO_VERSION_MAJOR ||
-            __cpu_to_le16 (hmv->version_minor) != PORTALS_PROTO_VERSION_MINOR) {
+        if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
+            hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
                 fprintf (stderr, "Incompatible protocol version %d.%d (%d.%d expected)\n",
-                         __cpu_to_le16 (hmv->version_major),
-                         __cpu_to_le16 (hmv->version_minor),
+                         __le16_to_cpu (hmv->version_major),
+                         __le16_to_cpu (hmv->version_minor),
                          PORTALS_PROTO_VERSION_MAJOR,
                          PORTALS_PROTO_VERSION_MINOR);
         }
@@ -328,12 +602,12 @@ exchange_nids (int cfd, ptl_nid_t my_nid, ptl_nid_t *peer_nid)
         }
 
         /* ...and check we got what we expected */
-        if (__cpu_to_le32 (hdr.type) != PTL_MSG_HELLO ||
-            __cpu_to_le32 (PTL_HDR_LENGTH (&hdr)) != 0) {
+        if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
+            PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
                 fprintf (stderr, "Expecting a HELLO hdr with 0 payload,"
                          " but got type %d with %d payload\n",
-                         __cpu_to_le32 (hdr.type),
-                         __cpu_to_le32 (PTL_HDR_LENGTH (&hdr)));
+                         __le32_to_cpu (hdr.type),
+                         __le32_to_cpu (PTL_HDR_LENGTH (&hdr)));
                 return (-1);
         }
         
@@ -343,237 +617,189 @@ exchange_nids (int cfd, ptl_nid_t my_nid, ptl_nid_t *peer_nid)
 
 int jt_ptl_connect(int argc, char **argv)
 {
-        if (argc < 2) {
-        usage:
-                fprintf(stderr, "usage: %s <hostname port [xi]> or <elan ID>\n",
-                        argv[0]);
+        ptl_nid_t peer_nid;
+        struct portal_ioctl_data data;
+        struct sockaddr_in srvaddr;
+        __u32 ipaddr;
+        char *flag;
+        int fd, rc;
+        int nonagle = 0;
+        int rxmem = 0;
+        int txmem = 0;
+        int bind_irq = 0;
+        int xchange_nids = 0;
+        int port;
+        int o;
+        int olen;
+
+        if (argc < 3) {
+                fprintf(stderr, "usage: %s ip port [xi]\n", argv[0]);
                 return 0;
         }
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, TOENAL, 0))
+                return -1;
+        
+        rc = ptl_parse_ipaddr (&ipaddr, argv[1]);
+        if (rc != 0) {
+                fprintf(stderr, "Can't parse hostname: %s\n", argv[1]);
                 return -1;
         }
-        if (g_nal == SOCKNAL || g_nal == TOENAL) {
-                ptl_nid_t peer_nid;
-                struct hostent *he;
-                struct portal_ioctl_data data;
-                struct sockaddr_in srvaddr;
-                char *flag;
-                int fd, rc;
-                int nonagle = 0;
-                int rxmem = 0;
-                int txmem = 0;
-                int bind_irq = 0;
-                int xchange_nids = 0;
-                int o;
-                int olen;
-                
-                if (argc < 3) {
-                        goto usage;
-                }
-
-                he = ptl_gethostbyname(argv[1]);
-                if (!he)
-                        return -1;
-
-                g_port = atol(argv[2]);
-
-                if (argc > 3)
-                        for (flag = argv[3]; *flag != 0; flag++)
-                                switch (*flag)
-                                {
-                                case 'i':
-                                        bind_irq = 1;
-                                        break;
-                                        
-                                case 'x':
-                                        xchange_nids = 1;
-                                        break;
-
-                                default:
-                                        fprintf (stderr, "unrecognised flag '%c'\n",
-                                                 *flag);
-                                        return (-1);
-                                }
-                
-                memset(&srvaddr, 0, sizeof(srvaddr));
-                srvaddr.sin_family = AF_INET;
-                srvaddr.sin_port = htons(g_port);
-                srvaddr.sin_addr.s_addr = *(__u32 *)he->h_addr;
-        
-                fd = socket(PF_INET, SOCK_STREAM, 0);
-                if ( fd < 0 ) {
-                        fprintf(stderr, "socket() failed: %s\n",
-                                strerror(errno));
-                        return -1;
-                }
 
-                if (g_socket_nonagle)
-                {
-                        o = 1;
-                        if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &o, sizeof (o)) != 0)
-                        { 
-                                fprintf(stderr, "cannot disable nagle: %s\n", strerror(errno));
+        port = atol(argv[2]);
+        if (argc > 3)
+                for (flag = argv[3]; *flag != 0; flag++)
+                        switch (*flag)
+                        {
+                        case 'i':
+                                bind_irq = 1;
+                                break;
+                                
+                        case 'x':
+                                xchange_nids = 1;
+                                break;
+                                
+                        default:
+                                fprintf (stderr, "unrecognised flag '%c'\n",
+                                         *flag);
                                 return (-1);
                         }
-                }
 
-                if (g_socket_rxmem != 0)
-                {
-                        o = g_socket_rxmem;
-                        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &o, sizeof (o)) != 0)
-                        { 
-                                fprintf(stderr, "cannot set receive buffer size: %s\n", strerror(errno));
-                                return (-1);
-                        }
-                }
+        memset(&srvaddr, 0, sizeof(srvaddr));
+        srvaddr.sin_family = AF_INET;
+        srvaddr.sin_port = htons(port);
+        srvaddr.sin_addr.s_addr = htonl(ipaddr);
 
-                if (g_socket_txmem != 0)
-                {
-                        o = g_socket_txmem;
-                        if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &o, sizeof (o)) != 0)
-                        { 
-                                fprintf(stderr, "cannot set send buffer size: %s\n", strerror(errno));
-                                return (-1);
-                        }
+        fd = socket(PF_INET, SOCK_STREAM, 0);
+        if ( fd < 0 ) {
+                fprintf(stderr, "socket() failed: %s\n", strerror(errno));
+                return -1;
+        }
+
+        if (g_socket_nonagle)
+        {
+                o = 1;
+                if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &o, sizeof (o)) != 0) { 
+                        fprintf(stderr, "cannot disable nagle: %s\n", strerror(errno));
+                        return (-1);
                 }
+        }
 
-                rc = connect(fd, (struct sockaddr *)&srvaddr, sizeof(srvaddr));
-                if ( rc == -1 ) { 
-                        fprintf(stderr, "connect() failed: %s\n",
-                                strerror(errno));
-                        return -1;
+        if (g_socket_rxmem != 0) {
+                o = g_socket_rxmem;
+                if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &o, sizeof (o)) != 0) { 
+                        fprintf(stderr, "cannot set receive buffer size: %s\n", strerror(errno));
+                        return (-1);
                 }
+        }
 
-                olen = sizeof (txmem);
-                if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txmem, &olen) != 0)
-                        fprintf (stderr, "Can't get send buffer size: %s\n", strerror (errno));
-                olen = sizeof (rxmem);
-                if (getsockopt (fd, SOL_SOCKET, SO_RCVBUF, &rxmem, &olen) != 0)
-                        fprintf (stderr, "Can't get receive buffer size: %s\n", strerror (errno));
-                olen = sizeof (nonagle);
-                if (getsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nonagle, &olen) != 0)
-                        fprintf (stderr, "Can't get nagle: %s\n", strerror (errno));
-
-                if (xchange_nids) {
-                        
-                        PORTAL_IOC_INIT (data);
-                        data.ioc_nal = g_nal;
-                        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_GET_NID, &data);
-                        if (rc != 0)
-                        {
-                                fprintf (stderr, "failed to get my nid: %s\n",
-                                         strerror (errno));
-                                close (fd);
-                                return (-1);
-                        }
-                        
-                        rc = exchange_nids (fd, data.ioc_nid, &peer_nid);
-                        if (rc != 0)
-                        {
-                                close (fd);
-                                return (-1);
-                        }
+        if (g_socket_txmem != 0) {
+                o = g_socket_txmem;
+                if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &o, sizeof (o)) != 0) { 
+                        fprintf(stderr, "cannot set send buffer size: %s\n", strerror(errno));
+                        return (-1);
                 }
-                else
-                        peer_nid = ntohl (srvaddr.sin_addr.s_addr); /* HOST byte order */
+        }
 
-                printf("Connected host: %s NID "LPX64" snd: %d rcv: %d nagle: %s\n", argv[1],
-                       peer_nid, txmem, rxmem, nonagle ? "Disabled" : "Enabled");
+        rc = connect(fd, (struct sockaddr *)&srvaddr, sizeof(srvaddr));
+        if ( rc == -1 ) { 
+                fprintf(stderr, "connect() failed: %s\n", strerror(errno));
+                return -1;
+        }
 
-                PORTAL_IOC_INIT(data);
-                data.ioc_fd = fd;
+        olen = sizeof (txmem);
+        if (getsockopt (fd, SOL_SOCKET, SO_SNDBUF, &txmem, &olen) != 0)
+                fprintf (stderr, "Can't get send buffer size: %s\n", strerror (errno));
+        olen = sizeof (rxmem);
+        if (getsockopt (fd, SOL_SOCKET, SO_RCVBUF, &rxmem, &olen) != 0)
+                fprintf (stderr, "Can't get receive buffer size: %s\n", strerror (errno));
+        olen = sizeof (nonagle);
+        if (getsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nonagle, &olen) != 0)
+                fprintf (stderr, "Can't get nagle: %s\n", strerror (errno));
+
+        if (!xchange_nids) 
+                peer_nid = ipaddr;
+        else {
+                PORTAL_IOC_INIT (data);
                 data.ioc_nal = g_nal;
-                data.ioc_nal_cmd = NAL_CMD_REGISTER_PEER_FD;
-                data.ioc_nid = peer_nid;
-                data.ioc_flags = bind_irq;
-                
-                rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
-                if (rc) {
-                        fprintf(stderr, "failed to register fd with portals: "
-                                "%s\n", strerror(errno));
+                rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_GET_NID, &data);
+                if (rc != 0) {
+                        fprintf (stderr, "failed to get my nid: %s\n",
+                                 strerror (errno));
                         close (fd);
-                        return -1;
+                        return (-1);
                 }
 
-                g_nid = peer_nid;
-                printf("Connection to "LPX64" registered with socknal\n", g_nid);
-
-                rc = close(fd);
-                if (rc) {
-                        fprintf(stderr, "close failed: %d\n", rc);
-                }
-        } else if (g_nal == QSWNAL) {
-                g_nid = atoi(argv[1]);
-        } else if (g_nal == GMNAL) {
-                g_nid = atoi(argv[1]);
-        } else if (g_nal == SCIMACNAL) {
-                unsigned int    tmpnid;
-                if(sscanf(argv[1], "%x", &tmpnid) == 1) {
-                        g_nid=tmpnid;
-                }
-                else {
-                        fprintf(stderr, "nid %s invalid for SCI nal\n", argv[1]);
+                rc = exchange_nids (fd, data.ioc_nid, &peer_nid);
+                if (rc != 0) {
+                        close (fd);
+                        return (-1);
                 }
+        } 
+        printf("Connected host: %s NID "LPX64" snd: %d rcv: %d nagle: %s\n", argv[1],
+               peer_nid, txmem, rxmem, nonagle ? "Disabled" : "Enabled");
 
+        PORTAL_IOC_INIT(data);
+        data.ioc_fd = fd;
+        data.ioc_nal = g_nal;
+        data.ioc_nal_cmd = NAL_CMD_REGISTER_PEER_FD;
+        data.ioc_nid = peer_nid;
+        data.ioc_flags = bind_irq;
 
-        } else {
-                fprintf(stderr, "This should never happen.  Also it is very "
-                        "bad.\n");
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+        if (rc) {
+                fprintf(stderr, "failed to register fd with portals: %s\n", 
+                        strerror(errno));
+                close (fd);
+                return -1;
         }
 
+        printf("Connection to "LPX64" registered with socknal\n", peer_nid);
+
+        rc = close(fd);
+        if (rc)
+                fprintf(stderr, "close failed: %d\n", rc);
+
         return 0;
 }
 
 int jt_ptl_disconnect(int argc, char **argv)
 {
-        if (argc > 2) {
-                fprintf(stderr, "usage: %s [hostname]\n", argv[0]);
+        struct portal_ioctl_data data;
+        ptl_nid_t                nid = PTL_NID_ANY;
+        __u32                    ipaddr = 0;
+        int                      rc;
+
+        if (argc > 3) {
+                fprintf(stderr, "usage: %s [nid] [ipaddr]\n", argv[0]);
                 return 0;
         }
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, TOENAL, 0))
+                return -1;
+
+        if (argc >= 2 &&
+            ptl_parse_nid (&nid, argv[1]) != 0) {
+                fprintf (stderr, "Can't parse nid %s\n", argv[1]);
                 return -1;
         }
-        if (g_nal == SOCKNAL || g_nal == TOENAL) {
-                struct hostent *he;
-                struct portal_ioctl_data data;
-                int rc;
 
-                PORTAL_IOC_INIT(data);
-                if (argc == 2) {
-                        he = ptl_gethostbyname(argv[1]);
-                        if (!he) 
-                                return -1;
-                        
-                        data.ioc_nid = ntohl (*(__u32 *)he->h_addr); /* HOST byte order */
+        if (argc >= 3 &&
+            ptl_parse_ipaddr (&ipaddr, argv[2]) != 0) {
+                fprintf (stderr, "Can't parse ip addr %s\n", argv[2]);
+                return -1;
+        }
 
-                } else {
-                        printf("Disconnecting ALL connections.\n");
-                        /* leave ioc_nid zeroed == disconnect all */
-                }
-                data.ioc_nal = g_nal;
-                data.ioc_nal_cmd = NAL_CMD_CLOSE_CONNECTION;
-                rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
-                if (rc) {
-                        fprintf(stderr, "failed to remove connection: %s\n",
-                                strerror(errno));
-                        return -1;
-                }
-        } else if (g_nal == QSWNAL) {
-                printf("'disconnect' doesn't make any sense for "
-                        "elan.\n");
-        } else if (g_nal == GMNAL) {
-                printf("'disconnect' doesn't make any sense for "
-                        "GM.\n");
-        } else if (g_nal == SCIMACNAL) {
-                printf("'disconnect' doesn't make any sense for "
-                        "SCI.\n");
-        } else {
-                fprintf(stderr, "This should never happen.  Also it is very "
-                        "bad.\n");
+        PORTAL_IOC_INIT(data);
+        data.ioc_nal     = g_nal;
+        data.ioc_nal_cmd = NAL_CMD_CLOSE_CONNECTION;
+        data.ioc_nid     = nid;
+        data.ioc_id      = ipaddr;
+        
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+        if (rc) {
+                fprintf(stderr, "failed to remove connection: %s\n",
+                        strerror(errno));
                 return -1;
         }
 
@@ -582,49 +808,40 @@ int jt_ptl_disconnect(int argc, char **argv)
 
 int jt_ptl_push_connection (int argc, char **argv)
 {
-        if (argc > 2) {
-                fprintf(stderr, "usage: %s [hostname]\n", argv[0]);
+        struct portal_ioctl_data data;
+        int                      rc;
+        ptl_nid_t                nid = PTL_NID_ANY;
+        __u32                    ipaddr = 0;
+
+        if (argc > 3) {
+                fprintf(stderr, "usage: %s [nid] [ip]\n", argv[0]);
                 return 0;
         }
-        if (g_nal == 0) {
-                fprintf(stderr, "Error: you must run the 'network' command "
-                        "first.\n");
+
+        if (!g_nal_is_compatible (argv[0], SOCKNAL, TOENAL, 0))
+                return -1;
+        
+        if (argc > 1 &&
+            ptl_parse_nid (&nid, argv[1]) != 0) {
+                fprintf(stderr, "Can't parse nid: %s\n", argv[1]);
                 return -1;
         }
-        if (g_nal == SOCKNAL || g_nal == TOENAL) {
-                struct hostent *he;
-                struct portal_ioctl_data data;
-                int rc;
-
-                PORTAL_IOC_INIT(data);
-                if (argc == 2) {
-                        he = ptl_gethostbyname(argv[1]);
-                        if (!he)
-                                return -1;
                         
-                        data.ioc_nid = ntohl (*(__u32 *)he->h_addr); /* HOST byte order */
+        if (argc > 2 &&
+            ptl_parse_ipaddr (&ipaddr, argv[2]) != 0) {
+                fprintf(stderr, "Can't parse ipaddr: %s\n", argv[2]);
+        }
 
-                } else {
-                        printf("Pushing ALL connections.\n");
-                        /* leave ioc_nid zeroed == disconnect all */
-                }
-                data.ioc_nal = g_nal;
-                data.ioc_nal_cmd = NAL_CMD_PUSH_CONNECTION;
-                rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
-                if (rc) {
-                        fprintf(stderr, "failed to push connection: %s\n",
-                                strerror(errno));
-                        return -1;
-                }
-        } else if (g_nal == QSWNAL) {
-                printf("'push' doesn't make any sense for elan.\n");
-        } else if (g_nal == GMNAL) {
-                printf("'push' doesn't make any sense for GM.\n");
-        } else if (g_nal == SCIMACNAL) {
-                printf("'push' doesn't make any sense for SCI.\n");
-        } else {
-                fprintf(stderr, "This should never happen.  Also it is very "
-                        "bad.\n");
+        PORTAL_IOC_INIT(data);
+        data.ioc_nal     = g_nal;
+        data.ioc_nal_cmd = NAL_CMD_PUSH_CONNECTION;
+        data.ioc_nid     = nid;
+        data.ioc_id      = ipaddr;
+
+        rc = l_ioctl(PORTALS_DEV_ID, IOC_PORTAL_NAL_CMD, &data);
+        if (rc) {
+                fprintf(stderr, "failed to push connection: %s\n",
+                        strerror(errno));
                 return -1;
         }
 
index 8c56d93..50d9c87 100644 (file)
 
 command_t list[] = {
         {"network", jt_ptl_network, 0,"setup the NAL (args: nal name)"},
-        {"connect", jt_ptl_connect, 0, "connect to a remote nid (args: <hostname port> | <id> for tcp/elan respectively)"},
-        {"disconnect", jt_ptl_disconnect, 0, "disconnect from a remote nid (args: [hostname]"},
-        {"push", jt_ptl_push_connection, 0, "flush connection to a remote nid (args: [hostname]"},
+        {"print_autoconns", jt_ptl_print_autoconnects, 0, "print autoconnect entries (no args)"},
+        {"add_autoconn", jt_ptl_add_autoconnect, 0, "add autoconnect entry (args: nid host [ixs])"},
+        {"del_autoconn", jt_ptl_del_autoconnect, 0, "delete autoconnect entry (args: [nid] [host] [ks])"},
+        {"print_conns", jt_ptl_print_connections, 0, "print connections (no args)"},
+        {"connect", jt_ptl_connect, 0, "connect to a remote nid (args: host port [xi])"},
+        {"disconnect", jt_ptl_disconnect, 0, "disconnect from a remote nid (args: [nid] [host]"},
+        {"push", jt_ptl_push_connection, 0, "flush connection to a remote nid (args: [nid]"},
         {"ping", jt_ptl_ping, 0, "do a ping test (args: nid [count] [size] [timeout])"},
         {"shownid", jt_ptl_shownid, 0, "print the local NID"},
         {"mynid", jt_ptl_mynid, 0, "inform the socknal of the local NID (args: [hostname])"},
index 2133391..85fe8e7 100644 (file)
@@ -883,6 +883,10 @@ extern ptl_handle_ni_t  kscimacnal_ni;
 #define NAL_CMD_CLOSE_CONNECTION     101
 #define NAL_CMD_REGISTER_MYNID       102
 #define NAL_CMD_PUSH_CONNECTION      103
+#define NAL_CMD_GET_CONN             104
+#define NAL_CMD_DEL_AUTOCONN         105
+#define NAL_CMD_ADD_AUTOCONN         106
+#define NAL_CMD_GET_AUTOCONN         107
 
 enum {
         DEBUG_DAEMON_START       =  1,
index dc02780..ffe7e5b 100644 (file)
@@ -34,6 +34,10 @@ char * ptl_nid2str (char *buffer, ptl_nid_t nid);
 
 int ptl_initialize(int argc, char **argv);
 int jt_ptl_network(int argc, char **argv);
+int jt_ptl_print_autoconnects (int argc, char **argv);
+int jt_ptl_add_autoconnect (int argc, char **argv);
+int jt_ptl_del_autoconnect (int argc, char **argv);
+int jt_ptl_print_connections (int argc, char **argv);
 int jt_ptl_connect(int argc, char **argv);
 int jt_ptl_disconnect(int argc, char **argv);
 int jt_ptl_push_connection(int argc, char **argv);
index c03d592..a6b4b93 100644 (file)
@@ -569,11 +569,6 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         int                sumnob;
 #endif
         
-        /* NB, the return code from this procedure is ignored.
-         * If we can't send, we must still complete with lib_finalize().
-         * We'll have to wait for 3.2 to return an error event.
-         */
-
         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
                " pid %u\n", payload_nob, payload_niov, nid, pid);
 
@@ -588,8 +583,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         if (payload_nob > KQSW_MAXPAYLOAD) {
                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                         payload_nob, KQSW_MAXPAYLOAD);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_FAIL);
         }
 
         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
@@ -597,14 +591,12 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                 if (rc != 0) {
                         CERROR("Can't route to "LPX64": router error %d\n",
                                nid, rc);
-                        lib_finalize (&kqswnal_lib, private, cookie);
-                        return (-1);
+                        return (PTL_FAIL);
                 }
                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
                                gatewaynid, nid);
-                        lib_finalize (&kqswnal_lib, private, cookie);
-                        return (-1);
+                        return (PTL_FAIL);
                 }
                 nid = gatewaynid;
         }
@@ -616,8 +608,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                           in_interrupt()));
         if (ktx == NULL) {
                 kqswnal_cerror_hdr (hdr);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_NOSPACE);
         }
 
         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
@@ -670,8 +661,7 @@ kqswnal_sendmsg (nal_cb_t     *nal,
                                                          payload_niov, payload_iov);
                         if (rc != 0) {
                                 kqswnal_put_idle_tx (ktx);
-                                lib_finalize (&kqswnal_lib, private, cookie);
-                                return (-1);
+                                return (PTL_FAIL);
                         }
                 } 
         }
@@ -686,12 +676,11 @@ kqswnal_sendmsg (nal_cb_t     *nal,
         rc = kqswnal_launch (ktx);
         if (rc != 0) {                    /* failed? */
                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
-                lib_finalize (&kqswnal_lib, private, cookie);
-                return (-1);
+                return (PTL_FAIL);
         }
 
         CDEBUG(D_NET, "send to "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
-        return (0);
+        return (PTL_OK);
 }
 
 static int
index 91d971c..e7232a0 100644 (file)
@@ -59,7 +59,9 @@ ksocknal_api_shutdown(nal_t *nal, int ni)
 {
         CDEBUG (D_NET, "closing all connections\n");
 
-        return ksocknal_close_sock(0);          /* close all sockets */
+        ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
+        ksocknal_close_conn (PTL_NID_ANY, 0);
+        return PTL_OK;
 }
 
 void
@@ -104,15 +106,6 @@ ksocknal_init(int interface, ptl_pt_index_t ptl_size,
  *  EXTRA functions follow
  */
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#define SOCKET_I(inode) (&(inode)->u.socket_i)
-#endif
-static __inline__ struct socket *
-socki_lookup(struct inode *inode)
-{
-        return SOCKET_I(inode);
-}
-
 int
 ksocknal_set_mynid(ptl_nid_t nid)
 {
@@ -132,23 +125,43 @@ ksocknal_set_mynid(ptl_nid_t nid)
 }
 
 void
-ksocknal_bind_irq (unsigned int irq, int cpu)
+ksocknal_bind_irq (unsigned int irq)
 {
 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
-        char  cmdline[64];
-        char *argv[] = {"/bin/sh",
-                        "-c",
-                        cmdline,
-                        NULL};
-        char *envp[] = {"HOME=/",
-                        "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
-                        NULL};
+        int              bind;
+        unsigned long    flags;
+        char             cmdline[64];
+        ksock_irqinfo_t *info;
+        char            *argv[] = {"/bin/sh",
+                                   "-c",
+                                   cmdline,
+                                   NULL};
+        char            *envp[] = {"HOME=/",
+                                   "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
+                                   NULL};
+
+        LASSERT (irq < NR_IRQS);
+        if (irq == 0)                           /* software NIC */
+                return;
+
+        info = &ksocknal_data.ksnd_irqinfo[irq];
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        LASSERT (info->ksni_valid);
+        bind = !info->ksni_bound;
+        info->ksni_bound = 1;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (!bind)                              /* bound already */
+                return;
 
         snprintf (cmdline, sizeof (cmdline),
-                  "echo %d > /proc/irq/%u/smp_affinity", 1 << cpu, irq);
+                  "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
 
         printk (KERN_INFO "Binding irq %u to CPU %d with cmd: %s\n",
-                irq, cpu, cmdline);
+                irq, info->ksni_sched, cmdline);
 
         /* FIXME: Find a better method of setting IRQ affinity...
          */
@@ -157,201 +170,854 @@ ksocknal_bind_irq (unsigned int irq, int cpu)
 #endif
 }
 
+ksock_route_t *
+ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
+                       int irq_affinity, int xchange_nids, int nonagel)
+{
+        ksock_route_t *route;
+
+        PORTAL_ALLOC (route, sizeof (*route));
+        if (route == NULL)
+                return (NULL);
+
+        atomic_set (&route->ksnr_refcount, 1);
+        route->ksnr_sharecount = 0;
+        route->ksnr_peer = NULL;
+        route->ksnr_timeout = jiffies_64;
+        route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
+        route->ksnr_ipaddr = ipaddr;
+        route->ksnr_port = port;
+        route->ksnr_buffer_size = buffer_size;
+        route->ksnr_irq_affinity = irq_affinity;
+        route->ksnr_xchange_nids = xchange_nids;
+        route->ksnr_nonagel = nonagel;
+        route->ksnr_connecting = 0;
+        route->ksnr_deleted = 0;
+        route->ksnr_generation = 0;
+        route->ksnr_conn = NULL;
+
+        return (route);
+}
+
+void
+ksocknal_destroy_route (ksock_route_t *route)
+{
+        LASSERT (route->ksnr_sharecount == 0);
+        LASSERT (route->ksnr_conn == NULL);
+
+        if (route->ksnr_peer != NULL)
+                ksocknal_put_peer (route->ksnr_peer);
+
+        PORTAL_FREE (route, sizeof (*route));
+}
+
+void
+ksocknal_put_route (ksock_route_t *route)
+{
+        CDEBUG (D_OTHER, "putting route[%p] -> "LPX64" (%d)\n",
+                route, route->ksnr_peer->ksnp_nid,
+                atomic_read (&route->ksnr_refcount));
+
+        LASSERT (atomic_read (&route->ksnr_refcount) > 0);
+        if (!atomic_dec_and_test (&route->ksnr_refcount))
+             return;
+
+        ksocknal_destroy_route (route);
+}
+
+ksock_peer_t *
+ksocknal_create_peer (ptl_nid_t nid)
+{
+        ksock_peer_t *peer;
+
+        LASSERT (nid != PTL_NID_ANY);
+
+        PORTAL_ALLOC (peer, sizeof (*peer));
+        if (peer == NULL)
+                return (NULL);
+
+        memset (peer, 0, sizeof (*peer));
+
+        peer->ksnp_nid = nid;
+        atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
+        peer->ksnp_closing = 0;
+        INIT_LIST_HEAD (&peer->ksnp_conns);
+        INIT_LIST_HEAD (&peer->ksnp_routes);
+        INIT_LIST_HEAD (&peer->ksnp_tx_queue);
+
+        /* Can't unload while peers exist; ensures all I/O has terminated
+         * before unload attempts */
+        PORTAL_MODULE_USE;
+        atomic_inc (&ksocknal_data.ksnd_npeers);
+        return (peer);
+}
+
+void
+ksocknal_destroy_peer (ksock_peer_t *peer)
+{
+        CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
+
+        LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
+        LASSERT (list_empty (&peer->ksnp_conns));
+        LASSERT (list_empty (&peer->ksnp_routes));
+        LASSERT (list_empty (&peer->ksnp_tx_queue));
+
+        PORTAL_FREE (peer, sizeof (*peer));
+
+        /* NB a peer's connections and autoconnect routes keep a reference
+         * on their peer until they are destroyed, so we can be assured
+         * that _all_ state to do with this peer has been cleaned up when
+         * its refcount drops to zero. */
+        atomic_dec (&ksocknal_data.ksnd_npeers);
+        PORTAL_MODULE_UNUSE;
+}
+
+void
+ksocknal_put_peer (ksock_peer_t *peer)
+{
+        CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
+                peer, peer->ksnp_nid,
+                atomic_read (&peer->ksnp_refcount));
+
+        LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
+        if (!atomic_dec_and_test (&peer->ksnp_refcount))
+                return;
+
+        ksocknal_destroy_peer (peer);
+}
+
+ksock_peer_t *
+ksocknal_find_peer_locked (ptl_nid_t nid)
+{
+        struct list_head *peer_list = ksocknal_nid2peerlist (nid);
+        struct list_head *tmp;
+        ksock_peer_t     *peer;
+
+        list_for_each (tmp, peer_list) {
+
+                peer = list_entry (tmp, ksock_peer_t, ksnp_list);
+
+                LASSERT (!peer->ksnp_closing);
+                LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                           list_empty (&peer->ksnp_conns)));
+
+                if (peer->ksnp_nid != nid)
+                        continue;
+
+                CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
+                       peer, nid, atomic_read (&peer->ksnp_refcount));
+                return (peer);
+        }
+        return (NULL);
+}
+
+ksock_peer_t *
+ksocknal_get_peer (ptl_nid_t nid)
+{
+        ksock_peer_t     *peer;
+
+        read_lock (&ksocknal_data.ksnd_global_lock);
+        peer = ksocknal_find_peer_locked (nid);
+        if (peer != NULL)                       /* +1 ref for caller? */
+                atomic_inc (&peer->ksnp_refcount);
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+
+        return (peer);
+}
+
+void
+ksocknal_unlink_peer_locked (ksock_peer_t *peer)
+{
+        LASSERT (!peer->ksnp_closing);
+        peer->ksnp_closing = 1;
+        list_del (&peer->ksnp_list);
+        /* lose peerlist's ref */
+        ksocknal_put_peer (peer);
+}
+
+ksock_route_t *
+ksocknal_get_route_by_idx (int index)
+{
+        ksock_peer_t      *peer;
+        struct list_head  *ptmp;
+        ksock_route_t     *route;
+        struct list_head  *rtmp;
+        int                i;
+
+        read_lock (&ksocknal_data.ksnd_global_lock);
+
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                                   list_empty (&peer->ksnp_conns)));
+
+                        list_for_each (rtmp, &peer->ksnp_routes) {
+                                if (index-- > 0)
+                                        continue;
+
+                                route = list_entry (rtmp, ksock_route_t, ksnr_list);
+                                atomic_inc (&route->ksnr_refcount);
+                                read_unlock (&ksocknal_data.ksnd_global_lock);
+                                return (route);
+                        }
+                }
+        }
+
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+        return (NULL);
+}
+
 int
-ksocknal_add_sock (ptl_nid_t nid, int fd, int bind_irq)
+ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
+                    int nonagle, int xchange_nids, int bind_irq, int share)
 {
         unsigned long      flags;
+        ksock_peer_t      *peer;
+        ksock_peer_t      *peer2;
+        ksock_route_t     *route;
+        struct list_head  *rtmp;
+        ksock_route_t     *route2;
+        
+        if (nid == PTL_NID_ANY)
+                return (-EINVAL);
+
+        /* Have a brand new peer ready... */
+        peer = ksocknal_create_peer (nid);
+        if (peer == NULL)
+                return (-ENOMEM);
+
+        route = ksocknal_create_route (ipaddr, port, bufnob,
+                                       nonagle, xchange_nids, bind_irq);
+        if (route == NULL) {
+                ksocknal_put_peer (peer);
+                return (-ENOMEM);
+        }
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        peer2 = ksocknal_find_peer_locked (nid);
+        if (peer2 != NULL) {
+                ksocknal_put_peer (peer);
+                peer = peer2;
+        } else {
+                /* peer table takes existing ref on peer */
+                list_add (&peer->ksnp_list,
+                          ksocknal_nid2peerlist (nid));
+        }
+
+        route2 = NULL;
+        if (share) {
+                /* check for existing route to this NID via this ipaddr */
+                list_for_each (rtmp, &peer->ksnp_routes) {
+                        route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
+                        
+                        if (route2->ksnr_ipaddr == ipaddr)
+                                break;
+
+                        route2 = NULL;
+                }
+        }
+
+        if (route2 != NULL) {
+                ksocknal_put_route (route);
+                route = route2;
+        } else {
+                /* route takes a ref on peer */
+                route->ksnr_peer = peer;
+                atomic_inc (&peer->ksnp_refcount);
+                /* peer's route list takes existing ref on route */
+                list_add (&route->ksnr_list, &peer->ksnp_routes);
+        }
+        
+        route->ksnr_sharecount++;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (0);
+}
+
+void
+ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
+{
+        ksock_peer_t *peer = route->ksnr_peer;
+        ksock_conn_t *conn = route->ksnr_conn;
+
+        if (!share)
+                route->ksnr_sharecount = 0;
+        else {
+                route->ksnr_sharecount--;
+                if (route->ksnr_sharecount != 0)
+                        return;
+        }
+
+        if (conn != NULL) {
+                if (!keep_conn)
+                        ksocknal_close_conn_locked (conn);
+                else {
+                        /* keeping the conn; just dissociate it and route... */
+                        conn->ksnc_route = NULL;
+                        route->ksnr_conn = NULL;
+                        ksocknal_put_route (route); /* drop conn's ref on route */
+                        ksocknal_put_conn (conn); /* drop route's ref on conn */
+                }
+        }
+
+        route->ksnr_deleted = 1;
+        list_del (&route->ksnr_list);
+        ksocknal_put_route (route);             /* drop peer's ref */
+
+        if (list_empty (&peer->ksnp_routes) &&
+            list_empty (&peer->ksnp_conns)) {
+                /* I've just removed the last autoconnect route of a peer
+                 * with no active connections */
+                ksocknal_unlink_peer_locked (peer);
+        }
+}
+
+int
+ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
+{
+        unsigned long      flags;
+        struct list_head  *ptmp;
+        struct list_head  *pnxt;
+        ksock_peer_t      *peer;
+        struct list_head  *rtmp;
+        struct list_head  *rnxt;
+        ksock_route_t     *route;
+        int                lo;
+        int                hi;
+        int                i;
+        int                rc = -ENOENT;
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
+        else {
+                lo = 0;
+                hi = ksocknal_data.ksnd_peer_hash_size - 1;
+        }
+
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
+                                continue;
+
+                        list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
+                                route = list_entry (rtmp, ksock_route_t,
+                                                    ksnr_list);
+
+                                if (!(ipaddr == 0 ||
+                                      route->ksnr_ipaddr == ipaddr))
+                                        continue;
+
+                                ksocknal_del_route_locked (route, share, keep_conn);
+                                rc = 0;         /* matched something */
+                                if (share)
+                                        goto out;
+                        }
+                }
+        }
+ out:
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (rc);
+}
+
+ksock_conn_t *
+ksocknal_get_conn_by_idx (int index)
+{
+        ksock_peer_t      *peer;
+        struct list_head  *ptmp;
         ksock_conn_t      *conn;
-        struct file       *file = NULL;
-        struct socket     *sock = NULL;
-        ksock_sched_t     *sched = NULL;
-        unsigned int       irq = 0;
-        struct net_device *dev = NULL;
-        int                ret;
-        int                idx;
-        ENTRY;
-
-        LASSERT (!in_interrupt());
-
-        file = fget(fd);
-        if (file == NULL)
-                RETURN(-EINVAL);
-
-        ret = -EINVAL;
-        sock = socki_lookup(file->f_dentry->d_inode);
-        if (sock == NULL)
-                GOTO(error, ret);
-
-        ret = -ENOMEM;
-        PORTAL_ALLOC(conn, sizeof(*conn));
-        if (!conn)
-                GOTO(error, ret);
+        struct list_head  *ctmp;
+        int                i;
 
-        sock->sk->allocation = GFP_NOFS;    /* don't call info fs for alloc */
+        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        conn->ksnc_file = file;
-        conn->ksnc_sock = sock;
-        conn->ksnc_saved_data_ready = sock->sk->data_ready;
-        conn->ksnc_saved_write_space = sock->sk->write_space;
-        conn->ksnc_peernid = nid;
-        atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for socklist */
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
 
-        conn->ksnc_rx_ready = 0;
-        conn->ksnc_rx_scheduled = 0;
-        ksocknal_new_packet (conn, 0);
+                        LASSERT (!(list_empty (&peer->ksnp_routes) &&
+                                   list_empty (&peer->ksnp_conns)));
 
-        INIT_LIST_HEAD (&conn->ksnc_tx_queue);
-        conn->ksnc_tx_ready = 0;
-        conn->ksnc_tx_scheduled = 0;
+                        list_for_each (ctmp, &peer->ksnp_conns) {
+                                if (index-- > 0)
+                                        continue;
+
+                                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
+                                atomic_inc (&conn->ksnc_refcount);
+                                read_unlock (&ksocknal_data.ksnd_global_lock);
+                                return (conn);
+                        }
+                }
+        }
+
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+        return (NULL);
+}
+
+void
+ksocknal_get_peer_addr (ksock_conn_t *conn)
+{
+        struct sockaddr_in sin;
+        int                len = sizeof (sin);
+        int                rc;
 
-#warning check it is OK to derefence sk->dst_cache->dev like this...
-        lock_sock (conn->ksnc_sock->sk);
+        rc = ksocknal_getconnsock (conn);
+        LASSERT (rc == 0);
+        
+        rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
+                                            (struct sockaddr *)&sin, &len, 2);
+        LASSERT (len <= sizeof (sin));
+        ksocknal_putconnsock (conn);
+
+        if (rc != 0) {
+                CERROR ("Error %d getting sock peer IP\n", rc);
+                return;
+        }
 
-        if (conn->ksnc_sock->sk->dst_cache != NULL) {
-                dev = conn->ksnc_sock->sk->dst_cache->dev;
-                if (dev != NULL) {
-                        irq = dev->irq;
+        conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
+        conn->ksnc_port   = ntohs (sin.sin_port);
+}
+
+unsigned int
+ksocknal_conn_irq (ksock_conn_t *conn)
+{
+        int                irq = 0;
+        int                rc;
+        struct dst_entry  *dst;
+
+        rc = ksocknal_getconnsock (conn);
+        LASSERT (rc == 0);
+        
+        dst = sk_dst_get (conn->ksnc_sock->sk);
+        if (dst != NULL) {
+                if (dst->dev != NULL) {
+                        irq = dst->dev->irq;
                         if (irq >= NR_IRQS) {
                                 CERROR ("Unexpected IRQ %x\n", irq);
                                 irq = 0;
                         }
                 }
+                dst_release (dst);
         }
+        
+        ksocknal_putconnsock (conn);
+        return (irq);
+}
 
-        release_sock (conn->ksnc_sock->sk);
+ksock_sched_t *
+ksocknal_choose_scheduler_locked (unsigned int irq)
+{
+        ksock_sched_t    *sched;
+        ksock_irqinfo_t  *info;
+        int               i;
 
-        write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        LASSERT (irq < NR_IRQS);
+        info = &ksocknal_data.ksnd_irqinfo[irq];
 
-        if (irq == 0 ||
-            ksocknal_data.ksnd_irq_info[irq] == SOCKNAL_IRQ_UNASSIGNED) {
-                /* This is a software NIC, or we haven't associated it with
-                 * a CPU yet */
+        if (irq != 0 &&                         /* hardware NIC */
+            info->ksni_valid) {                 /* already set up */
+                return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
+        }
 
-                /* Choose the CPU with the fewest connections */
-                sched = ksocknal_data.ksnd_schedulers;
-                for (idx = 1; idx < SOCKNAL_N_SCHED; idx++)
-                        if (sched->kss_nconns >
-                            ksocknal_data.ksnd_schedulers[idx].kss_nconns)
-                                sched = &ksocknal_data.ksnd_schedulers[idx];
+        /* software NIC (irq == 0) || not associated with a scheduler yet.
+         * Choose the CPU with the fewest connections... */
+        sched = &ksocknal_data.ksnd_schedulers[0];
+        for (i = 1; i < SOCKNAL_N_SCHED; i++)
+                if (sched->kss_nconns >
+                    ksocknal_data.ksnd_schedulers[i].kss_nconns)
+                        sched = &ksocknal_data.ksnd_schedulers[i];
 
-                if (irq != 0) {                 /* Hardware NIC */
-                        /* Remember which scheduler we chose */
-                        idx = sched - ksocknal_data.ksnd_schedulers;
+        if (irq != 0) {                         /* Hardware NIC */
+                info->ksni_valid = 1;
+                info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
 
-                        LASSERT (idx < SOCKNAL_IRQ_SCHED_MASK);
+                /* no overflow... */
+                LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
+        }
 
-                        if (bind_irq)       /* remember if we will bind below */
-                                idx |= SOCKNAL_IRQ_BOUND;
+        return (sched);
+}
 
-                        ksocknal_data.ksnd_irq_info[irq] = idx;
+int
+ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
+                      struct socket *sock, int bind_irq)
+{
+        unsigned long      flags;
+        ksock_conn_t      *conn;
+        ksock_peer_t      *peer;
+        ksock_peer_t      *peer2;
+        ksock_sched_t     *sched;
+        unsigned int       irq;
+        ksock_tx_t        *tx;
+        int                rc;
+
+        /* NB, sock has an associated file since (a) this connection might
+         * have been created in userland and (b) we need the refcounting so
+         * that we don't close the socket while I/O is being done on it. */
+        LASSERT (sock->file != NULL);
+
+        rc = ksocknal_set_linger (sock);
+        if (rc != 0)
+                return (rc);
+
+        peer = NULL;
+        if (route == NULL) {                    /* not autoconnect */
+                /* Assume this socket connects to a brand new peer */
+                peer = ksocknal_create_peer (nid);
+                if (peer == NULL)
+                        return (-ENOMEM);
+        }
+
+        PORTAL_ALLOC(conn, sizeof(*conn));
+        if (conn == NULL) {
+                if (peer != NULL)
+                        ksocknal_put_peer (peer);
+                return (-ENOMEM);
+        }
+
+        memset (conn, 0, sizeof (*conn));
+        conn->ksnc_peer = NULL;
+        conn->ksnc_route = NULL;
+        conn->ksnc_sock = sock;
+        conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
+        conn->ksnc_saved_write_space = sock->sk->sk_write_space;
+        atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
+
+        conn->ksnc_rx_ready = 0;
+        conn->ksnc_rx_scheduled = 0;
+        ksocknal_new_packet (conn, 0);
+
+        INIT_LIST_HEAD (&conn->ksnc_tx_queue);
+#if SOCKNAL_ZC
+        INIT_LIST_HEAD (&conn->ksnc_tx_pending);
+#endif
+        conn->ksnc_tx_ready = 0;
+        conn->ksnc_tx_scheduled = 0;
+        atomic_set (&conn->ksnc_tx_nob, 0);
+
+        ksocknal_get_peer_addr (conn);
+
+        irq = ksocknal_conn_irq (conn);
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (route != NULL) {
+                /* Autoconnected! */
+                LASSERT (route->ksnr_conn == NULL && route->ksnr_connecting);
+
+                if (route->ksnr_deleted) {
+                        /* This conn was autoconnected, but the autoconnect
+                         * route got deleted while it was being
+                         * established! */
+                        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
+                                                 flags);
+                        PORTAL_FREE (conn, sizeof (*conn));
+                        return (-ESTALE);
                 }
-        } else { 
-                /* This is a hardware NIC, associated with a CPU */
-                idx = ksocknal_data.ksnd_irq_info[irq];
 
-                /* Don't bind again if we've bound already */
-                if ((idx & SOCKNAL_IRQ_BOUND) != 0)
-                        bind_irq = 0;
-                
-                sched = &ksocknal_data.ksnd_schedulers[idx & SOCKNAL_IRQ_SCHED_MASK];
+
+                /* associate conn/route for auto-reconnect */
+                route->ksnr_conn = conn;
+                atomic_inc (&conn->ksnc_refcount);
+                conn->ksnc_route = route;
+                atomic_inc (&route->ksnr_refcount);
+                route->ksnr_connecting = 0;
+
+                route->ksnr_generation++;
+                route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
+
+                peer = route->ksnr_peer;
+        } else {
+                /* Not an autoconnected connection; see if there is an
+                 * existing peer for this NID */
+                peer2 = ksocknal_find_peer_locked (nid);
+                if (peer2 != NULL) {
+                        ksocknal_put_peer (peer);
+                        peer = peer2;
+                } else {
+                        list_add (&peer->ksnp_list,
+                                  ksocknal_nid2peerlist (nid));
+                        /* peer list takes over existing ref */
+                }
         }
 
+        LASSERT (!peer->ksnp_closing);
+
+        conn->ksnc_peer = peer;
+        atomic_inc (&peer->ksnp_refcount);
+
+        list_add (&conn->ksnc_list, &peer->ksnp_conns);
+        atomic_inc (&conn->ksnc_refcount);
+
+        sched = ksocknal_choose_scheduler_locked (irq);
         sched->kss_nconns++;
         conn->ksnc_scheduler = sched;
 
-        list_add(&conn->ksnc_list, &ksocknal_data.ksnd_socklist);
-
-        write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
+        /* NB my callbacks block while I hold ksnd_global_lock */
+        sock->sk->sk_user_data = conn;
+        sock->sk->sk_data_ready = ksocknal_data_ready;
+        sock->sk->sk_write_space = ksocknal_write_space;
+
+        /* Take all the packets blocking for a connection.
+         * NB, it might be nicer to share these blocked packets among any
+         * other connections that are becoming established, however that
+         * confuses the normal packet launching operation, which selects a
+         * connection and queues the packet on it without needing an
+         * exclusive lock on ksnd_global_lock. */
+        while (!list_empty (&peer->ksnp_tx_queue)) {
+                tx = list_entry (peer->ksnp_tx_queue.next,
+                                 ksock_tx_t, tx_list);
+
+                list_del (&tx->tx_list);
+                ksocknal_queue_tx_locked (tx, conn);
+        }
 
-        if (bind_irq &&                         /* irq binding required */
-            irq != 0)                           /* hardware NIC */
-                ksocknal_bind_irq (irq, sched - ksocknal_data.ksnd_schedulers);
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
-        /* NOW it's safe to get called back when socket is ready... */
-        sock->sk->user_data = conn;
-        sock->sk->data_ready = ksocknal_data_ready;
-        sock->sk->write_space = ksocknal_write_space;
+        if (bind_irq)                           /* irq binding required */
+                ksocknal_bind_irq (irq);
 
-        /* ...which I call right now to get things going */
+        /* Call the callbacks right now to get things going. */
         ksocknal_data_ready (sock->sk, 0);
         ksocknal_write_space (sock->sk);
 
         CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
-               conn, conn->ksnc_peernid);
+               conn, conn->ksnc_peer->ksnp_nid);
 
-        /* Can't unload while connection active */
-        PORTAL_MODULE_USE;
-        RETURN(0);
+        ksocknal_put_conn (conn);
+        return (0);
+}
+
+void
+ksocknal_close_conn_locked (ksock_conn_t *conn)
+{
+        /* This just does the immmediate housekeeping, and queues the
+         * connection for the reaper to terminate. 
+         * Caller holds ksnd_global_lock exclusively in irq context */
+        ksock_peer_t   *peer = conn->ksnc_peer;
+        ksock_route_t  *route;
+
+        LASSERT (!conn->ksnc_closing);
+        conn->ksnc_closing = 1;
+        atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
+        
+        route = conn->ksnc_route;
+        if (route != NULL) {
+                /* dissociate conn from route... */
+                LASSERT (!route->ksnr_connecting &&
+                         !route->ksnr_deleted);
+
+                route->ksnr_conn = NULL;
+                conn->ksnc_route = NULL;
+
+                ksocknal_put_route (route);     /* drop conn's ref on route */
+                ksocknal_put_conn (conn);       /* drop route's ref on conn */
+        }
+
+        /* ksnd_deathrow_conns takes over peer's ref */
+        list_del (&conn->ksnc_list);
 
-error:
-        fput(file);
-        return (ret);
+        if (list_empty (&peer->ksnp_conns) &&
+            list_empty (&peer->ksnp_routes)) {
+                /* I've just closed last conn belonging to a
+                 * non-autoconnecting peer */
+                ksocknal_unlink_peer_locked (peer);
+        }
+
+        spin_lock (&ksocknal_data.ksnd_reaper_lock);
+
+        list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
+        if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
+                wake_up (&ksocknal_data.ksnd_reaper_waitq);
+                
+        spin_unlock (&ksocknal_data.ksnd_reaper_lock);
 }
 
-/* Passing in a zero nid will close all connections */
 int
-ksocknal_close_sock(ptl_nid_t nid)
+ksocknal_close_conn_unlocked (ksock_conn_t *conn) 
 {
-        long               flags;
-        ksock_conn_t      *conn;
-        LIST_HEAD         (death_row);
-        struct list_head  *tmp;
+        unsigned long flags;
+        int           did_it = 0;
+        
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+                
+        if (!conn->ksnc_closing) {
+                did_it = 1;
+                ksocknal_close_conn_locked (conn);
+        }
+        
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
-        LASSERT (!in_interrupt());
-        write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        return (did_it);
+}
 
-        if (nid == 0) {                         /* close ALL connections */
-                /* insert 'death row' into the socket list... */
-                list_add (&death_row, &ksocknal_data.ksnd_socklist);
-                /* ...extract and reinitialise the socket list itself... */
-                list_del_init (&ksocknal_data.ksnd_socklist);
-                /* ...and voila, death row is the proud owner of all conns */
-        } else list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
+void
+ksocknal_terminate_conn (ksock_conn_t *conn)
+{
+        /* This gets called by the reaper (guaranteed thread context) to
+         * disengage the socket from its callbacks and close it.
+         * ksnc_refcount will eventually hit zero, and then the reaper will
+         * destroy it. */
+        unsigned long   flags;
+
+        /* serialise with callbacks */
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        LASSERT (conn->ksnc_closing);
+        
+        /* Remove conn's network callbacks.
+         * NB I _have_ to restore the callback, rather than storing a noop,
+         * since the socket could survive past this module being unloaded!! */
+        conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
+        conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
+
+        /* A callback could be in progress already; they hold a read lock
+         * on ksnd_global_lock (to serialise with me) and NOOP if
+         * sk_user_data is NULL. */
+        conn->ksnc_sock->sk->sk_user_data = NULL;
+
+        conn->ksnc_scheduler->kss_nconns--;
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        /* The socket is closed on the final put; either here, or in
+         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
+         * when the connection was established, this will close the socket
+         * immediately, aborting anything buffered in it. Any hung
+         * zero-copy transmits will therefore complete in finite time. */
+        ksocknal_putconnsock (conn);
+}
 
-                conn = list_entry (tmp, ksock_conn_t, ksnc_list);
+void
+ksocknal_destroy_conn (ksock_conn_t *conn)
+{
+        /* Final coup-de-grace of the reaper */
+        CDEBUG (D_NET, "connection %p\n", conn);
 
-                if (conn->ksnc_peernid == nid) {
-                        list_del (&conn->ksnc_list);
-                        list_add (&conn->ksnc_list, &death_row);
-                        break;
-                }
+        LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
+        LASSERT (conn->ksnc_route == NULL);
+        LASSERT (!conn->ksnc_tx_scheduled);
+        LASSERT (!conn->ksnc_rx_scheduled);
+#if SOCKNAL_ZC
+        LASSERT (list_empty (&conn->ksnc_tx_pending));
+#endif
+        /* complete queued packets */
+        while (!list_empty (&conn->ksnc_tx_queue)) {
+                ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next,
+                                             ksock_tx_t, tx_list);
+
+                CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
+                        NTOH__u32 (tx->tx_hdr->type),
+                        NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
+                        NTOH__u64 (tx->tx_hdr->src_nid),
+                        NTOH__u64 (tx->tx_hdr->dest_nid));
+
+                list_del (&tx->tx_list);
+                ksocknal_tx_done (tx, 0);
+        }
+
+        /* complete current receive if any */
+        switch (conn->ksnc_rx_state) {
+        case SOCKNAL_RX_BODY:
+                lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
+                break;
+        case SOCKNAL_RX_BODY_FWD:
+                ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
+                break;
+        case SOCKNAL_RX_HEADER:
+        case SOCKNAL_RX_SLOP:
+                break;
+        default:
+                LBUG ();
+                break;
         }
 
-        write_unlock_irqrestore (&ksocknal_data.ksnd_socklist_lock, flags);
+        ksocknal_put_peer (conn->ksnc_peer);
 
-        if (nid && list_empty (&death_row))
-                return (-ENOENT);
+        PORTAL_FREE (conn, sizeof (*conn));
+        atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
+}
 
-        while (!list_empty (&death_row)) {
-                conn = list_entry (death_row.next, ksock_conn_t, ksnc_list);
-                list_del (&conn->ksnc_list);
+void
+ksocknal_put_conn (ksock_conn_t *conn)
+{
+        unsigned long flags;
 
-                /* NB I _have_ to restore the callback, rather than storing
-                 * a noop, since the socket could survive past this module
-                 * being unloaded!! */
-                conn->ksnc_sock->sk->data_ready = conn->ksnc_saved_data_ready;
-                conn->ksnc_sock->sk->write_space = conn->ksnc_saved_write_space;
+        CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
+                conn, conn->ksnc_peer->ksnp_nid,
+                atomic_read (&conn->ksnc_refcount));
 
-                /* OK; no more callbacks, but they could be in progress now,
-                 * so wait for them to complete... */
-                write_lock_irqsave (&ksocknal_data.ksnd_socklist_lock, flags);
+        LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
+        if (!atomic_dec_and_test (&conn->ksnc_refcount))
+                return;
 
-                /* ...however if I get the lock before a callback gets it,
-                 * this will make them noop
-                 */
-                conn->ksnc_sock->sk->user_data = NULL;
+        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                /* And drop the scheduler's connection count while I've got
-                 * the exclusive lock */
-                conn->ksnc_scheduler->kss_nconns--;
+        list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
+        if (waitqueue_active (&ksocknal_data.ksnd_reaper_waitq))
+                wake_up (&ksocknal_data.ksnd_reaper_waitq);
 
-                write_unlock_irqrestore(&ksocknal_data.ksnd_socklist_lock,
-                                        flags);
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+}
 
-                ksocknal_put_conn (conn);       /* drop ref for ksnd_socklist */
+int
+ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr)
+{
+        unsigned long       flags;
+        ksock_conn_t       *conn;
+        struct list_head   *ctmp;
+        struct list_head   *cnxt;
+        ksock_peer_t       *peer;
+        struct list_head   *ptmp;
+        struct list_head   *pnxt;
+        int                 lo;
+        int                 hi;
+        int                 i;
+        int                 rc = -ENOENT;
+
+        write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
+
+        if (nid != PTL_NID_ANY)
+                lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
+        else {
+                lo = 0;
+                hi = ksocknal_data.ksnd_peer_hash_size - 1;
         }
 
-        return (0);
+        for (i = lo; i <= hi; i++) {
+                list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
+
+                        peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
+
+                        if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
+                                continue;
+
+                        list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
+
+                                conn = list_entry (ctmp, ksock_conn_t,
+                                                   ksnc_list);
+
+                                if (!(ipaddr == 0 ||
+                                      conn->ksnc_ipaddr == ipaddr))
+                                        continue;
+
+                                rc = 0;
+                                ksocknal_close_conn_locked (conn);
+                        }
+                }
+        }
+
+        write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
+
+        return (rc);
 }
 
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -370,13 +1036,20 @@ struct tcp_opt *sock2tcp_opt(struct sock *sk)
 void
 ksocknal_push_conn (ksock_conn_t *conn)
 {
-        struct sock    *sk = conn->ksnc_sock->sk;
-        struct tcp_opt *tp = sock2tcp_opt(sk);
+        struct sock    *sk;
+        struct tcp_opt *tp;
         int             nonagle;
         int             val = 1;
         int             rc;
         mm_segment_t    oldmm;
 
+        rc = ksocknal_getconnsock (conn);
+        if (rc != 0)                            /* being shut down */
+                return;
+        
+        sk = conn->ksnc_sock->sk;
+        tp = sock2tcp_opt(sk);
+        
         lock_sock (sk);
         nonagle = tp->nonagle;
         tp->nonagle = 1;
@@ -385,8 +1058,8 @@ ksocknal_push_conn (ksock_conn_t *conn)
         oldmm = get_fs ();
         set_fs (KERNEL_DS);
 
-        rc = sk->prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
-                                   (char *)&val, sizeof (val));
+        rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
+                                      (char *)&val, sizeof (val));
         LASSERT (rc == 0);
 
         set_fs (oldmm);
@@ -394,47 +1067,33 @@ ksocknal_push_conn (ksock_conn_t *conn)
         lock_sock (sk);
         tp->nonagle = nonagle;
         release_sock (sk);
+
+        ksocknal_putconnsock (conn);
 }
 
-/* Passing in a zero nid pushes all connections */
-int
-ksocknal_push_sock (ptl_nid_t nid)
+void
+ksocknal_push_peer (ksock_peer_t *peer)
 {
-        ksock_conn_t      *conn;
-        struct list_head  *tmp;
-        int                index;
-        int                i;
-
-        if (nid != 0) {
-                conn = ksocknal_get_conn (nid);
-
-                if (conn == NULL)
-                        return (-ENOENT);
-
-                ksocknal_push_conn (conn);
-                ksocknal_put_conn (conn);
-
-                return (0);
-        }
+        int               index;
+        int               i;
+        struct list_head *tmp;
+        ksock_conn_t     *conn;
 
-        /* NB we can't remove connections from the socket list so we have to
-         * cope with them being removed from under us...
-         */
         for (index = 0; ; index++) {
-                read_lock (&ksocknal_data.ksnd_socklist_lock);
+                read_lock (&ksocknal_data.ksnd_global_lock);
 
                 i = 0;
                 conn = NULL;
 
-                list_for_each (tmp, &ksocknal_data.ksnd_socklist) {
+                list_for_each (tmp, &peer->ksnp_conns) {
                         if (i++ == index) {
-                                conn = list_entry(tmp, ksock_conn_t, ksnc_list);
-                                atomic_inc (&conn->ksnc_refcount); // take a ref
+                                conn = list_entry (tmp, ksock_conn_t, ksnc_list);
+                                atomic_inc (&conn->ksnc_refcount);
                                 break;
                         }
                 }
 
-                read_unlock (&ksocknal_data.ksnd_socklist_lock);
+                read_unlock (&ksocknal_data.ksnd_global_lock);
 
                 if (conn == NULL)
                         break;
@@ -442,85 +1101,57 @@ ksocknal_push_sock (ptl_nid_t nid)
                 ksocknal_push_conn (conn);
                 ksocknal_put_conn (conn);
         }
-
-        return (0);
 }
 
-ksock_conn_t *
-ksocknal_get_conn (ptl_nid_t nid)
+int
+ksocknal_push (ptl_nid_t nid)
 {
-        struct list_head *tmp;
-        ksock_conn_t     *conn;
-
-        PROF_START(conn_list_walk);
-
-        read_lock (&ksocknal_data.ksnd_socklist_lock);
-
-        list_for_each(tmp, &ksocknal_data.ksnd_socklist) {
-
-                conn = list_entry(tmp, ksock_conn_t, ksnc_list);
-
-                if (conn->ksnc_peernid == nid) {
-                        /* caller is referencing */
-                        atomic_inc (&conn->ksnc_refcount);
-
-                        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        ksock_peer_t      *peer;
+        struct list_head  *tmp;
+        int                index;
+        int                i;
+        int                j;
+        int                rc = -ENOENT;
 
-                        CDEBUG(D_NET, "got conn [%p] -> "LPX64" (%d)\n",
-                               conn, nid, atomic_read (&conn->ksnc_refcount));
+        if (nid != PTL_NID_ANY) {
+                peer = ksocknal_get_peer (nid);
 
-                        PROF_FINISH(conn_list_walk);
-                        return (conn);
+                if (peer != NULL) {
+                        rc = 0;
+                        ksocknal_push_peer (peer);
+                        ksocknal_put_peer (peer);
                 }
+                return (rc);
         }
 
-        read_unlock (&ksocknal_data.ksnd_socklist_lock);
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                for (j = 0; ; j++) {
+                        read_lock (&ksocknal_data.ksnd_global_lock);
 
-        CDEBUG(D_NET, "No connection found when looking for nid "LPX64"\n",
-               nid);
-        PROF_FINISH(conn_list_walk);
-        return (NULL);
-}
+                        index = 0;
+                        peer = NULL;
 
-void
-ksocknal_close_conn (ksock_conn_t *conn)
-{
-        CDEBUG (D_NET, "connection [%p] closed \n", conn);
-
-        fput (conn->ksnc_file);
-        PORTAL_FREE (conn, sizeof (*conn));
-
-        /* One less connection keeping us hanging on */
-        PORTAL_MODULE_UNUSE;
-}
-
-void
-_ksocknal_put_conn (ksock_conn_t *conn)
-{
-        unsigned long flags;
-
-        CDEBUG (D_NET, "connection [%p] handed the black spot\n", conn);
+                        list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
+                                if (index++ == j) {
+                                        peer = list_entry(tmp, ksock_peer_t,
+                                                          ksnp_list);
+                                        atomic_inc (&peer->ksnp_refcount);
+                                        break;
+                                }
+                        }
 
-        /* "But what is the black spot, captain?" I asked.
-         * "That's a summons, mate..." */
+                        read_unlock (&ksocknal_data.ksnd_global_lock);
 
-        LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
-        LASSERT (conn->ksnc_sock->sk->data_ready != ksocknal_data_ready);
-        LASSERT (conn->ksnc_sock->sk->write_space != ksocknal_write_space);
-        LASSERT (conn->ksnc_sock->sk->user_data == NULL);
-        LASSERT (!conn->ksnc_rx_scheduled);
+                        if (peer != NULL) {
+                                rc = 0;
+                                ksocknal_push_peer (peer);
+                                ksocknal_put_peer (peer);
+                        }
+                }
 
-        if (!in_interrupt()) {
-                ksocknal_close_conn (conn);
-                return;
         }
 
-        spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
-
-        list_add (&conn->ksnc_list, &ksocknal_data.ksnd_reaper_list);
-        wake_up (&ksocknal_data.ksnd_reaper_waitq);
-
-        spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
+        return (rc);
 }
 
 int
@@ -531,13 +1162,68 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
         LASSERT (data != NULL);
 
         switch(data->ioc_nal_cmd) {
+        case NAL_CMD_GET_AUTOCONN: {
+                ksock_route_t *route = ksocknal_get_route_by_idx (data->ioc_count);
+
+                if (route == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        data->ioc_nid   = route->ksnr_peer->ksnp_nid;
+                        data->ioc_id    = route->ksnr_ipaddr;
+                        data->ioc_misc  = route->ksnr_port;
+                        data->ioc_count = route->ksnr_generation;
+                        data->ioc_size  = route->ksnr_buffer_size;
+                        data->ioc_wait  = route->ksnr_sharecount;
+                        data->ioc_flags = (route->ksnr_nonagel      ? 1 : 0) |
+                                          (route->ksnr_xchange_nids ? 2 : 0) |
+                                          (route->ksnr_irq_affinity ? 4 : 0);
+                        ksocknal_put_route (route);
+                }
+                break;
+        }
+        case NAL_CMD_ADD_AUTOCONN: {
+                rc = ksocknal_add_route (data->ioc_nid, data->ioc_id,
+                                         data->ioc_misc, data->ioc_size,
+                                         (data->ioc_flags & 1) != 0,
+                                         (data->ioc_flags & 2) != 0,
+                                         (data->ioc_flags & 4) != 0,
+                                         (data->ioc_flags & 8) != 0);
+                break;
+        }
+        case NAL_CMD_DEL_AUTOCONN: {
+                rc = ksocknal_del_route (data->ioc_nid, data->ioc_id, 
+                                         (data->ioc_flags & 1) != 0,
+                                         (data->ioc_flags & 2) != 0);
+                break;
+        }
+        case NAL_CMD_GET_CONN: {
+                ksock_conn_t *conn = ksocknal_get_conn_by_idx (data->ioc_count);
+
+                if (conn == NULL)
+                        rc = -ENOENT;
+                else {
+                        rc = 0;
+                        data->ioc_nid  = conn->ksnc_peer->ksnp_nid;
+                        data->ioc_id   = conn->ksnc_ipaddr;
+                        data->ioc_misc = conn->ksnc_port;
+                        ksocknal_put_conn (conn);
+                }
+                break;
+        }
         case NAL_CMD_REGISTER_PEER_FD: {
-                rc = ksocknal_add_sock(data->ioc_nid, data->ioc_fd,
-                                       data->ioc_flags);
+                struct socket *sock = sockfd_lookup (data->ioc_fd, &rc);
+
+                if (sock != NULL) {
+                        rc = ksocknal_create_conn (data->ioc_nid, NULL,
+                                                   sock, data->ioc_flags);
+                        if (rc != 0)
+                                fput (sock->file);
+                }
                 break;
         }
         case NAL_CMD_CLOSE_CONNECTION: {
-                rc = ksocknal_close_sock(data->ioc_nid);
+                rc = ksocknal_close_conn (data->ioc_nid, data->ioc_id);
                 break;
         }
         case NAL_CMD_REGISTER_MYNID: {
@@ -545,7 +1231,7 @@ ksocknal_cmd(struct portal_ioctl_data * data, void * private)
                 break;
         }
         case NAL_CMD_PUSH_CONNECTION: {
-                rc = ksocknal_push_sock (data->ioc_nid);
+                rc = ksocknal_push (data->ioc_nid);
                 break;
         }
         }
@@ -573,6 +1259,7 @@ ksocknal_free_buffers (void)
                                                      SOCKNAL_LARGE_FWD_NMSGS));
         }
 
+        LASSERT (ksocknal_data.ksnd_active_ltxs == 0);
         if (ksocknal_data.ksnd_ltxs != NULL)
                 PORTAL_FREE (ksocknal_data.ksnd_ltxs,
                              sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS +
@@ -581,9 +1268,13 @@ ksocknal_free_buffers (void)
         if (ksocknal_data.ksnd_schedulers != NULL)
                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
                              sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
+
+        PORTAL_FREE (ksocknal_data.ksnd_peers,
+                     sizeof (struct list_head) * 
+                     ksocknal_data.ksnd_peer_hash_size);
 }
 
-void __exit
+void /*__exit*/
 ksocknal_module_fini (void)
 {
         int   i;
@@ -606,10 +1297,15 @@ ksocknal_module_fini (void)
                 /* fall through */
 
         case SOCKNAL_INIT_DATA:
-                /* Module refcount only gets to zero when all connections
+                /* Module refcount only gets to zero when all peers
                  * have been closed so all lists must be empty */
-                LASSERT (list_empty (&ksocknal_data.ksnd_socklist));
-                LASSERT (list_empty (&ksocknal_data.ksnd_reaper_list));
+                LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
+                LASSERT (ksocknal_data.ksnd_peers != NULL);
+                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
+                        LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
+                }
+                LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
+                LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
 
@@ -628,6 +1324,7 @@ ksocknal_module_fini (void)
 
                 /* flag threads to terminate; wake and wait for them to die */
                 ksocknal_data.ksnd_shuttingdown = 1;
+                wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
                 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
 
                 for (i = 0; i < SOCKNAL_N_SCHED; i++)
@@ -682,8 +1379,16 @@ ksocknal_module_init (void)
 
         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
 
-        INIT_LIST_HEAD(&ksocknal_data.ksnd_socklist);
-        rwlock_init(&ksocknal_data.ksnd_socklist_lock);
+        ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
+        PORTAL_ALLOC (ksocknal_data.ksnd_peers,
+                      sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
+        if (ksocknal_data.ksnd_peers == NULL)
+                RETURN (-ENOMEM);
+
+        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
+                INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
+
+        rwlock_init(&ksocknal_data.ksnd_global_lock);
 
         ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
         spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
@@ -702,19 +1407,26 @@ ksocknal_module_init (void)
         init_waitqueue_head(&ksocknal_data.ksnd_idle_ltx_waitq);
 
         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
-        INIT_LIST_HEAD (&ksocknal_data.ksnd_reaper_list);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
 
-        memset (&ksocknal_data.ksnd_irq_info, SOCKNAL_IRQ_UNASSIGNED,
-                sizeof (ksocknal_data.ksnd_irq_info));
+        spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
+        INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
+        init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
+
+        /* NB memset above zeros whole of ksocknal_data, including
+         * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
 
         /* flag lists/ptrs/locks initialised */
         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
 
         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
                      sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
-        if (ksocknal_data.ksnd_schedulers == NULL)
+        if (ksocknal_data.ksnd_schedulers == NULL) {
+                ksocknal_module_fini ();
                 RETURN(-ENOMEM);
+        }
 
         for (i = 0; i < SOCKNAL_N_SCHED; i++) {
                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
@@ -728,7 +1440,7 @@ ksocknal_module_init (void)
                 init_waitqueue_head (&kss->kss_waitq);
         }
 
-        CERROR ("ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
+        CDEBUG (D_MALLOC, "ltx "LPSZ", total "LPSZ"\n", sizeof (ksock_ltx_t),
                 sizeof (ksock_ltx_t) * (SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS));
 
         PORTAL_ALLOC(ksocknal_data.ksnd_ltxs,
@@ -745,6 +1457,7 @@ ksocknal_module_init (void)
         for (i = 0; i < SOCKNAL_NLTXS + SOCKNAL_NNBLK_LTXS; i++) {
                 ksock_ltx_t *ltx = &((ksock_ltx_t *)ksocknal_data.ksnd_ltxs)[i];
 
+                ltx->ltx_tx.tx_hdr = &ltx->ltx_hdr;
                 ltx->ltx_idle = i < SOCKNAL_NLTXS ?
                                 &ksocknal_data.ksnd_idle_ltx_list :
                                 &ksocknal_data.ksnd_idle_nblk_ltx_list;
@@ -772,9 +1485,18 @@ ksocknal_module_init (void)
                 }
         }
 
+        for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
+                rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
+                if (rc != 0) {
+                        CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
+                        ksocknal_module_fini ();
+                        RETURN (rc);
+                }
+        }
+
         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
         if (rc != 0) {
-                CERROR("Can't spawn socknal reaper: %d\n", rc);
+                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                 ksocknal_module_fini ();
                 RETURN (rc);
         }
index 86cdeb0..69daa02 100644 (file)
@@ -50,6 +50,7 @@
 #include <linux/kmod.h>
 #include <asm/uaccess.h>
 #include <asm/segment.h>
+#include <asm/div64.h>
 
 #define DEBUG_SUBSYSTEM S_SOCKNAL
 
 #include <portals/p30.h>
 #include <portals/lib-p30.h>
 
-#define SOCKNAL_N_SCHED num_online_cpus()       /* # socknal schedulers */
+#if CONFIG_SMP
+# define SOCKNAL_N_SCHED        smp_num_cpus    /* # socknal schedulers */
+#else
+# define SOCKNAL_N_SCHED        1               /* # socknal schedulers */
+#endif
+#define SOCKNAL_N_AUTOCONNECTD  4               /* # socknal autoconnect daemons */
+
+#define SOCKNAL_MIN_RECONNECT_INTERVAL HZ      /* first failed connection retry... */
+#define SOCKNAL_MAX_RECONNECT_INTERVAL (60*HZ) /* ...exponentially increasing to this */
+
+#define SOCKNAL_IO_TIMEOUT      (60*HZ)         /* default comms timeout */
+
+#define SOCKNAL_PEER_HASH_SIZE   101            /* # peer lists */
 
 #if PTL_LARGE_MTU
 # define SOCKNAL_MAX_FWD_PAYLOAD (256<<10)      /* biggest payload I can forward */
@@ -65,6 +78,8 @@
 # define SOCKNAL_MAX_FWD_PAYLOAD (64<<10)       /* biggest payload I can forward */
 #endif
 
+#define SOCKNAL_ZC_MIN_FRAG    (2<<10)          /* default smallest zerocopy fragment */
+
 #define SOCKNAL_NLTXS           128             /* # normal transmit messages */
 #define SOCKNAL_NNBLK_LTXS     128             /* # transmit messages reserved if can't block */
 
 
 #define SOCKNAL_RESCHED         100             /* # scheduler loops before reschedule */
 
-#define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sndbuf*8)/10)
+#define SOCKNAL_TX_LOW_WATER(sk) (((sk)->sk_sndbuf*8)/10)
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+# define jiffies_64  jiffies
+#endif
+
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,72))
+# define sk_data_ready data_ready
+# define sk_write_space write_space
+# define sk_user_data   user_data
+# define sk_prot        prot
+# define sk_sndbuf      sndbuf
+# define sk_socket      socket
+#endif
 
 typedef struct                                  /* pool of forwarding buffers */
 {
@@ -101,10 +129,17 @@ typedef struct                                  /* per scheduler state */
 } ksock_sched_t;
 
 typedef struct {
+        int               ksni_valid:1;         /* been set yet? */
+        int               ksni_bound:1;         /* bound to a cpu yet? */
+        int               ksni_sched:6;         /* which scheduler (assumes < 64) */
+} ksock_irqinfo_t;
+
+typedef struct {
         int               ksnd_init;            /* initialisation state */
         
-        struct list_head  ksnd_socklist;        /* all my connections */
-        rwlock_t          ksnd_socklist_lock;   /* stabilise add/find/remove */
+        rwlock_t          ksnd_global_lock;     /* stabilize peer/conn ops */
+        struct list_head *ksnd_peers;           /* hash table of all my known peers */
+        int               ksnd_peer_hash_size;  /* size of ksnd_peers */
 
         nal_cb_t         *ksnd_nal_cb;
         spinlock_t        ksnd_nal_cb_lock;     /* lib cli/sti lock */
@@ -112,7 +147,10 @@ typedef struct {
         atomic_t          ksnd_nthreads;        /* # live threads */
         int               ksnd_shuttingdown;    /* tell threads to exit */
         ksock_sched_t    *ksnd_schedulers;      /* scheduler state */
-        
+
+        atomic_t          ksnd_npeers;          /* total # peers extant */
+        atomic_t          ksnd_nclosing_conns;  /* # closed conns extant */
+
         kpr_router_t      ksnd_router;          /* THE router */
 
         void             *ksnd_fmbs;            /* all the pre-allocated FMBs */
@@ -124,11 +162,21 @@ typedef struct {
         struct list_head  ksnd_idle_ltx_list;   /* where to get an idle LTX */
         struct list_head  ksnd_idle_nblk_ltx_list; /* where to get an idle LTX if you can't block */
         wait_queue_head_t ksnd_idle_ltx_waitq;  /* where to block for an idle LTX */
+        int               ksnd_active_ltxs;     /* #active ltxs */
 
-        struct list_head  ksnd_reaper_list;     /* conn waiting to be reaped */
-        wait_queue_head_t ksnd_reaper_waitq;    /* reaper sleeps here */
+        struct list_head  ksnd_deathrow_conns;  /* conns to be closed */
+        struct list_head  ksnd_zombie_conns;    /* conns to be freed */
+        wait_queue_head_t ksnd_reaper_waitq;    /* reaper sleep here */
         spinlock_t        ksnd_reaper_lock;     /* serialise */
-        unsigned char     ksnd_irq_info[NR_IRQS]; /* irq->scheduler lookup */
+
+        int               ksnd_stall_tx;        /* test sluggish sender */
+        int               ksnd_stall_rx;        /* test sluggish receiver */
+
+        struct list_head  ksnd_autoconnectd_routes; /* routes waiting to be connected */
+        wait_queue_head_t ksnd_autoconnectd_waitq; /* autoconnectds sleep here */
+        spinlock_t        ksnd_autoconnectd_lock; /* serialise */
+
+        ksock_irqinfo_t   ksnd_irqinfo[NR_IRQS];/* irq->scheduler lookup */
 } ksock_nal_data_t;
 
 #define SOCKNAL_INIT_NOTHING    0
@@ -136,10 +184,6 @@ typedef struct {
 #define SOCKNAL_INIT_PTL        2
 #define SOCKNAL_INIT_ALL        3
 
-#define SOCKNAL_IRQ_BOUND       0x80            /* flag we _did_ bind already */
-#define SOCKNAL_IRQ_SCHED_MASK 0x7f            /* we assume < 127 CPUs */
-#define SOCKNAL_IRQ_UNASSIGNED  0xff            /* flag unassigned */
-
 /* A packet just assembled for transmission is represented by 1 or more
  * struct iovec fragments and 0 or more ptl_kiov_t fragments.  Forwarded
  * messages, or messages from an MD with PTL_MD_KIOV _not_ set have 0
@@ -154,17 +198,24 @@ typedef struct {
  * Otherwise up to PTL_MD_MAX_IOV ptl_kiov_t fragments are used.
  */
 
+struct ksock_conn;                              /* forward ref */
+struct ksock_peer;                              /* forward ref */
+struct ksock_route;                             /* forward ref */
+
 typedef struct                                  /* transmit packet */
 {
         struct list_head        tx_list;        /* queue on conn for transmission etc */
+        __u64                   tx_deadline;    /* when (in jiffies) tx times out */
         char                    tx_isfwd;       /* forwarding / sourced here */
         int                     tx_nob;         /* # packet bytes */
+        int                     tx_resid;       /* residual bytes */
         int                     tx_niov;        /* # packet iovec frags */
         struct iovec           *tx_iov;         /* packet iovec frags */
         int                     tx_nkiov;       /* # packet page frags */
         ptl_kiov_t             *tx_kiov;        /* packet page frags */
+        struct ksock_conn      *tx_conn;        /* owning conn */
+        ptl_hdr_t              *tx_hdr;         /* packet header (for debug only) */
 #if SOCKNAL_ZC        
-        ksock_sched_t          *tx_sched;       /* who to wake on callback */
         zccd_t                  tx_zccd;        /* zero copy callback descriptor */
 #endif
 } ksock_tx_t;
@@ -200,8 +251,7 @@ typedef struct                                  /* locally transmitted packet */
 /* local packets (lib->socknal) embedded in ksock_ltx_t::ltx_tx */
 
 /* NB list_entry() is used here as convenient macro for calculating a
- * pointer to a struct from the address of a member.
- */
+ * pointer to a struct from the address of a member. */
 
 typedef struct                                  /* Kernel portals Socket Forwarding message buffer */
 {                                               /* (socknal->router) */
@@ -209,6 +259,7 @@ typedef struct                                  /* Kernel portals Socket Forward
         kpr_fwd_desc_t          fmb_fwd;        /* router's descriptor */
         int                     fmb_npages;     /* # pages allocated */
         ksock_fmb_pool_t       *fmb_pool;       /* owning pool */
+        struct ksock_peer      *fmb_peer;       /* peer received from */
         struct page            *fmb_pages[SOCKNAL_LARGE_FWD_PAGES];
         struct iovec            fmb_iov[SOCKNAL_LARGE_FWD_PAGES];
 } ksock_fmb_t;
@@ -227,20 +278,24 @@ typedef union {
 #define SOCKNAL_RX_GET_FMB      5               /* scheduled for forwarding */
 #define SOCKNAL_RX_FMB_SLEEP    6               /* blocked waiting for a fwd desc */
 
-typedef struct 
+typedef struct ksock_conn
 { 
-        struct list_head    ksnc_list;          /* stash on global socket list */
-        struct file        *ksnc_file;          /* socket filp */
+        struct ksock_peer  *ksnc_peer;          /* owning peer */
+        struct ksock_route *ksnc_route;         /* owning route */
+        struct list_head    ksnc_list;          /* stash on peer's conn list */
         struct socket      *ksnc_sock;          /* actual socket */
         void               *ksnc_saved_data_ready; /* socket's original data_ready() callback */
         void               *ksnc_saved_write_space; /* socket's original write_space() callback */
-        ptl_nid_t           ksnc_peernid;       /* who's on the other end */
         atomic_t            ksnc_refcount;      /* # users */
         ksock_sched_t     *ksnc_scheduler;     /* who schedules this connection */
-        
+        __u32               ksnc_ipaddr;        /* peer's IP */
+        int                 ksnc_port;          /* peer's port */
+        int                 ksnc_closing;       /* being shut down */
+
         /* READER */
         struct list_head    ksnc_rx_list;       /* where I enq waiting input or a forwarding descriptor */
-        volatile int        ksnc_rx_ready;      /* data ready to read */
+        __u64               ksnc_rx_deadline;   /* when receive times out */
+        int                 ksnc_rx_ready;      /* data ready to read */
         int                 ksnc_rx_scheduled;  /* being progressed */
         int                 ksnc_rx_state;      /* what is being read */
         int                 ksnc_rx_nob_left;   /* # bytes to next hdr/body  */
@@ -256,37 +311,104 @@ typedef struct
         /* WRITER */
         struct list_head    ksnc_tx_list;       /* where I enq waiting for output space */
         struct list_head    ksnc_tx_queue;      /* packets waiting to be sent */
-        volatile int        ksnc_tx_ready;      /* write space */
+#if SOCKNAL_ZC
+        struct list_head    ksnc_tx_pending;    /* zc packets pending callback */
+#endif
+        atomic_t            ksnc_tx_nob;        /* # bytes queued */
+        int                 ksnc_tx_ready;      /* write space */
         int                 ksnc_tx_scheduled;  /* being progressed */
-
 } ksock_conn_t;
 
-extern int ksocknal_add_sock (ptl_nid_t nid, int fd, int client);
-extern int ksocknal_close_sock(ptl_nid_t nid);
-extern int ksocknal_set_mynid(ptl_nid_t nid);
-extern int ksocknal_push_sock(ptl_nid_t nid);
-extern ksock_conn_t *ksocknal_get_conn (ptl_nid_t nid);
-extern void _ksocknal_put_conn (ksock_conn_t *conn);
-extern void ksocknal_close_conn (ksock_conn_t *conn);
+typedef struct ksock_route
+{
+        struct list_head    ksnr_list;          /* chain on peer route list */
+        struct list_head    ksnr_connect_list;  /* chain on autoconnect list */
+        struct ksock_peer  *ksnr_peer;          /* owning peer */
+        atomic_t            ksnr_refcount;      /* # users */
+        int                 ksnr_sharecount;    /* lconf usage counter */
+        __u64               ksnr_timeout;       /* when reconnection can happen next */
+        unsigned int        ksnr_retry_interval; /* how long between retries */
+        __u32               ksnr_ipaddr;        /* an IP address for this peer */
+        int                 ksnr_port;          /* port to connect to */
+        int                 ksnr_buffer_size;   /* size of socket buffers */
+        unsigned int        ksnr_irq_affinity:1; /* set affinity? */
+        unsigned int        ksnr_xchange_nids:1; /* do hello protocol? */
+        unsigned int        ksnr_nonagel:1;     /* disable nagle? */
+        unsigned int        ksnr_connecting;    /* autoconnect in progress? */
+        unsigned int        ksnr_deleted;       /* been removed from peer? */
+        int                 ksnr_generation;    /* connection incarnation # */
+        ksock_conn_t       *ksnr_conn;          /* NULL/active connection */
+} ksock_route_t;
+
+typedef struct ksock_peer
+{
+        struct list_head    ksnp_list;          /* stash on global peer list */
+        ptl_nid_t           ksnp_nid;           /* who's on the other end(s) */
+        atomic_t            ksnp_refcount;      /* # users */
+        int                 ksnp_closing;       /* being closed */
+        struct list_head    ksnp_conns;         /* all active connections */
+        struct list_head    ksnp_routes;        /* routes */
+        struct list_head    ksnp_tx_queue;      /* waiting packets */
+} ksock_peer_t;
 
-static inline void
-ksocknal_put_conn (ksock_conn_t *conn)
+
+
+extern nal_cb_t         ksocknal_lib;
+extern ksock_nal_data_t ksocknal_data;
+
+static inline struct list_head *
+ksocknal_nid2peerlist (ptl_nid_t nid) 
 {
-        CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
-                conn, conn->ksnc_peernid, atomic_read (&conn->ksnc_refcount));
+        unsigned int hash = ((unsigned int)nid) % ksocknal_data.ksnd_peer_hash_size;
+        
+        return (&ksocknal_data.ksnd_peers [hash]);
+}
 
-        if (atomic_dec_and_test (&conn->ksnc_refcount))
-                _ksocknal_put_conn (conn);
+static inline int
+ksocknal_getconnsock (ksock_conn_t *conn) 
+{
+        int   rc = -ESHUTDOWN;
+        
+        read_lock (&ksocknal_data.ksnd_global_lock);
+        if (!conn->ksnc_closing) {
+                rc = 0;
+                get_file (conn->ksnc_sock->file);
+        }
+        read_unlock (&ksocknal_data.ksnd_global_lock);
+
+        return (rc);
 }
 
+static inline void
+ksocknal_putconnsock (ksock_conn_t *conn)
+{
+        fput (conn->ksnc_sock->file);
+}
+
+extern void ksocknal_put_route (ksock_route_t *route);
+extern void ksocknal_put_peer (ksock_peer_t *peer);
+extern ksock_peer_t *ksocknal_find_peer_locked (ptl_nid_t nid);
+extern ksock_peer_t *ksocknal_get_peer (ptl_nid_t nid);
+extern int ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr,
+                               int single, int keep_conn);
+extern int ksocknal_create_conn (ptl_nid_t nid, ksock_route_t *route,
+                                 struct socket *sock, int bind_irq);
+extern void ksocknal_close_conn_locked (ksock_conn_t *conn);
+extern int ksocknal_close_conn_unlocked (ksock_conn_t *conn);
+extern void ksocknal_terminate_conn (ksock_conn_t *conn);
+extern void ksocknal_destroy_conn (ksock_conn_t *conn);
+extern void ksocknal_put_conn (ksock_conn_t *conn);
+extern int ksocknal_close_conn (ptl_nid_t nid, __u32 ipaddr);
+
+extern void ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn);
+extern void ksocknal_tx_done (ksock_tx_t *tx, int asynch);
+extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
+extern void ksocknal_fmb_callback (void *arg, int error);
 extern int ksocknal_thread_start (int (*fn)(void *arg), void *arg);
 extern int ksocknal_new_packet (ksock_conn_t *conn, int skip);
-extern void ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd);
 extern int ksocknal_scheduler (void *arg);
-extern int ksocknal_reaper (void *arg);
 extern void ksocknal_data_ready(struct sock *sk, int n);
 extern void ksocknal_write_space(struct sock *sk);
-
-
-extern nal_cb_t         ksocknal_lib;
-extern ksock_nal_data_t ksocknal_data;
+extern int ksocknal_autoconnectd (void *arg);
+extern int ksocknal_reaper (void *arg);
+extern int ksocknal_set_linger (struct socket *sock);
index 6147d8a..3341596 100644 (file)
 
 #include "socknal.h"
 
-atomic_t   ksocknal_packets_received;
-atomic_t   ksocknal_packets_launched;
-atomic_t   ksocknal_packets_being_sent;
-
+int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
 #if SOCKNAL_ZC
 int        ksocknal_do_zc = 1;
-int        ksocknal_zc_min_frag = 2048;
+int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
 #endif
 
 /*
@@ -127,7 +124,7 @@ ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
 int
 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
-        /* I would guess that if ksocknal_get_conn(nid) == NULL,
+        /* I would guess that if ksocknal_get_peer (nid) == NULL,
            and we're not routing, then 'nid' is very distant :) */
         if ( nal->ni.nid == nid ) {
                 *dist = 0;
@@ -141,7 +138,7 @@ ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 ksock_ltx_t *
 ksocknal_get_ltx (int may_block)
 {
-        long             flags;
+        unsigned long flags;
         ksock_ltx_t *ltx = NULL;
 
         for (;;) {
@@ -151,6 +148,7 @@ ksocknal_get_ltx (int may_block)
                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
                                          ksock_ltx_t, ltx_tx.tx_list);
                         list_del (&ltx->ltx_tx.tx_list);
+                        ksocknal_data.ksnd_active_ltxs++;
                         break;
                 }
 
@@ -159,6 +157,7 @@ ksocknal_get_ltx (int may_block)
                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
                                                  ksock_ltx_t, ltx_tx.tx_list);
                                 list_del (&ltx->ltx_tx.tx_list);
+                                ksocknal_data.ksnd_active_ltxs++;
                         }
                         break;
                 }
@@ -175,6 +174,24 @@ ksocknal_get_ltx (int may_block)
         return (ltx);
 }
 
+void
+ksocknal_put_ltx (ksock_ltx_t *ltx)
+{
+        unsigned long   flags;
+        
+        spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
+
+        ksocknal_data.ksnd_active_ltxs--;
+        list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
+
+        /* normal tx desc => wakeup anyone blocking for one */
+        if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
+            waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
+                wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
+
+        spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
+}
+
 #if SOCKNAL_ZC
 struct page *
 ksocknal_kvaddr_to_page (unsigned long vaddr)
@@ -202,11 +219,15 @@ ksocknal_kvaddr_to_page (unsigned long vaddr)
 #endif
 
 int
-ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
+ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
 {
+        struct socket *sock = conn->ksnc_sock;
         struct iovec  *iov = tx->tx_iov;
         int            fragsize = iov->iov_len;
         unsigned long  vaddr = (unsigned long)iov->iov_base;