Whamcloud - gitweb
* Removed lingering RHness
[fs/lustre-release.git] / lustre / portals / knals / socknal / socknal_cb.c
index 656a0c5..b0b9342 100644 (file)
 
 #include "socknal.h"
 
-int        ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
-#if SOCKNAL_ZC
-int        ksocknal_do_zc = 1;
-int        ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
-#endif
-
 /*
  *  LIB functions follow
  *
@@ -225,7 +219,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         struct iovec  *iov = tx->tx_iov;
         int            fragsize = iov->iov_len;
         unsigned long  vaddr = (unsigned long)iov->iov_base;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_niov > 1) |
                               (tx->tx_nkiov > 1);
 #if SOCKNAL_ZC
@@ -241,10 +235,9 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_niov > 0);
         
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            zcsize >= ksocknal_zc_min_frag &&
             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
                 
                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
@@ -306,7 +299,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            fragsize = kiov->kiov_len;
         struct page   *page = kiov->kiov_page;
         int            offset = kiov->kiov_offset;
-        int            more = !list_empty (&conn->ksnc_tx_queue) |
+        int            more = (!list_empty (&conn->ksnc_tx_queue)) |
                               (tx->tx_nkiov > 1);
         int            rc;
 
@@ -318,10 +311,9 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (ksocknal_do_zc &&
+        if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
-            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
-            fragsize >= ksocknal_zc_min_frag) {
+            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
 
                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
                                page, offset, fragsize);
@@ -349,6 +341,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                 set_fs (KERNEL_DS);
                 rc = sock_sendmsg(sock, &msg, fragsize);
                 set_fs (oldmm);
+
                 kunmap (page);
         }
 
@@ -410,6 +403,16 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
                         break;
                 }
 
+                /* Consider the connection alive since we managed to chuck
+                 * more data into it.  Really, we'd like to consider it
+                 * alive only when the peer ACKs something, but
+                 * write_space() only gets called back while SOCK_NOSPACE
+                 * is set.  Instead, we presume peer death has occurred if
+                 * the socket doesn't drain within a timout */
+                conn->ksnc_tx_deadline = jiffies + 
+                                         ksocknal_data.ksnd_io_timeout * HZ;
+                conn->ksnc_peer->ksnp_last_alive = jiffies;
+
                 if (tx->tx_resid == 0) {        /* sent everything */
                         rc = 0;
                         break;
@@ -420,6 +423,24 @@ ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
         RETURN (rc);
 }
 
+void
+ksocknal_eager_ack (ksock_conn_t *conn)
+{
+        int            opt = 1;
+        mm_segment_t   oldmm = get_fs();
+        struct socket *sock = conn->ksnc_sock;
+        
+        /* Remind the socket to ACK eagerly.  If I don't, the socket might
+         * think I'm about to send something it could piggy-back the ACK
+         * on, introducing delay in completing zero-copy sends in my
+         * peer. */
+
+        set_fs(KERNEL_DS);
+        sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
+                               (char *)&opt, sizeof (opt));
+        set_fs(oldmm);
+}
+
 int
 ksocknal_recv_iov (ksock_conn_t *conn)
 {
@@ -453,6 +474,13 @@ ksocknal_recv_iov (ksock_conn_t *conn)
         if (rc <= 0)
                 return (rc);
 
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -499,11 +527,19 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
         /* NB this is just a boolean............................^ */
         set_fs (oldmm);
+
         kunmap (page);
         
         if (rc <= 0)
                 return (rc);
         
+        /* received something... */
+        conn->ksnc_peer->ksnp_last_alive = jiffies;
+        conn->ksnc_rx_deadline = jiffies + 
+                                 ksocknal_data.ksnd_io_timeout * HZ;
+        mb();                           /* order with setting rx_started */
+        conn->ksnc_rx_started = 1;
+
         conn->ksnc_rx_nob_wanted -= rc;
         conn->ksnc_rx_nob_left -= rc;
                 
@@ -541,20 +577,33 @@ ksocknal_recvmsg (ksock_conn_t *conn)
                         rc = -ESHUTDOWN;
                         break;
                 }
-                
+
                 if (conn->ksnc_rx_niov != 0)
                         rc = ksocknal_recv_iov (conn);
                 else
                         rc = ksocknal_recv_kiov (conn);
-                
+
                 if (rc <= 0) {
                         /* error/EOF or partial receive */
-                        if (rc == -EAGAIN)
+                        if (rc == -EAGAIN) {
                                 rc = 1;
+                        } else if (rc == 0 && conn->ksnc_rx_started) {
+                                /* EOF in the middle of a message */
+                                rc = -EPROTO;
+                        }
                         break;
                 }
 
+                /* Completed a fragment */
+
                 if (conn->ksnc_rx_nob_wanted == 0) {
+                        /* Completed a message segment (header or payload) */
+                        if (ksocknal_data.ksnd_eager_ack &&
+                            (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
+                             conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
+                                /* Remind the socket to ack eagerly... */
+                                ksocknal_eager_ack(conn);
+                        }
                         rc = 1;
                         break;
                 }
@@ -577,7 +626,6 @@ ksocknal_zc_callback (zccd_t *zcd)
 
         spin_lock_irqsave (&sched->kss_lock, flags);
 
-        list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
         if (waitqueue_active (&sched->kss_waitq))
                 wake_up (&sched->kss_waitq);
@@ -628,23 +676,12 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 {
 #if SOCKNAL_ZC
         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
-                unsigned long  flags;
                 ksock_conn_t  *conn = tx->tx_conn;
-                ksock_sched_t *sched = conn->ksnc_scheduler;
                 
                 /* zccd skbufs are still in-flight.  First take a ref on
                  * conn, so it hangs about for ksocknal_tx_done... */
                 atomic_inc (&conn->ksnc_refcount);
 
-                /* Stash it for timeout...
-                 * NB We have to hold a lock to stash the tx, and we have
-                 * stash it before we zcc_put(), but we have to _not_ hold
-                 * this lock when we zcc_put(), otherwise we could deadlock
-                 * if it turns out to be the last put. Aaaaarrrrggghhh! */
-                spin_lock_irqsave (&sched->kss_lock, flags);
-                list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                
                 /* ...then drop the initial ref on zccd, so the zero copy
                  * callback can occur */
                 zccd_put (&tx->tx_zccd);
@@ -659,10 +696,10 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 void
 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
 {
-        ksock_conn_t *conn;
-        ksock_tx_t *tx;
-        int         rc;
-
+        ksock_conn_t  *conn;
+        ksock_tx_t    *tx;
+        int            rc;
+        
         LASSERT (!list_empty (&sched->kss_tx_conns));
         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
         list_del (&conn->ksnc_tx_list);
@@ -686,7 +723,7 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
 
         if (rc != 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
                                 conn, rc, conn->ksnc_peer->ksnp_nid,
@@ -696,7 +733,6 @@ ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
 
         } else if (tx->tx_resid == 0) {  
-
                 /* everything went; assume more can go, and avoid
                  * write_space locking */
                 conn->ksnc_tx_ready = 1;
@@ -763,7 +799,8 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
                 return (NULL);
         }
         
-        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
+        rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
+                         &target_nid);
         if (rc != 0) {
                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
                 return (NULL);
@@ -820,8 +857,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 #endif
 
         spin_lock_irqsave (&sched->kss_lock, flags);
-                
-        tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
+
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                 
         if (conn->ksnc_tx_ready &&      /* able to send */
@@ -839,7 +875,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
 }
 
 ksock_route_t *
-ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
+ksocknal_find_connectable_route_locked (ksock_peer_t *peer, int eager_only)
 {
         struct list_head  *tmp;
         ksock_route_t     *route;
@@ -849,7 +885,8 @@ ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
                 
                 if (route->ksnr_conn == NULL && /* not connected */
                     !route->ksnr_connecting &&  /* not connecting */
-                    route->ksnr_timeout <= jiffies_64) /* OK to retry */
+                    (!eager_only || route->ksnr_eager) && /* wants to connect */
+                    time_after_eq (jiffies, route->ksnr_timeout)) /* OK to retry */
                         return (route);
         }
         
@@ -880,7 +917,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         ksock_conn_t     *conn;
         ksock_route_t    *route;
         rwlock_t         *g_lock;
-        
+
         /* Ensure the frags we've been given EXACTLY match the number of
          * bytes we want to send.  Many TCP/IP stacks disregard any total
          * size parameters passed to them and just look at the frags. 
@@ -907,17 +944,19 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
                 return (PTL_FAIL);
         }
 
-        /* Any routes need to be connected? (need write lock if so) */
-        if (ksocknal_find_connectable_route_locked (peer) == NULL) {
+        if (ksocknal_find_connectable_route_locked(peer, 1) == NULL) {
                 conn = ksocknal_find_conn_locked (tx, peer);
                 if (conn != NULL) {
+                        /* I've got no unconnected autoconnect routes that
+                         * need to be connected, and I do have an actual
+                         * connection... */
                         ksocknal_queue_tx_locked (tx, conn);
                         read_unlock (g_lock);
                         return (PTL_OK);
                 }
         }
         
-        /* need a write lock now to change peer state... */
+        /* Making one or more connections; I'll need a write lock... */
 
         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
         read_unlock (g_lock);
@@ -930,23 +969,37 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         }
         ksocknal_put_peer (peer);               /* drop ref I got above */
 
-        /* I may launch autoconnects, now we're write locked... */
-        while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
+
+        for (;;) {
+                /* launch all eager autoconnections */
+                route = ksocknal_find_connectable_route_locked (peer, 1);
+                if (route == NULL)
+                        break;
+
                 ksocknal_launch_autoconnect_locked (route);
+        }
 
         conn = ksocknal_find_conn_locked (tx, peer);
         if (conn != NULL) {
+                /* Connection exists; queue message on it */
                 ksocknal_queue_tx_locked (tx, conn);
                 write_unlock_irqrestore (g_lock, flags);
                 return (PTL_OK);
         }
-                
+
         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
-                /* no routes actually connecting now */
-                write_unlock_irqrestore (g_lock, flags);
-                return (PTL_FAIL);
+                /* no autoconnect routes actually connecting now.  Scrape
+                 * the barrel for non-eager autoconnects */
+                route = ksocknal_find_connectable_route_locked (peer, 0);
+                if (route != NULL) {
+                        ksocknal_launch_autoconnect_locked (route);
+                } else {
+                        write_unlock_irqrestore (g_lock, flags);
+                        return (PTL_FAIL);
+                }
         }
 
+        /* At least 1 connection is being established; queue the message... */
         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
 
         write_unlock_irqrestore (g_lock, flags);
@@ -1127,6 +1180,9 @@ ksocknal_fmb_callback (void *arg, int error)
                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
 
+        /* drop peer ref taken on init */
+        ksocknal_put_peer (fmb->fmb_peer);
+        
         spin_lock_irqsave (&fmp->fmp_lock, flags);
 
         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
@@ -1139,9 +1195,6 @@ ksocknal_fmb_callback (void *arg, int error)
 
         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
 
-        /* drop peer ref taken on init */
-        ksocknal_put_peer (fmb->fmb_peer);
-        
         if (conn == NULL)
                 return;
 
@@ -1279,7 +1332,6 @@ ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
-        conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
         
         /* payload is desc's iov-ed buffer, but skipping the hdr */
         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
@@ -1323,7 +1375,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                        dest_nid, body_len);
 
                 ksocknal_new_packet (conn, 0);  /* on to new packet */
-                ksocknal_close_conn_unlocked (conn); /* give up on conn */
+                ksocknal_close_conn_unlocked (conn, -EINVAL); /* give up on conn */
                 return;
         }
 
@@ -1373,6 +1425,9 @@ ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
         int   skipped;
 
         if (nob_to_skip == 0) {         /* right at next packet boundary now */
+                conn->ksnc_rx_started = 0;
+                mb ();                          /* racing with timeout thread */
+                
                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
@@ -1464,7 +1519,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
         rc = ksocknal_recvmsg(conn);
 
         if (rc <= 0) {
-                if (ksocknal_close_conn_unlocked (conn)) {
+                if (ksocknal_close_conn_unlocked (conn, rc)) {
                         /* I'm the first to close */
                         if (rc < 0)
                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
@@ -1478,6 +1533,7 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 goto out;
         }
 
+        
         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
                 goto out;                       /* try again later */
 
@@ -1506,9 +1562,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                 /* sets wanted_len, iovs etc */
                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
 
-                /* start timeout (lib is waiting for finalize) */
-                conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
-
                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
                         goto try_read;          /* go read the payload */
@@ -1517,7 +1570,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
 
         case SOCKNAL_RX_BODY:
                 /* payload all received */
-                conn->ksnc_rx_deadline = 0;     /* cancel timeout */
                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
                 /* Fall through */
 
@@ -1534,9 +1586,6 @@ ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
                         conn->ksnc_rx_nob_left);
 
-                /* cancel timeout (only needed it while fmb allocated) */
-                conn->ksnc_rx_deadline = 0;
-
                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
                  * conn->ksnc_cookie */
                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
@@ -1631,15 +1680,20 @@ int ksocknal_scheduler (void *arg)
         int                id = sched - ksocknal_data.ksnd_schedulers;
         char               name[16];
 
-        snprintf (name, sizeof (name),"ksocknald[%d]", id);
+        snprintf (name, sizeof (name),"ksocknald_%02d", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
 #if (CONFIG_SMP && CPU_AFFINITY)
-        if ((cpu_online_map & (1 << id)) != 0)
+        if ((cpu_online_map & (1 << id)) != 0) {
+#if 1
                 current->cpus_allowed = (1 << id);
-        else
+#else
+                set_cpus_allowed (current, 1<<id);
+#endif
+        } else {
                 CERROR ("Can't set CPU affinity for %s\n", name);
+        }
 #endif /* CONFIG_SMP && CPU_AFFINITY */
         
         spin_lock_irqsave (&sched->kss_lock, flags);
@@ -1722,7 +1776,10 @@ ksocknal_data_ready (struct sock *sk, int n)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
                 sk->sk_data_ready (sk, n);
-        } else if (!conn->ksnc_rx_ready) {        /* new news */
+                goto out;
+        }
+
+        if (!conn->ksnc_rx_ready) {        /* new news */
                 /* Set ASAP in case of concurrent calls to me */
                 conn->ksnc_rx_ready = 1;
 
@@ -1747,6 +1804,7 @@ ksocknal_data_ready (struct sock *sk, int n)
                 spin_unlock_irqrestore (&sched->kss_lock, flags);
         }
 
+ out:
         read_unlock (&ksocknal_data.ksnd_global_lock);
 
         EXIT;
@@ -1776,7 +1834,12 @@ ksocknal_write_space (struct sock *sk)
         if (conn == NULL) {             /* raced with ksocknal_close_sock */
                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
                 sk->sk_write_space (sk);
-        } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
+
+                read_unlock (&ksocknal_data.ksnd_global_lock);
+                return;
+        }
+
+        if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
 
                 if (!conn->ksnc_tx_ready) {      /* new news */
@@ -1967,7 +2030,7 @@ ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
 }
 
 int
-ksocknal_set_linger (struct socket *sock) 
+ksocknal_setup_sock (struct socket *sock) 
 {
         mm_segment_t    oldmm = get_fs ();
         int             rc;
@@ -1998,7 +2061,51 @@ ksocknal_set_linger (struct socket *sock)
                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
                 return (rc);
         }
+
+#if SOCKNAL_USE_KEEPALIVES
+        /* Keepalives: If 3/4 of the timeout elapses, start probing every
+         * second until the timeout elapses. */
+
+        option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
+                return (rc);
+        }
+        
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
         
+        option = ksocknal_data.ksnd_io_timeout / 4;
+        set_fs (KERNEL_DS);
+        rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
+                                    (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
+                return (rc);
+        }
+
+        option = 1;
+        set_fs (KERNEL_DS);
+        rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
+                              (char *)&option, sizeof (option));
+        set_fs (oldmm);
+        if (rc != 0) {
+                CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
+                return (rc);
+        }
+#endif
         return (0);
 }
 
@@ -2007,7 +2114,6 @@ ksocknal_connect_peer (ksock_route_t *route)
 {
         struct sockaddr_in  peer_addr;
         mm_segment_t        oldmm = get_fs();
-        __u64               n;
         struct timeval      tv;
         int                 fd;
         struct socket      *sock;
@@ -2020,8 +2126,8 @@ ksocknal_connect_peer (ksock_route_t *route)
         }
 
         /* Ugh; have to map_fd for compatibility with sockets passed in
-         * from userspace.  And we actually need the refcounting that
-         * this gives you :) */
+         * from userspace.  And we actually need the sock->file refcounting
+         * that this gives you :) */
 
         fd = sock_map_fd (sock);
         if (fd < 0) {
@@ -2033,22 +2139,19 @@ ksocknal_connect_peer (ksock_route_t *route)
         /* NB the fd now owns the ref on sock->file */
         LASSERT (sock->file != NULL);
         LASSERT (file_count(sock->file) == 1);
-        
+
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
-        tv.tv_sec = ksocknal_io_timeout / HZ;
-        n = ksocknal_io_timeout % HZ;
-        n = n * 1000000 + HZ - 1;
-        do_div (n, HZ);
-        tv.tv_usec = n;
+        tv.tv_sec = ksocknal_data.ksnd_io_timeout;
+        tv.tv_usec = 0;
 
         set_fs (KERNEL_DS);
         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set send timeout %d (in HZ): %d\n", 
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set send timeout %d: %d\n", 
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
         
@@ -2057,8 +2160,8 @@ ksocknal_connect_peer (ksock_route_t *route)
                               (char *)&tv, sizeof (tv));
         set_fs (oldmm);
         if (rc != 0) {
-                CERROR ("Can't set receive timeout %d (in HZ): %d\n",
-                        ksocknal_io_timeout, rc);
+                CERROR ("Can't set receive timeout %d: %d\n",
+                        ksocknal_data.ksnd_io_timeout, rc);
                 goto out;
         }
 
@@ -2154,7 +2257,7 @@ ksocknal_autoconnect (ksock_route_t *route)
         route->ksnr_connecting = 0;
 
         LASSERT (route->ksnr_retry_interval != 0);
-        route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
+        route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
 
@@ -2198,7 +2301,7 @@ ksocknal_autoconnectd (void *arg)
         ksock_route_t     *route;
         int                rc;
 
-        snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
+        snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
         kportal_daemonize (name);
         kportal_blockallsigs ();
 
@@ -2239,55 +2342,47 @@ ksock_conn_t *
 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
 {
         /* We're called with a shared lock on ksnd_global_lock */
-        unsigned long      flags;
         ksock_conn_t      *conn;
         struct list_head  *ctmp;
-        ksock_tx_t        *tx;
-        struct list_head  *ttmp;
         ksock_sched_t     *sched;
 
         list_for_each (ctmp, &peer->ksnp_conns) {
                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
                 sched = conn->ksnc_scheduler;
-                
-                if (conn->ksnc_rx_deadline != 0 &&
-                    conn->ksnc_rx_deadline <= jiffies_64)
-                        goto timed_out;
 
-                spin_lock_irqsave (&sched->kss_lock, flags);
+                /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
+                LASSERT (!conn->ksnc_closing);
                 
-                list_for_each (ttmp, &conn->ksnc_tx_queue) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-                        
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                if (conn->ksnc_rx_started &&
+                    time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
+                        /* Timed out incomplete incoming message */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out RX from "LPX64" %p\n", 
+                                peer->ksnp_nid, conn);
+                        return (conn);
                 }
-#if SOCKNAL_ZC
-                list_for_each (ttmp, &conn->ksnc_tx_pending) {
-                        tx = list_entry (ttmp, ksock_tx_t, tx_list);
-                        LASSERT (tx->tx_deadline != 0);
-
-                        if (tx->tx_deadline <= jiffies_64)
-                                goto timed_out_locked;
+                
+                if ((!list_empty (&conn->ksnc_tx_queue) ||
+                     conn->ksnc_sock->sk->wmem_queued != 0) &&
+                    time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
+                        /* Timed out messages queued for sending, or
+                         * messages buffered in the socket's send buffer */
+                        atomic_inc (&conn->ksnc_refcount);
+                        CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
+                                peer->ksnp_nid, 
+                                list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
+                                conn->ksnc_sock->sk->wmem_queued, conn);
+                        return (conn);
                 }
-#endif                
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-                continue;
-
-        timed_out_locked:
-                spin_unlock_irqrestore (&sched->kss_lock, flags);
-        timed_out:
-                atomic_inc (&conn->ksnc_refcount);
-                return (conn);
         }
 
         return (NULL);
 }
 
 void
-ksocknal_check_peer_timeouts (struct list_head *peers)
+ksocknal_check_peer_timeouts (int idx)
 {
+        struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
         struct list_head *ptmp;
         ksock_peer_t     *peer;
         ksock_conn_t     *conn;
@@ -2305,7 +2400,7 @@ ksocknal_check_peer_timeouts (struct list_head *peers)
                 if (conn != NULL) {
                         read_unlock (&ksocknal_data.ksnd_global_lock);
 
-                        if (ksocknal_close_conn_unlocked (conn)) {
+                        if (ksocknal_close_conn_unlocked (conn, -ETIMEDOUT)) {
                                 /* I actually closed... */
                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
                                         peer->ksnp_nid, conn->ksnc_ipaddr,
@@ -2330,8 +2425,9 @@ ksocknal_reaper (void *arg)
         unsigned long      flags;
         ksock_conn_t      *conn;
         int                timeout;
+        int                i;
         int                peer_index = 0;
-        __u64              deadline = jiffies_64;
+        unsigned long      deadline = jiffies;
         
         kportal_daemonize ("ksocknal_reaper");
         kportal_blockallsigs ();
@@ -2371,12 +2467,32 @@ ksocknal_reaper (void *arg)
                 
                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
 
-                while ((timeout = deadline - jiffies_64) <= 0) {
-                        /* Time to check for timeouts on a few more peers */
-                        ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
+                /* careful with the jiffy wrap... */
+                while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
+                        const int n = 4;
+                        const int p = 1;
+                        int       chunk = ksocknal_data.ksnd_peer_hash_size;
+                        
+                        /* Time to check for timeouts on a few more peers: I do
+                         * checks every 'p' seconds on a proportion of the peer
+                         * table and I need to check every connection 'n' times
+                         * within a timeout interval, to ensure I detect a
+                         * timeout on any connection within (n+1)/n times the
+                         * timeout interval. */
+
+                        if (ksocknal_data.ksnd_io_timeout > n * p)
+                                chunk = (chunk * n * p) / 
+                                        ksocknal_data.ksnd_io_timeout;
+                        if (chunk == 0)
+                                chunk = 1;
+
+                        for (i = 0; i < chunk; i++) {
+                                ksocknal_check_peer_timeouts (peer_index);
+                                peer_index = (peer_index + 1) % 
+                                             ksocknal_data.ksnd_peer_hash_size;
+                        }
 
-                        peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
-                        deadline += HZ;
+                        deadline += p * HZ;
                 }
 
                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);