Whamcloud - gitweb
* vibnal fixes
[fs/lustre-release.git] / lnet / klnds / ralnd / ralnd_cb.c
index 38f1b77..d5165f4 100644 (file)
@@ -77,7 +77,7 @@ kranal_schedule_conn(kra_conn_t *conn)
         if (!conn->rac_scheduled) {
                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
                 conn->rac_scheduled = 1;
-                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
+                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                 wake_up(&dev->rad_waitq);
         }
 
@@ -630,7 +630,7 @@ kranal_do_send (lib_nal_t    *nal,
                         break;                  /* RDMA not expected */
                 }
 
-                /* Incoming message consistent with immediate reply? */
+                /* Incoming message consistent with RDMA? */
                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                                nid, conn->rac_rxmsg->ram_type);
@@ -1397,6 +1397,10 @@ kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
                 return 0;
 
         case RAP_NOT_DONE:
+                if (time_after_eq(jiffies,
+                                  conn->rac_last_tx + conn->rac_keepalive*HZ))
+                        CDEBUG(D_WARNING, "EAGAIN sending %02x (idle %lu secs)\n",
+                               msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
                 return -EAGAIN;
         }
 }
@@ -1466,7 +1470,9 @@ kranal_process_fmaq (kra_conn_t *conn)
 
                 if (time_after_eq(jiffies,
                                   conn->rac_last_tx + conn->rac_keepalive * HZ)) {
-                        CDEBUG(D_NET, "sending NOOP (idle)\n");
+                        CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
+                               conn->rac_peer->rap_nid, conn,
+                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                 }
@@ -1829,12 +1835,14 @@ void
 kranal_complete_closed_conn (kra_conn_t *conn)
 {
         kra_tx_t   *tx;
+        int         nfma;
+        int         nreplies;
 
         LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
         LASSERT (list_empty(&conn->rac_list));
         LASSERT (list_empty(&conn->rac_hashlist));
 
-        while (!list_empty(&conn->rac_fmaq)) {
+        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
 
                 list_del(&tx->tx_list);
@@ -1843,23 +1851,54 @@ kranal_complete_closed_conn (kra_conn_t *conn)
 
         LASSERT (list_empty(&conn->rac_rdmaq));
 
-        while (!list_empty(&conn->rac_replyq)) {
+        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
 
                 list_del(&tx->tx_list);
                 kranal_tx_done(tx, -ECONNABORTED);
         }
+
+        CDEBUG(D_WARNING, "Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
+               conn, conn->rac_peer->rap_nid, nfma, nreplies);
+}
+
+int
+kranal_process_new_conn (kra_conn_t *conn)
+{
+        RAP_RETURN   rrc;
+        
+        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
+        if (rrc == RAP_SUCCESS)
+                return 0;
+
+        LASSERT (rrc == RAP_NOT_DONE);
+        if (!time_after_eq(jiffies, conn->rac_last_tx + 
+                           conn->rac_timeout * HZ))
+                return -EAGAIN;
+
+        /* Too late */
+        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
+        LASSERT (rrc == RAP_SUCCESS);
+        return -ETIMEDOUT;
 }
 
 int
 kranal_scheduler (void *arg)
 {
-        kra_device_t   *dev = (kra_device_t *)arg;
-        wait_queue_t    wait;
-        char            name[16];
-        kra_conn_t     *conn;
-        unsigned long   flags;
-        int             busy_loops = 0;
+        kra_device_t     *dev = (kra_device_t *)arg;
+        wait_queue_t      wait;
+        char              name[16];
+        kra_conn_t       *conn;
+        unsigned long     flags;
+        unsigned long     deadline;
+        unsigned long     soonest;
+        int               nsoonest;
+        long              timeout;
+        struct list_head *tmp;
+        struct list_head *nxt;
+        int               rc;
+        int               dropped_lock;
+        int               busy_loops = 0;
 
         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
@@ -1882,10 +1921,13 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
 
+                dropped_lock = 0;
+
                 if (dev->rad_ready) {
                         /* Device callback fired since I last checked it */
                         dev->rad_ready = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
+                        dropped_lock = 1;
 
                         kranal_check_rdma_cq(dev);
                         kranal_check_fma_cq(dev);
@@ -1893,14 +1935,14 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
 
-                if (!list_empty(&dev->rad_connq)) {
-                        /* Connection needs attention */
-                        conn = list_entry(dev->rad_connq.next,
-                                          kra_conn_t, rac_schedlist);
+                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);
+
                         list_del_init(&conn->rac_schedlist);
                         LASSERT (conn->rac_scheduled);
                         conn->rac_scheduled = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
+                        dropped_lock = 1;
 
                         kranal_check_fma_rx(conn);
                         kranal_process_fmaq(conn);
@@ -1909,26 +1951,71 @@ kranal_scheduler (void *arg)
                                 kranal_complete_closed_conn(conn);
 
                         kranal_conn_decref(conn);
-
                         spin_lock_irqsave(&dev->rad_lock, flags);
-                        continue;
                 }
 
-                /* recheck device callback fired before sleeping */
-                if (dev->rad_ready)
+                nsoonest = 0;
+                soonest = jiffies;
+
+                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);
+                        
+                        deadline = conn->rac_last_tx + conn->rac_keepalive;
+                        if (time_after_eq(jiffies, deadline)) {
+                                /* Time to process this new conn */
+                                spin_unlock_irqrestore(&dev->rad_lock, flags);
+                                dropped_lock = 1;
+
+                                rc = kranal_process_new_conn(conn);
+                                if (rc != -EAGAIN) {
+                                        /* All done with this conn */
+                                        spin_lock_irqsave(&dev->rad_lock, flags);
+                                        list_del_init(&conn->rac_schedlist);
+                                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                                        kranal_conn_decref(conn);
+                                        spin_lock_irqsave(&dev->rad_lock, flags);
+                                        continue;
+                                }
+
+                                /* retry with exponential backoff until HZ */
+                                if (conn->rac_keepalive == 0)
+                                        conn->rac_keepalive = 1;
+                                else if (conn->rac_keepalive <= HZ)
+                                        conn->rac_keepalive *= 2;
+                                else
+                                        conn->rac_keepalive += HZ;
+                                
+                                deadline = conn->rac_last_tx + conn->rac_keepalive;
+                                spin_lock_irqsave(&dev->rad_lock, flags);
+                        }
+
+                        /* Does this conn need attention soonest? */
+                        if (nsoonest++ == 0 ||
+                            !time_after_eq(deadline, soonest))
+                                soonest = deadline;
+                }
+
+                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                         continue;
 
-                add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);
-
+                add_wait_queue(&dev->rad_waitq, &wait);
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                busy_loops = 0;
-                schedule();
+                if (nsoonest == 0) {
+                        busy_loops = 0;
+                        schedule();
+                } else {
+                        timeout = (long)(soonest - jiffies);
+                        if (timeout > 0) {
+                                busy_loops = 0;
+                                schedule_timeout(timeout);
+                        }
+                }
 
-                set_current_state(TASK_RUNNING);
                 remove_wait_queue(&dev->rad_waitq, &wait);
-
+                set_current_state(TASK_RUNNING);
                 spin_lock_irqsave(&dev->rad_lock, flags);
         }