Whamcloud - gitweb
LU-8 dynamically grow/shrink socklnd connd threads
authorLiang Zhen <liang@whamcloud.com>
Fri, 1 Apr 2011 07:16:33 +0000 (15:16 +0800)
committerOleg Drokin <green@whamcloud.com>
Fri, 3 Jun 2011 21:19:43 +0000 (14:19 -0700)
if multiple nodes are down, all socklnd connds could be blocked for a
long while, we can workaround this by increase default nconnds but it
always requires to have unnecessary number of threads.This patch can
support dynamically grow/shrink connd threads pool, it
can create new thread if there's pending active connecting, it will
kill some threads if there are too many idle connds.

Change-Id: Icbb52e8029d7c4c85d87be69e0a5b440832b01ba
Signed-off-by: Liang Zhen <liang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/390
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Lai Siyao <laisiyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/klnds/socklnd/socklnd_modparams.c

index 96a9bc3..7763027 100644 (file)
@@ -2422,15 +2422,31 @@ ksocknal_base_startup (void)
                 }
         }
 
                 }
         }
 
+        ksocknal_data.ksnd_connd_starting         = 0;
+        ksocknal_data.ksnd_connd_failed_stamp     = 0;
+        ksocknal_data.ksnd_connd_starting_stamp   = cfs_time_current_sec();
         /* must have at least 2 connds to remain responsive to accepts while
          * connecting */
         /* must have at least 2 connds to remain responsive to accepts while
          * connecting */
-        if (*ksocknal_tunables.ksnd_nconnds < 2)
-                *ksocknal_tunables.ksnd_nconnds = 2;
+        if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
+                *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
+
+        if (*ksocknal_tunables.ksnd_nconnds_max <
+            *ksocknal_tunables.ksnd_nconnds) {
+                ksocknal_tunables.ksnd_nconnds_max =
+                        ksocknal_tunables.ksnd_nconnds;
+        }
 
         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
 
         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
+                cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+                ksocknal_data.ksnd_connd_starting++;
+                cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
+
                 rc = ksocknal_thread_start (ksocknal_connd,
                                             (void *)((ulong_ptr_t)i));
                 if (rc != 0) {
                 rc = ksocknal_thread_start (ksocknal_connd,
                                             (void *)((ulong_ptr_t)i));
                 if (rc != 0) {
+                        cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+                        ksocknal_data.ksnd_connd_starting--;
+                        cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
                         CERROR("Can't spawn socknal connd: %d\n", rc);
                         goto failed;
                 }
                         CERROR("Can't spawn socknal connd: %d\n", rc);
                         goto failed;
                 }
index 724e216..e491b18 100644 (file)
@@ -102,6 +102,7 @@ typedef struct
 {
         int              *ksnd_timeout;         /* "stuck" socket timeout (seconds) */
         int              *ksnd_nconnds;         /* # connection daemons */
 {
         int              *ksnd_timeout;         /* "stuck" socket timeout (seconds) */
         int              *ksnd_nconnds;         /* # connection daemons */
+        int              *ksnd_nconnds_max;     /* max # connection daemons */
         int              *ksnd_min_reconnectms; /* first connection retry after (ms)... */
         int              *ksnd_max_reconnectms; /* ...exponentially increasing to this */
         int              *ksnd_eager_ack;       /* make TCP ack eagerly? */
         int              *ksnd_min_reconnectms; /* first connection retry after (ms)... */
         int              *ksnd_max_reconnectms; /* ...exponentially increasing to this */
         int              *ksnd_eager_ack;       /* make TCP ack eagerly? */
@@ -150,6 +151,11 @@ typedef struct
         ksock_interface_t ksnn_interfaces[LNET_MAX_INTERFACES];
 } ksock_net_t;
 
         ksock_interface_t ksnn_interfaces[LNET_MAX_INTERFACES];
 } ksock_net_t;
 
+/** connd timeout */
+#define SOCKNAL_CONND_TIMEOUT  120
+/** reserved thread for accepting & creating new connd */
+#define SOCKNAL_CONND_RESV     1
+
 typedef struct
 {
         int               ksnd_init;           /* initialisation state */
 typedef struct
 {
         int               ksnd_init;           /* initialisation state */
@@ -181,6 +187,14 @@ typedef struct
         cfs_list_t        ksnd_connd_routes;   /* routes waiting to be connected */
         cfs_waitq_t       ksnd_connd_waitq;    /* connds sleep here */
         int               ksnd_connd_connecting;/* # connds connecting */
         cfs_list_t        ksnd_connd_routes;   /* routes waiting to be connected */
         cfs_waitq_t       ksnd_connd_waitq;    /* connds sleep here */
         int               ksnd_connd_connecting;/* # connds connecting */
+        /** time stamp of the last failed connecting attempt */
+        long              ksnd_connd_failed_stamp;
+        /** # starting connd */
+        unsigned          ksnd_connd_starting;
+        /** time stamp of the last starting connd */
+        long              ksnd_connd_starting_stamp;
+        /** # running connd */
+        unsigned          ksnd_connd_running;
         cfs_spinlock_t    ksnd_connd_lock;     /* serialise */
 
         cfs_list_t        ksnd_idle_noop_txs;  /* list head for freed noop tx */
         cfs_spinlock_t    ksnd_connd_lock;     /* serialise */
 
         cfs_list_t        ksnd_idle_noop_txs;  /* list head for freed noop tx */
index 27b816b..93e2e54 100644 (file)
@@ -2005,20 +2005,115 @@ ksocknal_connect (ksock_route_t *route)
         return 0;
 }
 
         return 0;
 }
 
-/* Go through connd_routes queue looking for a route that
-   we can process right now */
+/*
+ * check whether we need to create more connds.
+ * It will try to create new thread if it's necessary, @timeout can
+ * be updated if failed to create, so caller wouldn't keep try while
+ * running out of resource.
+ */
+static int
+ksocknal_connd_check_start(long sec, long *timeout)
+{
+        int rc;
+        int total = ksocknal_data.ksnd_connd_starting +
+                    ksocknal_data.ksnd_connd_running;
+
+        if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
+                /* still in initializing */
+                return 0;
+        }
+
+        if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
+            total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
+                /* can't create more connd, or still have enough
+                 * threads to handle more connecting */
+                return 0;
+        }
+
+        if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
+                /* no pending connecting request */
+                return 0;
+        }
+
+        if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
+                /* may run out of resource, retry later */
+                *timeout = cfs_time_seconds(1);
+                return 0;
+        }
+
+        if (ksocknal_data.ksnd_connd_starting > 0) {
+                /* serialize starting to avoid flood */
+                return 0;
+        }
+
+        ksocknal_data.ksnd_connd_starting_stamp = sec;
+        ksocknal_data.ksnd_connd_starting++;
+        cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
+
+        /* NB: total is the next id */
+        rc = ksocknal_thread_start(ksocknal_connd, (void *)((long)total));
+
+        cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+        if (rc == 0)
+                return 1;
+
+        /* we tried ... */
+        LASSERT(ksocknal_data.ksnd_connd_starting > 0);
+        ksocknal_data.ksnd_connd_starting--;
+        ksocknal_data.ksnd_connd_failed_stamp = cfs_time_current_sec();
+
+        return 1;
+}
+
+/*
+ * check whether current thread can exit, it will return 1 if there are too
+ * many threads and no creating in past 120 seconds.
+ * Also, this function may update @timeout to make caller come back
+ * again to recheck these conditions.
+ */
+static int
+ksocknal_connd_check_stop(long sec, long *timeout)
+{
+        int val;
+
+        if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
+                /* still in initializing */
+                return 0;
+        }
+
+        if (ksocknal_data.ksnd_connd_starting > 0) {
+                /* in progress of starting new thread */
+                return 0;
+        }
+
+        if (ksocknal_data.ksnd_connd_running <=
+            *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
+                return 0;
+        }
+
+        /* created thread in past 120 seconds? */
+        val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
+                    SOCKNAL_CONND_TIMEOUT - sec);
+
+        *timeout = (val > 0) ? cfs_time_seconds(val) :
+                               cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
+        if (val > 0)
+                return 0;
+
+        /* no creating in past 120 seconds */
+
+        return ksocknal_data.ksnd_connd_running >
+               ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
+}
+
+/* Go through connd_routes queue looking for a route that we can process
+ * right now, @timeout_p can be updated if we need to come back later */
 static ksock_route_t *
 ksocknal_connd_get_route_locked(signed long *timeout_p)
 {
         ksock_route_t *route;
         cfs_time_t     now;
 
 static ksock_route_t *
 ksocknal_connd_get_route_locked(signed long *timeout_p)
 {
         ksock_route_t *route;
         cfs_time_t     now;
 
-        /* Only handle an outgoing connection request if there
-         * is someone left to handle incoming connections */
-        if ((ksocknal_data.ksnd_connd_connecting + 1) >=
-            *ksocknal_tunables.ksnd_nconnds)
-                return NULL;
-
         now = cfs_time_current();
 
         /* connd_routes can contain both pending and ordinary routes */
         now = cfs_time_current();
 
         /* connd_routes can contain both pending and ordinary routes */
@@ -2040,15 +2135,13 @@ ksocknal_connd_get_route_locked(signed long *timeout_p)
 int
 ksocknal_connd (void *arg)
 {
 int
 ksocknal_connd (void *arg)
 {
+        cfs_spinlock_t    *connd_lock = &ksocknal_data.ksnd_connd_lock;
         long               id = (long)(long_ptr_t)arg;
         char               name[16];
         ksock_connreq_t   *cr;
         long               id = (long)(long_ptr_t)arg;
         char               name[16];
         ksock_connreq_t   *cr;
-        ksock_route_t     *route;
         cfs_waitlink_t     wait;
         cfs_waitlink_t     wait;
-        signed long        timeout;
         int                nloops = 0;
         int                cons_retry = 0;
         int                nloops = 0;
         int                cons_retry = 0;
-        int                dropped_lock;
 
         snprintf (name, sizeof (name), "socknal_cd%02ld", id);
         cfs_daemonize (name);
 
         snprintf (name, sizeof (name), "socknal_cd%02ld", id);
         cfs_daemonize (name);
@@ -2056,11 +2149,28 @@ ksocknal_connd (void *arg)
 
         cfs_waitlink_init (&wait);
 
 
         cfs_waitlink_init (&wait);
 
-        cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+        cfs_spin_lock_bh (connd_lock);
+
+        LASSERT(ksocknal_data.ksnd_connd_starting > 0);
+        ksocknal_data.ksnd_connd_starting--;
+        ksocknal_data.ksnd_connd_running++;
 
         while (!ksocknal_data.ksnd_shuttingdown) {
 
         while (!ksocknal_data.ksnd_shuttingdown) {
+                ksock_route_t *route = NULL;
+                long sec = cfs_time_current_sec();
+                long timeout = CFS_MAX_SCHEDULE_TIMEOUT;
+                int  dropped_lock = 0;
+
+                if (ksocknal_connd_check_stop(sec, &timeout)) {
+                        /* wakeup another one to check stop */
+                        cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
+                        break;
+                }
 
 
-                dropped_lock = 0;
+                if (ksocknal_connd_check_start(sec, &timeout)) {
+                        /* created new thread */
+                        dropped_lock = 1;
+                }
 
                 if (!cfs_list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
                         /* Connection accepted by the listener */
 
                 if (!cfs_list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
                         /* Connection accepted by the listener */
@@ -2068,7 +2178,7 @@ ksocknal_connd (void *arg)
                                             next, ksock_connreq_t, ksncr_list);
 
                         cfs_list_del(&cr->ksncr_list);
                                             next, ksock_connreq_t, ksncr_list);
 
                         cfs_list_del(&cr->ksncr_list);
-                        cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_unlock_bh(connd_lock);
                         dropped_lock = 1;
 
                         ksocknal_create_conn(cr->ksncr_ni, NULL,
                         dropped_lock = 1;
 
                         ksocknal_create_conn(cr->ksncr_ni, NULL,
@@ -2076,19 +2186,20 @@ ksocknal_connd (void *arg)
                         lnet_ni_decref(cr->ksncr_ni);
                         LIBCFS_FREE(cr, sizeof(*cr));
 
                         lnet_ni_decref(cr->ksncr_ni);
                         LIBCFS_FREE(cr, sizeof(*cr));
 
-                        cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_lock_bh(connd_lock);
                 }
 
                 }
 
-                /* Sleep till explicit wake_up if no pending routes present */
-                timeout = CFS_MAX_SCHEDULE_TIMEOUT;
-
-                /* Connection request */
-                route = ksocknal_connd_get_route_locked(&timeout);
-
+                /* Only handle an outgoing connection request if there
+                 * is a thread left to handle incoming connections and
+                 * create new connd */
+                if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
+                    ksocknal_data.ksnd_connd_running) {
+                        route = ksocknal_connd_get_route_locked(&timeout);
+                }
                 if (route != NULL) {
                         cfs_list_del (&route->ksnr_connd_list);
                         ksocknal_data.ksnd_connd_connecting++;
                 if (route != NULL) {
                         cfs_list_del (&route->ksnr_connd_list);
                         ksocknal_data.ksnd_connd_connecting++;
-                        cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_unlock_bh(connd_lock);
                         dropped_lock = 1;
 
                         if (ksocknal_connect(route)) {
                         dropped_lock = 1;
 
                         if (ksocknal_connect(route)) {
@@ -2105,17 +2216,17 @@ ksocknal_connd (void *arg)
 
                         ksocknal_route_decref(route);
 
 
                         ksocknal_route_decref(route);
 
-                        cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_lock_bh(connd_lock);
                         ksocknal_data.ksnd_connd_connecting--;
                 }
 
                 if (dropped_lock) {
                         if (++nloops < SOCKNAL_RESCHED)
                                 continue;
                         ksocknal_data.ksnd_connd_connecting--;
                 }
 
                 if (dropped_lock) {
                         if (++nloops < SOCKNAL_RESCHED)
                                 continue;
-                        cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_unlock_bh(connd_lock);
                         nloops = 0;
                         cfs_cond_resched();
                         nloops = 0;
                         cfs_cond_resched();
-                        cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+                        cfs_spin_lock_bh(connd_lock);
                         continue;
                 }
 
                         continue;
                 }
 
@@ -2123,17 +2234,17 @@ ksocknal_connd (void *arg)
                 cfs_set_current_state (CFS_TASK_INTERRUPTIBLE);
                 cfs_waitq_add_exclusive (&ksocknal_data.ksnd_connd_waitq,
                                          &wait);
                 cfs_set_current_state (CFS_TASK_INTERRUPTIBLE);
                 cfs_waitq_add_exclusive (&ksocknal_data.ksnd_connd_waitq,
                                          &wait);
-                cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+                cfs_spin_unlock_bh(connd_lock);
 
                 nloops = 0;
                 cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE, timeout);
 
                 cfs_set_current_state (CFS_TASK_RUNNING);
                 cfs_waitq_del (&ksocknal_data.ksnd_connd_waitq, &wait);
 
                 nloops = 0;
                 cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE, timeout);
 
                 cfs_set_current_state (CFS_TASK_RUNNING);
                 cfs_waitq_del (&ksocknal_data.ksnd_connd_waitq, &wait);
-                cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+                cfs_spin_lock_bh(connd_lock);
         }
         }
-
-        cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+        ksocknal_data.ksnd_connd_running--;
+        cfs_spin_unlock_bh(connd_lock);
 
         ksocknal_thread_fini ();
         return (0);
 
         ksocknal_thread_fini ();
         return (0);
index ff7049a..0dd1544 100644 (file)
@@ -43,7 +43,11 @@ CFS_MODULE_PARM(peer_timeout, "i", int, 0444,
 
 static int nconnds = 4;
 CFS_MODULE_PARM(nconnds, "i", int, 0444,
 
 static int nconnds = 4;
 CFS_MODULE_PARM(nconnds, "i", int, 0444,
-                "# connection daemons");
+                "# connection daemons while starting");
+
+static int nconnds_max = 64;
+CFS_MODULE_PARM(nconnds_max, "i", int, 0444,
+                "max # connection daemons");
 
 static int min_reconnectms = 1000;
 CFS_MODULE_PARM(min_reconnectms, "i", int, 0644,
 
 static int min_reconnectms = 1000;
 CFS_MODULE_PARM(min_reconnectms, "i", int, 0644,
@@ -169,6 +173,7 @@ int ksocknal_tunables_init(void)
         /* initialize ksocknal_tunables structure */
         ksocknal_tunables.ksnd_timeout            = &sock_timeout;
         ksocknal_tunables.ksnd_nconnds            = &nconnds;
         /* initialize ksocknal_tunables structure */
         ksocknal_tunables.ksnd_timeout            = &sock_timeout;
         ksocknal_tunables.ksnd_nconnds            = &nconnds;
+        ksocknal_tunables.ksnd_nconnds_max        = &nconnds_max;
         ksocknal_tunables.ksnd_min_reconnectms    = &min_reconnectms;
         ksocknal_tunables.ksnd_max_reconnectms    = &max_reconnectms;
         ksocknal_tunables.ksnd_eager_ack          = &eager_ack;
         ksocknal_tunables.ksnd_min_reconnectms    = &min_reconnectms;
         ksocknal_tunables.ksnd_max_reconnectms    = &max_reconnectms;
         ksocknal_tunables.ksnd_eager_ack          = &eager_ack;