Whamcloud - gitweb
* Coded for new connection handshake.
author eeb <eeb>
Sat, 12 Feb 2005 03:23:36 +0000 (03:23 +0000)
committer eeb <eeb>
Sat, 12 Feb 2005 03:23:36 +0000 (03:23 +0000)
lnet/klnds/ralnd/ralnd.c
lnet/klnds/ralnd/ralnd.h
lnet/klnds/ralnd/ralnd_cb.c

index 014b4c6..bee886d 100644 (file)
@@ -539,29 +539,28 @@ kranal_set_conn_params(kra_conn_t *conn, kra_connreq_t *connreq,
         unsigned long  flags;
         RAP_RETURN     rrc;
 
-        /* tell scheduler to release the setri_mutex... */
+        /* CAVEAT EMPTOR: we're really overloading rac_last_tx + rac_keepalive
+         * to do RapkCompleteSync() timekeeping (see kranal_scheduler). */
+        conn->rac_last_tx = jiffies;
+        conn->rac_keepalive = 0;
+
+        /* Schedule conn on rad_new_conns */
+        kranal_conn_addref(conn);
         spin_lock_irqsave(&dev->rad_lock, flags);
-        dev->rad_setri_please++;
+        list_add_tail(&conn->rac_schedlist, &dev->rad_new_conns);
         wake_up(&dev->rad_waitq);
         spin_unlock_irqrestore(&dev->rad_lock, flags);
-        /* ...and grab it */
-        down(&dev->rad_setri_mutex);
 
         rrc = RapkSetRiParams(conn->rac_rihandle, &connreq->racr_riparams);
         if (rrc != RAP_SUCCESS) {
                 CERROR("Error setting riparams from %u.%u.%u.%u/%d: %d\n",
                        HIPQUAD(peer_ip), peer_port, rrc);
-                return -EPROTO;
+                return -ECONNABORTED;
         }
 
-        /* release the setri_mutex... */
-        up(&dev->rad_setri_mutex);
-        /* ...and tell scheduler we're all done */
-        spin_lock_irqsave(&dev->rad_lock, flags);
-        dev->rad_setri_please--;
-        wake_up(&dev->rad_waitq);
-        spin_unlock_irqrestore(&dev->rad_lock, flags);
-        
+        /* The scheduler doesn't touch conn except to deschedule and decref it
+         * after RapkCompleteSync() returns success, so conn is all mine */
+
         conn->rac_peerstamp = connreq->racr_peerstamp;
         conn->rac_peer_connstamp = connreq->racr_connstamp;
         conn->rac_keepalive = RANAL_TIMEOUT2KEEPALIVE(connreq->racr_timeout);
@@ -1743,11 +1742,6 @@ kranal_device_init(int id, kra_device_t *dev)
         const int         total_ntx = RANAL_NTX + RANAL_NTX_NBLK;
         RAP_RETURN        rrc;
 
-        /* The awful serialise RapkSetRiParams with the device scheduler
-         * work-around! */
-        dev->rad_setri_please = 0;
-        init_MUTEX(&dev->rad_setri_mutex);
-
         dev->rad_id = id;
         rrc = RapkGetDeviceByIndex(id, kranal_device_callback,
                                    &dev->rad_handle);
@@ -1851,13 +1845,19 @@ kranal_api_shutdown (nal_t *nal)
                 break;
         }
 
+        /* Conn/Peer state all cleaned up BEFORE setting shutdown, so threads
+         * don't have to worry about shutdown races */
+        LASSERT (atomic_read(&kranal_data.kra_nconns) == 0);
+        LASSERT (atomic_read(&kranal_data.kra_npeers) == 0);
+        
         /* flag threads to terminate; wake and wait for them to die */
         kranal_data.kra_shutdown = 1;
 
         for (i = 0; i < kranal_data.kra_ndevs; i++) {
                 kra_device_t *dev = &kranal_data.kra_devices[i];
 
-                LASSERT (list_empty(&dev->rad_connq));
+                LASSERT (list_empty(&dev->rad_ready_conns));
+                LASSERT (list_empty(&dev->rad_new_conns));
 
                 spin_lock_irqsave(&dev->rad_lock, flags);
                 wake_up(&dev->rad_waitq);
@@ -1961,7 +1961,8 @@ kranal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                 kra_device_t  *dev = &kranal_data.kra_devices[i];
 
                 dev->rad_idx = i;
-                INIT_LIST_HEAD(&dev->rad_connq);
+                INIT_LIST_HEAD(&dev->rad_ready_conns);
+                INIT_LIST_HEAD(&dev->rad_new_conns);
                 init_waitqueue_head(&dev->rad_waitq);
                 spin_lock_init(&dev->rad_lock);
         }
index 5cb1640..97a1fa3 100644 (file)
@@ -110,12 +110,11 @@ typedef struct
         int                     rad_id;         /* device id */
         int                     rad_idx;        /* index in kra_devices */
         int                     rad_ready;      /* set by device callback */
-        struct list_head        rad_connq;      /* connections requiring attention */
+        struct list_head        rad_ready_conns;/* connections ready to tx/rx */
+        struct list_head        rad_new_conns;  /* new connections to complete */
         wait_queue_head_t       rad_waitq;      /* scheduler waits here */
         spinlock_t              rad_lock;       /* serialise */
         void                   *rad_scheduler;  /* scheduling thread */
-        int                     rad_setri_please; /* ++ when connd wants to setri */
-        struct semaphore        rad_setri_mutex; /* serialise setri */
 } kra_device_t;
 
 typedef struct
@@ -310,7 +309,7 @@ typedef struct kra_conn
         struct kra_peer    *rac_peer;           /* owning peer */
         struct list_head    rac_list;           /* stash on peer's conn list */
         struct list_head    rac_hashlist;       /* stash in connection hash table */
-        struct list_head    rac_schedlist;      /* schedule (on rad_connq) for attention */
+        struct list_head    rac_schedlist;      /* schedule (on rad_ready_conns or rad_new_conns) for attention */
         struct list_head    rac_fmaq;           /* txs queued for FMA */
         struct list_head    rac_rdmaq;          /* txs awaiting RDMA completion */
         struct list_head    rac_replyq;         /* txs awaiting replies */
index fc5ed3f..3987235 100644 (file)
@@ -77,7 +77,7 @@ kranal_schedule_conn(kra_conn_t *conn)
         if (!conn->rac_scheduled) {
                 kranal_conn_addref(conn);       /* +1 ref for scheduler */
                 conn->rac_scheduled = 1;
-                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
+                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                 wake_up(&dev->rad_waitq);
         }
 
@@ -1863,14 +1863,42 @@ kranal_complete_closed_conn (kra_conn_t *conn)
 }
 
 int
+kranal_process_new_conn (kra_conn_t *conn)
+{
+        RAP_RETURN   rrc;
+        
+        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
+        if (rrc == RAP_SUCCESS)
+                return 0;
+
+        LASSERT (rrc == RAP_NOT_DONE);
+        if (!time_after_eq(jiffies, conn->rac_last_tx + 
+                           conn->rac_timeout * HZ))
+                return -EAGAIN;
+
+        /* Too late */
+        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
+        LASSERT (rrc == RAP_SUCCESS);
+        return -ETIMEDOUT;
+}
+
+int
 kranal_scheduler (void *arg)
 {
-        kra_device_t   *dev = (kra_device_t *)arg;
-        wait_queue_t    wait;
-        char            name[16];
-        kra_conn_t     *conn;
-        unsigned long   flags;
-        int             busy_loops = 0;
+        kra_device_t     *dev = (kra_device_t *)arg;
+        wait_queue_t      wait;
+        char              name[16];
+        kra_conn_t       *conn;
+        unsigned long     flags;
+        unsigned long     deadline;
+        unsigned long     soonest;
+        int               nsoonest;
+        long              timeout;
+        struct list_head *tmp;
+        struct list_head *nxt;
+        int               rc;
+        int               dropped_lock;
+        int               busy_loops = 0;
 
         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
         kportal_daemonize(name);
@@ -1879,9 +1907,6 @@ kranal_scheduler (void *arg)
         dev->rad_scheduler = current;
         init_waitqueue_entry(&wait, current);
 
-        /* prevent connd from doing setri until requested */
-        down(&dev->rad_setri_mutex);
-
         spin_lock_irqsave(&dev->rad_lock, flags);
 
         while (!kranal_data.kra_shutdown) {
@@ -1896,23 +1921,13 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
 
-                /* Ghastly hack to ensure RapkSetRiParams() serialises with
-                 * other comms */
-                if (dev->rad_setri_please != 0) {
-                        spin_unlock_irqrestore(&dev->rad_lock, flags);
-                        up(&dev->rad_setri_mutex);
-                        
-                        wait_event_interruptible(dev->rad_waitq,
-                                                 dev->rad_setri_please == 0);
-                        
-                        down(&dev->rad_setri_mutex);
-                        spin_lock_irqsave(&dev->rad_lock, flags);
-                }
-                
+                dropped_lock = 0;
+
                 if (dev->rad_ready) {
                         /* Device callback fired since I last checked it */
                         dev->rad_ready = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
+                        dropped_lock = 1;
 
                         kranal_check_rdma_cq(dev);
                         kranal_check_fma_cq(dev);
@@ -1920,14 +1935,14 @@ kranal_scheduler (void *arg)
                         spin_lock_irqsave(&dev->rad_lock, flags);
                 }
 
-                if (!list_empty(&dev->rad_connq)) {
-                        /* Connection needs attention */
-                        conn = list_entry(dev->rad_connq.next,
-                                          kra_conn_t, rac_schedlist);
+                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);
+
                         list_del_init(&conn->rac_schedlist);
                         LASSERT (conn->rac_scheduled);
                         conn->rac_scheduled = 0;
                         spin_unlock_irqrestore(&dev->rad_lock, flags);
+                        dropped_lock = 1;
 
                         kranal_check_fma_rx(conn);
                         kranal_process_fmaq(conn);
@@ -1936,31 +1951,75 @@ kranal_scheduler (void *arg)
                                 kranal_complete_closed_conn(conn);
 
                         kranal_conn_decref(conn);
-
                         spin_lock_irqsave(&dev->rad_lock, flags);
-                        continue;
                 }
 
-                /* recheck device callback fired before sleeping */
-                if (dev->rad_ready)
+                nsoonest = 0;
+                soonest = jiffies;
+
+                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
+                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);
+                        
+                        deadline = conn->rac_last_tx + conn->rac_keepalive;
+                        if (time_after_eq(jiffies, deadline)) {
+                                /* Time to process this new conn */
+                                spin_unlock_irqrestore(&dev->rad_lock, flags);
+                                dropped_lock = 1;
+
+                                rc = kranal_process_new_conn(conn);
+                                if (rc != -EAGAIN) {
+                                        /* All done with this conn */
+                                        spin_lock_irqsave(&dev->rad_lock, flags);
+                                        list_del(&conn->rac_schedlist);
+                                        spin_unlock_irqrestore(&dev->rad_lock, flags);
+
+                                        kranal_conn_decref(conn);
+                                        spin_lock_irqsave(&dev->rad_lock, flags);
+                                        continue;
+                                }
+
+                                /* retry with exponential backoff until it exceeds HZ, then add HZ per retry */
+                                if (conn->rac_keepalive == 0)
+                                        conn->rac_keepalive = 1;
+                                else if (conn->rac_keepalive <= HZ)
+                                        conn->rac_keepalive *= 2;
+                                else
+                                        conn->rac_keepalive += HZ;
+                                
+                                deadline = conn->rac_last_tx + conn->rac_keepalive;
+                                spin_lock_irqsave(&dev->rad_lock, flags);
+                        }
+
+                        /* Does this conn need attention soonest? */
+                        if (nsoonest++ == 0 ||
+                            !time_after_eq(deadline, soonest))
+                                soonest = deadline;
+                }
+
+                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                         continue;
 
-                add_wait_queue(&dev->rad_waitq, &wait);
                 set_current_state(TASK_INTERRUPTIBLE);
-
+                add_wait_queue(&dev->rad_waitq, &wait);
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
 
-                busy_loops = 0;
-                schedule();
+                if (nsoonest == 0) {
+                        busy_loops = 0;
+                        schedule();
+                } else {
+                        timeout = (long)(soonest - jiffies);
+                        if (timeout > 0) {
+                                busy_loops = 0;
+                                schedule_timeout(timeout);
+                        }
+                }
 
-                set_current_state(TASK_RUNNING);
                 remove_wait_queue(&dev->rad_waitq, &wait);
-
+                set_current_state(TASK_RUNNING);
                 spin_lock_irqsave(&dev->rad_lock, flags);
         }
 
         spin_unlock_irqrestore(&dev->rad_lock, flags);
-        up(&dev->rad_setri_mutex);
 
         dev->rad_scheduler = NULL;
         kranal_thread_fini();