list_add_tail (&route->ksnr_connd_list,
&ksocknal_data.ksnd_connd_routes);
+
cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq);
cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
return 0;
}
-/* Go through connd_routes queue looking for a route that
- we can process right now */
+/*
+ * check whether we need to create more connds.
+ * It will try to create new thread if it's necessary, @timeout can
+ * be updated if failed to create, so caller wouldn't keep try while
+ * running out of resource.
+ */
+static int
+ksocknal_connd_check_start(long sec, long *timeout)
+{
+ int rc;
+ int total = ksocknal_data.ksnd_connd_starting +
+ ksocknal_data.ksnd_connd_running;
+
+ if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
+ /* still in initializing */
+ return 0;
+ }
+
+ if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
+ total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
+ /* can't create more connd, or still have enough
+ * threads to handle more connecting */
+ return 0;
+ }
+
+ if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
+ /* no pending connecting request */
+ return 0;
+ }
+
+ if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
+ /* may run out of resource, retry later */
+ *timeout = cfs_time_seconds(1);
+ return 0;
+ }
+
+ if (ksocknal_data.ksnd_connd_starting > 0) {
+ /* serialize starting to avoid flood */
+ return 0;
+ }
+
+ ksocknal_data.ksnd_connd_starting_stamp = sec;
+ ksocknal_data.ksnd_connd_starting++;
+ cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
+
+ /* NB: total is the next id */
+ rc = ksocknal_thread_start(ksocknal_connd, (void *)((long)total));
+
+ cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+ if (rc == 0)
+ return 1;
+
+ /* we tried ... */
+ LASSERT(ksocknal_data.ksnd_connd_starting > 0);
+ ksocknal_data.ksnd_connd_starting--;
+ ksocknal_data.ksnd_connd_failed_stamp = cfs_time_current_sec();
+
+ return 1;
+}
+
+/*
+ * check whether current thread can exit, it will return 1 if there are too
+ * many threads and no creating in past 120 seconds.
+ * Also, this function may update @timeout to make caller come back
+ * again to recheck these conditions.
+ */
+static int
+ksocknal_connd_check_stop(long sec, long *timeout)
+{
+ int val;
+
+ if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
+ /* still in initializing */
+ return 0;
+ }
+
+ if (ksocknal_data.ksnd_connd_starting > 0) {
+ /* in progress of starting new thread */
+ return 0;
+ }
+
+ if (ksocknal_data.ksnd_connd_running <=
+ *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
+ return 0;
+ }
+
+ /* created thread in past 120 seconds? */
+ val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
+ SOCKNAL_CONND_TIMEOUT - sec);
+
+ *timeout = (val > 0) ? cfs_time_seconds(val) :
+ cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
+ if (val > 0)
+ return 0;
+
+ /* no creating in past 120 seconds */
+
+ return ksocknal_data.ksnd_connd_running >
+ ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
+}
+
+/* Go through connd_routes queue looking for a route that we can process
+ * right now, @timeout_p can be updated if we need to come back later */
static ksock_route_t *
ksocknal_connd_get_route_locked(signed long *timeout_p)
{
ksock_route_t *route;
cfs_time_t now;
- /* Only handle an outgoing connection request if there
- * is someone left to handle incoming connections */
- if ((ksocknal_data.ksnd_connd_connecting + 1) >=
- *ksocknal_tunables.ksnd_nconnds)
- return NULL;
-
now = cfs_time_current();
/* connd_routes can contain both pending and ordinary routes */
int
ksocknal_connd (void *arg)
{
+ cfs_spinlock_t *connd_lock = &ksocknal_data.ksnd_connd_lock;
long id = (long)arg;
char name[16];
ksock_connreq_t *cr;
- ksock_route_t *route;
cfs_waitlink_t wait;
- signed long timeout;
int nloops = 0;
int cons_retry = 0;
- int dropped_lock;
snprintf (name, sizeof (name), "socknal_cd%02ld", id);
cfs_daemonize (name);
cfs_waitlink_init (&wait);
- cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_lock_bh (connd_lock);
+
+ LASSERT(ksocknal_data.ksnd_connd_starting > 0);
+ ksocknal_data.ksnd_connd_starting--;
+ ksocknal_data.ksnd_connd_running++;
while (!ksocknal_data.ksnd_shuttingdown) {
+ ksock_route_t *route = NULL;
+ long sec = cfs_time_current_sec();
+ long timeout = CFS_MAX_SCHEDULE_TIMEOUT;
+ int dropped_lock = 0;
+
+ if (ksocknal_connd_check_stop(sec, &timeout)) {
+ /* wakeup another one to check stop */
+ cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
+ break;
+ }
- dropped_lock = 0;
+ if (ksocknal_connd_check_start(sec, &timeout)) {
+ /* created new thread */
+ dropped_lock = 1;
+ }
if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
/* Connection accepted by the listener */
ksock_connreq_t, ksncr_list);
list_del(&cr->ksncr_list);
- cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_unlock_bh (connd_lock);
dropped_lock = 1;
ksocknal_create_conn(cr->ksncr_ni, NULL,
lnet_ni_decref(cr->ksncr_ni);
LIBCFS_FREE(cr, sizeof(*cr));
- cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_lock_bh (connd_lock);
}
- /* Sleep till explicit wake_up if no pending routes present */
- timeout = CFS_MAX_SCHEDULE_TIMEOUT;
-
- /* Connection request */
- route = ksocknal_connd_get_route_locked(&timeout);
+ /* Only handle an outgoing connection request if there
+ * is a thread left to handle incoming connections and
+ * create new connd */
+ if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
+ ksocknal_data.ksnd_connd_running) {
+ route = ksocknal_connd_get_route_locked(&timeout);
+ }
if (route != NULL) {
list_del (&route->ksnr_connd_list);
+
ksocknal_data.ksnd_connd_connecting++;
- cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_unlock_bh (connd_lock);
dropped_lock = 1;
if (ksocknal_connect(route)) {
ksocknal_route_decref(route);
- cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_lock_bh (connd_lock);
ksocknal_data.ksnd_connd_connecting--;
}
if (dropped_lock) {
if (++nloops < SOCKNAL_RESCHED)
continue;
- cfs_spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_unlock_bh(connd_lock);
nloops = 0;
cfs_cond_resched();
- cfs_spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_lock_bh(connd_lock);
continue;
}
/* Nothing to do for 'timeout' */
cfs_set_current_state (CFS_TASK_INTERRUPTIBLE);
cfs_waitq_add_exclusive (&ksocknal_data.ksnd_connd_waitq, &wait);
- cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_unlock_bh (connd_lock);
nloops = 0;
cfs_waitq_timedwait (&wait, CFS_TASK_INTERRUPTIBLE, timeout);
cfs_set_current_state (CFS_TASK_RUNNING);
cfs_waitq_del (&ksocknal_data.ksnd_connd_waitq, &wait);
- cfs_spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
+ cfs_spin_lock_bh (connd_lock);
}
- cfs_spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
+ ksocknal_data.ksnd_connd_running--;
+ cfs_spin_unlock_bh (connd_lock);
ksocknal_thread_fini ();
return (0);