Whamcloud - gitweb
* Added ENOMEM detection and retry on socknal sends
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
index 22345fe..9e04712 100644 (file)
@@ -223,22 +223,20 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
                 set_fs (oldmm);
         } 
 
-        if (rc <= 0)
-                return (rc);
-
-        tx->tx_resid -= rc;
-
-        if (rc < iov->iov_len) {
-                /* didn't send whole iov entry... */
-                iov->iov_base = (void *)(vaddr + rc);
-                iov->iov_len -= rc;
-                /* ...but did we send everything we tried to send? */
-                return ((rc == fragsize) ? 1 : -EAGAIN);
+        if (rc > 0) {
+                tx->tx_resid -= rc;
+
+                if (rc < iov->iov_len) {
+                        /* didn't send whole iov entry... */
+                        iov->iov_base = (void *)(vaddr + rc);
+                        iov->iov_len -= rc;
+                } else {
+                        tx->tx_iov++;
+                        tx->tx_niov--;
+                }
         }
-
-        tx->tx_iov++;
-        tx->tx_niov--;
-        return (1);
+        
+        return (rc);
 }
 
 int
@@ -295,61 +293,84 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
                 kunmap (page);
         }
 
-        if (rc <= 0)
-                return (rc);
-
-        tx->tx_resid -= rc;
+        if (rc > 0) {
+                tx->tx_resid -= rc;
  
-        if (rc < fragsize) {
-                /* didn't send whole frag */
-                kiov->kiov_offset = offset + rc;
-                kiov->kiov_len    = fragsize - rc;
-                return (-EAGAIN);
+                if (rc < fragsize) {
+                        kiov->kiov_offset = offset + rc;
+                        kiov->kiov_len    = fragsize - rc;
+                } else {
+                        tx->tx_kiov++;
+                        tx->tx_nkiov--;
+                }
         }
 
-        /* everything went */
-        LASSERT (rc == fragsize);
-        tx->tx_kiov++;
-        tx->tx_nkiov--;
-        return (1);
+        return (rc);
 }
 
 int
 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
 {
-        /* Return 0 on success, < 0 on error.
-         * caller checks tx_resid to determine progress/completion */
         int      rc;
-        ENTRY;
         
         if (ksocknal_data.ksnd_stall_tx != 0) {
                 set_current_state (TASK_UNINTERRUPTIBLE);
                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
         }
 
+        LASSERT (tx->tx_resid != 0);
+
         rc = ksocknal_getconnsock (conn);
         if (rc != 0) {
                 LASSERT (conn->ksnc_closing);
-                return (rc);
+                return (-ESHUTDOWN);
         }
 
-        for (;;) {
-                LASSERT (tx->tx_resid != 0);
-
-                if (tx->tx_niov != 0)
+        do {
+                if (ksocknal_data.ksnd_enomem_tx > 0) {
+                        /* testing... */
+                        ksocknal_data.ksnd_enomem_tx--;
+                        rc = -EAGAIN;
+                } else if (tx->tx_niov != 0) {
                         rc = ksocknal_send_iov (conn, tx);
-                else
+                } else {
                         rc = ksocknal_send_kiov (conn, tx);
+                }
 
-                if (rc <= 0) {                  /* error or socket full? */
-                        /* NB: rc == 0 and rc == -EAGAIN both mean try
+                if (rc <= 0) {
+                        /* Didn't write anything.
+                         *
+                         * NB: rc == 0 and rc == -EAGAIN both mean try
                          * again later (linux stack returns -EAGAIN for
-                         * this, but Adaptech TOE returns 0) */
-                        if (rc == -EAGAIN)
-                                rc = 0;
+                         * this, but Adaptech TOE returns 0).
+                         *
+                         * Also, sends never fail with -ENOMEM, just
+                         * -EAGAIN, but with the added bonus that we can't
+                         * expect write_space() to call us back to tell us
+                         * when to try sending again.  We use the
+                         * SOCK_NOSPACE flag to diagnose...  */
+
+                        LASSERT(rc != -ENOMEM);
+
+                        if (rc == 0 || rc == -EAGAIN) {
+                                if (test_bit(SOCK_NOSPACE, 
+                                             &conn->ksnc_sock->flags)) {
+                                        rc = -EAGAIN;
+                                } else {
+                                        static int counter;
+                         
+                                        counter++;
+                                        if ((counter & (-counter)) == counter)
+                                                CWARN("%d ENOMEM tx %p\n", 
+                                                      counter, conn);
+                                        rc = -ENOMEM;
+                                }
+                        }
                         break;
                 }
 
+                rc = 0;
+
                 /* Consider the connection alive since we managed to chuck
                  * more data into it.  Really, we'd like to consider it
                  * alive only when the peer ACKs something, but
@@ -360,14 +381,10 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                                          ksocknal_data.ksnd_io_timeout * HZ;
                 conn->ksnc_peer->ksnp_last_alive = jiffies;
 
-                if (tx->tx_resid == 0) {        /* sent everything */
-                        rc = 0;
-                        break;
-                }
-        }
+        } while (tx->tx_resid != 0);
 
         ksocknal_putconnsock (conn);
-        RETURN (rc);
+        return (rc);
 }
 
 void
@@ -518,7 +535,7 @@ ksocknal_receive (ksock_conn_t *conn)
         rc = ksocknal_getconnsock (conn);
         if (rc != 0) {
                 LASSERT (conn->ksnc_closing);
-                return (rc);
+                return (-ESHUTDOWN);
         }
 
         for (;;) {
@@ -639,32 +656,52 @@ ksocknal_tx_launched (ksock_tx_t *tx)
 int
 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
 {
+        unsigned long  flags;
         int            rc;
        
         rc = ksocknal_transmit (conn, tx);
 
         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
-        LASSERT (rc != -EAGAIN);
 
-        if (rc == 0) {
-                /* no errors */
-                if (tx->tx_resid != 0) {
-                        /* didn't send everything */
-                        return (-EAGAIN);
-                }
-                
+        if (tx->tx_resid == 0) {
+                /* Sent everything OK */
+                LASSERT (rc == 0);
+
                 ksocknal_tx_launched (tx);
                 return (0);
         }
 
+        if (rc == -EAGAIN)
+                return (rc);
+
+        if (rc == -ENOMEM) {
+                /* Queue on ksnd_enomem_conns for retry after a timeout */
+                spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
+
+                /* enomem list takes over scheduler's ref... */
+                LASSERT (conn->ksnc_tx_scheduled);
+                list_add_tail(&conn->ksnc_tx_list,
+                              &ksocknal_data.ksnd_enomem_conns);
+                if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
+                                   ksocknal_data.ksnd_reaper_waketime))
+                        wake_up (&ksocknal_data.ksnd_reaper_waitq);
+                
+                spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
+                return (rc);
+        }
+
+        /* Actual error */
+        LASSERT (rc < 0);
+
         if (!conn->ksnc_closing)
-                CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
-                        conn, rc, conn->ksnc_peer->ksnp_nid,
+                CERROR ("[%p] Error %d on write to "LPX64
+                        " ip %08x:%d\n",conn, rc, 
+                        conn->ksnc_peer->ksnp_nid,
                         conn->ksnc_ipaddr, conn->ksnc_port);
 
         ksocknal_close_conn_and_siblings (conn, rc);
         ksocknal_tx_launched (tx);
-
+        
         return (rc);
 } 
 
@@ -1734,16 +1771,19 @@ int ksocknal_scheduler (void *arg)
 
                         spin_lock_irqsave (&sched->kss_lock, flags);
 
-                        if (rc != -EAGAIN) {
-                                /* error or everything went: assume more can go */
-                                conn->ksnc_tx_ready = 1;
-                        } else {
-                                 /* back onto HEAD of tx_queue */
+                        if (rc == -ENOMEM || rc == -EAGAIN) {
+                                /* Incomplete send: replace tx on HEAD of tx_queue */
                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
+                        } else {
+                                /* Complete send; assume space for more */
+                                conn->ksnc_tx_ready = 1;
                         }
-                        
-                        if (conn->ksnc_tx_ready &&
-                            !list_empty (&conn->ksnc_tx_queue)) {
+
+                        if (rc == -ENOMEM) {
+                                /* Do nothing; after a short timeout, this
+                                 * conn will be reposted on kss_tx_conns. */
+                        } else if (conn->ksnc_tx_ready &&
+                                   !list_empty (&conn->ksnc_tx_queue)) {
                                 /* reschedule for tx */
                                 list_add_tail (&conn->ksnc_tx_list, 
                                                &sched->kss_tx_conns);
@@ -2503,6 +2543,9 @@ ksocknal_reaper (void *arg)
         wait_queue_t       wait;
         unsigned long      flags;
         ksock_conn_t      *conn;
+        ksock_sched_t     *sched;
+        struct list_head   enomem_conns;
+        int                nenomem_conns;
         int                timeout;
         int                i;
         int                peer_index = 0;
@@ -2511,6 +2554,7 @@ ksocknal_reaper (void *arg)
         kportal_daemonize ("ksocknal_reaper");
         kportal_blockallsigs ();
 
+        INIT_LIST_HEAD(&enomem_conns);
         init_waitqueue_entry (&wait, current);
 
         current->flags |= PF_MEMALLOC;
@@ -2545,11 +2589,36 @@ ksocknal_reaper (void *arg)
                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
                         continue;
                 }
-                
+
+                if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
+                        list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
+                        list_del_init(&ksocknal_data.ksnd_enomem_conns);
+                }
+
                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
 
+                /* reschedule all the connections that stalled with ENOMEM... */
+                nenomem_conns = 0;
+                while (!list_empty (&enomem_conns)) {
+                        conn = list_entry (enomem_conns.next,
+                                           ksock_conn_t, ksnc_tx_list);
+                        list_del (&conn->ksnc_tx_list);
+
+                        sched = conn->ksnc_scheduler;
+
+                        spin_lock_irqsave (&sched->kss_lock, flags);
+
+                        LASSERT (conn->ksnc_tx_scheduled);
+                        conn->ksnc_tx_ready = 1;
+                        list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
+                        wake_up (&sched->kss_waitq);
+
+                        spin_unlock_irqrestore (&sched->kss_lock, flags);
+                        nenomem_conns++;
+                }
+                
                 /* careful with the jiffy wrap... */
-                while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
+                while ((timeout = (int)(deadline - jiffies)) <= 0) {
                         const int n = 4;
                         const int p = 1;
                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
@@ -2576,6 +2645,14 @@ ksocknal_reaper (void *arg)
                         deadline += p * HZ;
                 }
 
+                if (nenomem_conns != 0) {
+                        /* Reduce my timeout if I rescheduled ENOMEM conns.
+                         * This also prevents me getting woken immediately
+                         * if any go back on my enomem list. */
+                        timeout = SOCKNAL_ENOMEM_RETRY;
+                }
+                ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
+
                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
                 set_current_state (TASK_INTERRUPTIBLE);