From 8483950e2edce4eee194f44f99da09fc839de3a2 Mon Sep 17 00:00:00 2001 From: eeb Date: Mon, 22 Aug 2005 11:42:01 +0000 Subject: [PATCH] * simplified gmnal thread startup/shutdown --- lnet/klnds/gmlnd/gmlnd.h | 24 ++------ lnet/klnds/gmlnd/gmlnd_api.c | 8 +-- lnet/klnds/gmlnd/gmlnd_comm.c | 49 ++++----------- lnet/klnds/gmlnd/gmlnd_module.c | 6 -- lnet/klnds/gmlnd/gmlnd_utils.c | 132 +++++++++++++--------------------------- 5 files changed, 62 insertions(+), 157 deletions(-) diff --git a/lnet/klnds/gmlnd/gmlnd.h b/lnet/klnds/gmlnd/gmlnd.h index 4c7b1b8..47d71eb 100644 --- a/lnet/klnds/gmlnd/gmlnd.h +++ b/lnet/klnds/gmlnd/gmlnd.h @@ -162,29 +162,17 @@ typedef struct gmnal_ni { lib_nal_t *gmni_libnal; struct gm_port *gmni_port; spinlock_t gmni_gm_lock; /* serialise GM calls */ + atomic_t gmni_nthreads; + int gmni_nrxthreads; long gmni_rxthread_pid[NRXTHREADS]; - int gmni_rxthread_stop_flag; - spinlock_t gmni_rxthread_flag_lock; - long gmni_rxthread_flag; - long gmni_ctthread_pid; - int gmni_ctthread_flag; gm_alarm_t gmni_ctthread_alarm; + int gmni_thread_shutdown; int gmni_msg_size; struct list_head gmni_rxq; spinlock_t gmni_rxq_lock; struct semaphore gmni_rxq_wait; } gmnal_ni_t; -/* - * Flags to start/stop and check status of threads - * each rxthread sets 1 bit (any bit) of the flag on startup - * and clears 1 bit when exiting - */ -#define GMNAL_THREAD_RESET 0 -#define GMNAL_THREAD_STOP 666 -#define GMNAL_CTTHREAD_STARTED 333 -#define GMNAL_RXTHREADS_STARTED ( (1<gmni_port); gm_finalize(); @@ -255,7 +254,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, /* Now that we have initialised the portals library, start receive * threads, we do this to avoid processing messages before we can parse * them */ - rc = gmnal_start_kernel_threads(gmnalni); + rc = gmnal_start_threads(gmnalni); if (rc != 0) { CERROR("Can't start threads: %d\n", rc); goto failed_3; @@ -271,8 +270,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid, return PTL_OK; failed_4: - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); + gmnal_stop_threads(gmnalni); failed_3: gm_close(gmnalni->gmni_port); diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 2b77ea3..3b4baa0 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -133,31 +133,25 @@ gmnal_ct_thread(void *arg) sprintf(current->comm, "gmnal_ct"); kportal_daemonize("gmnalctd"); - gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED; - - while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) { + while(!gmnalni->gmni_thread_shutdown) { spin_lock(&gmnalni->gmni_gm_lock); rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port); spin_unlock(&gmnalni->gmni_gm_lock); - if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_NET, "time to exit\n"); - break; - } - CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent)); if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) { recv = (gm_recv_t*)&rxevent->recv; gmnal_enqueue_rx(gmnalni, recv); - } else { - gm_unknown(gmnalni->gmni_port, rxevent); - } + continue; + } + + gm_unknown(gmnalni->gmni_port, rxevent); } - gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET; - CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni); + CDEBUG(D_NET, "exiting\n"); + atomic_dec(&gmnalni->gmni_nthreads); return 0; } @@ -173,33 +167,19 @@ gmnal_rx_thread(void *arg) gmnal_rx_t *rx; int rank; - for (rank=0; rankgmni_nrxthreads; rank++) if (gmnalni->gmni_rxthread_pid[rank] == current->pid) break; snprintf(name, sizeof(name), "gmnal_rx_%d", rank); kportal_daemonize(name); - /* - * set 1 bit for each thread started - * doesn't matter which bit - */ - spin_lock(&gmnalni->gmni_rxthread_flag_lock); - if (gmnalni->gmni_rxthread_flag) - gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1; - else - gmnalni->gmni_rxthread_flag = 1; - spin_unlock(&gmnalni->gmni_rxthread_flag_lock); - - while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) { - CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n"); + while(!gmnalni->gmni_thread_shutdown) { rx = gmnal_dequeue_rx(gmnalni); - if (rx == NULL) { - CDEBUG(D_NET, "Receive thread time to exit\n"); + if (rx == NULL) break; - } - + /* We're connectionless: simply ignore packets on error */ if (gmnal_unpack_msg(gmnalni, rx) == 0) { @@ -213,11 +193,8 @@ gmnal_rx_thread(void *arg) gmnal_post_rx(gmnalni, rx); } - spin_lock(&gmnalni->gmni_rxthread_flag_lock); - gmnalni->gmni_rxthread_flag /= 2; - spin_unlock(&gmnalni->gmni_rxthread_flag_lock); - - CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni); + CDEBUG(D_NET, "exiting\n"); + atomic_dec(&gmnalni->gmni_nthreads); return 0; } diff --git a/lnet/klnds/gmlnd/gmlnd_module.c b/lnet/klnds/gmlnd/gmlnd_module.c index 446f265..449c331 100644 --- a/lnet/klnds/gmlnd/gmlnd_module.c +++ b/lnet/klnds/gmlnd/gmlnd_module.c @@ -22,12 +22,6 @@ #include "gmnal.h" -/* - * -1 indicates default value. - * This is 1 thread per cpu - * See start_kernel_threads - */ -int num_rx_threads = -1; int num_txds = 5; int gm_port_id = 4; diff --git a/lnet/klnds/gmlnd/gmlnd_utils.c b/lnet/klnds/gmlnd/gmlnd_utils.c index b4a1a99..00bedf5 100644 --- a/lnet/klnds/gmlnd/gmlnd_utils.c +++ b/lnet/klnds/gmlnd/gmlnd_utils.c @@ -31,11 +31,11 @@ int gmnal_is_rxthread(gmnal_ni_t *gmnalni) { int i; - for (i=0; igmni_nrxthreads; i++) if (gmnalni->gmni_rxthread_pid[i] == current->pid) - return(1); - } - return(0); + return 1; + return 0; } gmnal_tx_t * @@ -344,44 +344,29 @@ gmnal_free_rxs(gmnal_ni_t *gmnalni) } void -gmnal_stop_rxthread(gmnal_ni_t *gmnalni) -{ - int count = 2; - int i; - - gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP; - - for (i = 0; i < num_rx_threads; i++) - up(&gmnalni->gmni_rxq_wait); - - while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) { - CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n"); - gmnal_yield(1); - - count++; - if ((count & (count - 1)) == 0) - CWARN("Waiting for rxthreads to stop\n"); - } -} - -void -gmnal_stop_ctthread(gmnal_ni_t *gmnalni) +gmnal_stop_threads(gmnal_ni_t *gmnalni) { int count = 2; + int i; - gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP; + gmnalni->gmni_thread_shutdown = 1; + /* wake ctthread with an alarm */ spin_lock(&gmnalni->gmni_gm_lock); - gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, - NULL, NULL); + gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, + 0, NULL, NULL); spin_unlock(&gmnalni->gmni_gm_lock); - while (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n"); - gmnal_yield(1); + /* wake each rxthread */ + for (i = 0; i < num_online_cpus(); i++) + up(&gmnalni->gmni_rxq_wait); + + while (atomic_read(&gmnalni->gmni_nthreads) != 0) { count++; if ((count & (count - 1)) == 0) - CWARN("Waiting for ctthread to stop\n"); + CWARN("Waiting for %d threads to stop\n", + atomic_read(&gmnalni->gmni_nthreads)); + gmnal_yield(1); } } @@ -393,12 +378,15 @@ gmnal_stop_ctthread(gmnal_ni_t *gmnalni) * callback events or sleeps. */ int -gmnal_start_kernel_threads(gmnal_ni_t *gmnalni) +gmnal_start_threads(gmnal_ni_t *gmnalni) { + int i; + int pid; + + gmnalni->gmni_thread_shutdown = 0; + gmnalni->gmni_nrxthreads = 0; + atomic_set(&gmnalni->gmni_nthreads, 0); - int threads = 0; - int flag; - INIT_LIST_HEAD(&gmnalni->gmni_rxq); spin_lock_init(&gmnalni->gmni_rxq_lock); sema_init(&gmnalni->gmni_rxq_wait, 0); @@ -410,65 +398,27 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni) CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n"); gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm); - CDEBUG(D_NET, "Starting caretaker thread\n"); - gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET; - gmnalni->gmni_ctthread_pid = - kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0); - if (gmnalni->gmni_ctthread_pid <= 0) { - CERROR("Caretaker thread failed to start\n"); - return -ENOMEM; - } - - while (gmnalni->gmni_ctthread_flag != GMNAL_THREAD_RESET) { - gmnal_yield(1); - CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n"); + pid = kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0); + if (pid < 0) { + CERROR("Caretaker thread failed to start: %d\n", pid); + return pid; } + atomic_inc(&gmnalni->gmni_nthreads); - CDEBUG(D_NET, "caretaker thread has started\n"); - - /* - * Now start a number of receiver threads - * these treads get work to do from the caretaker (ct) thread - */ - gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET; - gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET; - - spin_lock_init(&gmnalni->gmni_rxthread_flag_lock); - for (threads=0; threadsgmni_rxthread_pid[threads] = -1; + for (i = 0; i < num_online_cpus(); i++) { - /* - * If the default number of receive threades isn't - * modified at load time, then start one thread per cpu - */ - if (num_rx_threads == -1) - num_rx_threads = smp_num_cpus; - - CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads); - for (threads=0; threadsgmni_rxthread_pid[threads] = - kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0); - if (gmnalni->gmni_rxthread_pid[threads] <= 0) { - CERROR("Receive thread failed to start\n"); - gmnal_stop_rxthread(gmnalni); - gmnal_stop_ctthread(gmnalni); - return -ENOMEM; - } - } - - for (;;) { - spin_lock(&gmnalni->gmni_rxthread_flag_lock); - flag = gmnalni->gmni_rxthread_flag; - spin_unlock(&gmnalni->gmni_rxthread_flag_lock); - - if (flag == GMNAL_RXTHREADS_STARTED) - break; + pid = kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0); + if (pid < 0) { + CERROR("rx thread failed to start: %d\n", pid); + gmnal_stop_threads(gmnalni); + return pid; + } - gmnal_yield(1); + atomic_inc(&gmnalni->gmni_nthreads); + gmnalni->gmni_rxthread_pid[i] = pid; + gmnalni->gmni_nrxthreads++; } - CDEBUG(D_NET, "receive threads seem to have started\n"); - return 0; } @@ -760,7 +710,7 @@ gmnal_dequeue_rx(gmnal_ni_t *gmnalni) while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0) /* do nothing */; - if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP) + if (gmnalni->gmni_thread_shutdown) return NULL; spin_lock(&gmnalni->gmni_rxq_lock); -- 1.8.3.1