lib_nal_t *gmni_libnal;
struct gm_port *gmni_port;
spinlock_t gmni_gm_lock; /* serialise GM calls */
+ atomic_t gmni_nthreads;
+ int gmni_nrxthreads;
long gmni_rxthread_pid[NRXTHREADS];
- int gmni_rxthread_stop_flag;
- spinlock_t gmni_rxthread_flag_lock;
- long gmni_rxthread_flag;
- long gmni_ctthread_pid;
- int gmni_ctthread_flag;
gm_alarm_t gmni_ctthread_alarm;
+ int gmni_thread_shutdown;
int gmni_msg_size;
struct list_head gmni_rxq;
spinlock_t gmni_rxq_lock;
struct semaphore gmni_rxq_wait;
} gmnal_ni_t;
-/*
- * Flags to start/stop and check status of threads
- * each rxthread sets 1 bit (any bit) of the flag on startup
- * and clears 1 bit when exiting
- */
-#define GMNAL_THREAD_RESET 0
-#define GMNAL_THREAD_STOP 666
-#define GMNAL_CTTHREAD_STARTED 333
-#define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)
-
/*
* for ioctl get pid
void gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx);
int gmnal_alloc_rxs(gmnal_ni_t *gmnalni);
void gmnal_free_rxs(gmnal_ni_t *gmnalni);
-void gmnal_stop_rxthread(gmnal_ni_t *gmnalni);
-void gmnal_stop_ctthread(gmnal_ni_t *gmnalni);
char *gmnal_gmstatus2str(gm_status_t status);
char *gmnal_rxevent2str(gm_recv_event_t *ev);
void gmnal_yield(int delay);
int gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv);
gmnal_rx_t *gmnal_dequeue_rx(gmnal_ni_t *gmnalni);
-int gmnal_start_kernel_threads(gmnal_ni_t *gmnalni);
+void gmnal_stop_threads(gmnal_ni_t *gmnalni);
+int gmnal_start_threads(gmnal_ni_t *gmnalni);
/* gmnal_comm.c */
void gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
lib_msg_t *libmsg, ptl_nid_t nid, int nob);
/* Module Parameters */
-extern int num_rx_threads;
extern int num_txds;
extern int gm_port_id;
libcfs_nal_cmd_unregister(GMNAL);
/* stop processing messages */
- gmnal_stop_ctthread(gmnalni);
- gmnal_stop_rxthread(gmnalni);
+ gmnal_stop_threads(gmnalni);
gm_close(gmnalni->gmni_port);
gm_finalize();
/* Now that we have initialised the portals library, start receive
* threads, we do this to avoid processing messages before we can parse
* them */
- rc = gmnal_start_kernel_threads(gmnalni);
+ rc = gmnal_start_threads(gmnalni);
if (rc != 0) {
CERROR("Can't start threads: %d\n", rc);
goto failed_3;
return PTL_OK;
failed_4:
- gmnal_stop_rxthread(gmnalni);
- gmnal_stop_ctthread(gmnalni);
+ gmnal_stop_threads(gmnalni);
failed_3:
gm_close(gmnalni->gmni_port);
sprintf(current->comm, "gmnal_ct");
kportal_daemonize("gmnalctd");
- gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
-
- while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+ while(!gmnalni->gmni_thread_shutdown) {
spin_lock(&gmnalni->gmni_gm_lock);
rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
spin_unlock(&gmnalni->gmni_gm_lock);
- if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "time to exit\n");
- break;
- }
-
CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
recv = (gm_recv_t*)&rxevent->recv;
gmnal_enqueue_rx(gmnalni, recv);
- } else {
- gm_unknown(gmnalni->gmni_port, rxevent);
- }
+ continue;
+ }
+
+ gm_unknown(gmnalni->gmni_port, rxevent);
}
- gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
- CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+ CDEBUG(D_NET, "exiting\n");
+ atomic_dec(&gmnalni->gmni_nthreads);
return 0;
}
gmnal_rx_t *rx;
int rank;
- for (rank=0; rank<num_rx_threads; rank++)
+ for (rank = 0; rank < gmnalni->gmni_nrxthreads; rank++)
if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
break;
snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
kportal_daemonize(name);
- /*
- * set 1 bit for each thread started
- * doesn't matter which bit
- */
- spin_lock(&gmnalni->gmni_rxthread_flag_lock);
- if (gmnalni->gmni_rxthread_flag)
- gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
- else
- gmnalni->gmni_rxthread_flag = 1;
- spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
- while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
+ while(!gmnalni->gmni_thread_shutdown) {
rx = gmnal_dequeue_rx(gmnalni);
- if (rx == NULL) {
- CDEBUG(D_NET, "Receive thread time to exit\n");
+ if (rx == NULL)
break;
- }
-
+
/* We're connectionless: simply ignore packets on error */
if (gmnal_unpack_msg(gmnalni, rx) == 0) {
gmnal_post_rx(gmnalni, rx);
}
- spin_lock(&gmnalni->gmni_rxthread_flag_lock);
- gmnalni->gmni_rxthread_flag /= 2;
- spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
- CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+ CDEBUG(D_NET, "exiting\n");
+ atomic_dec(&gmnalni->gmni_nthreads);
return 0;
}
#include "gmnal.h"
-/*
- * -1 indicates default value.
- * This is 1 thread per cpu
- * See start_kernel_threads
- */
-int num_rx_threads = -1;
int num_txds = 5;
int gm_port_id = 4;
gmnal_is_rxthread(gmnal_ni_t *gmnalni)
{
int i;
- for (i=0; i<num_rx_threads; i++) {
+
+ for (i = 0; i < gmnalni->gmni_nrxthreads; i++)
if (gmnalni->gmni_rxthread_pid[i] == current->pid)
- return(1);
- }
- return(0);
+ return 1;
+ return 0;
}
gmnal_tx_t *
}
void
-gmnal_stop_rxthread(gmnal_ni_t *gmnalni)
-{
- int count = 2;
- int i;
-
- gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;
-
- for (i = 0; i < num_rx_threads; i++)
- up(&gmnalni->gmni_rxq_wait);
-
- while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
- CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n");
- gmnal_yield(1);
-
- count++;
- if ((count & (count - 1)) == 0)
- CWARN("Waiting for rxthreads to stop\n");
- }
-}
-
-void
-gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
+gmnal_stop_threads(gmnal_ni_t *gmnalni)
{
int count = 2;
+ int i;
- gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP;
+ gmnalni->gmni_thread_shutdown = 1;
+ /* wake ctthread with an alarm */
spin_lock(&gmnalni->gmni_gm_lock);
- gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10,
- NULL, NULL);
+ gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm,
+ 0, NULL, NULL);
spin_unlock(&gmnalni->gmni_gm_lock);
- while (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n");
- gmnal_yield(1);
+ /* wake each rxthread */
+ for (i = 0; i < num_online_cpus(); i++)
+ up(&gmnalni->gmni_rxq_wait);
+
+ while (atomic_read(&gmnalni->gmni_nthreads) != 0) {
count++;
if ((count & (count - 1)) == 0)
- CWARN("Waiting for ctthread to stop\n");
+ CWARN("Waiting for %d threads to stop\n",
+ atomic_read(&gmnalni->gmni_nthreads));
+ gmnal_yield(1);
}
}
* callback events or sleeps.
*/
int
-gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
+gmnal_start_threads(gmnal_ni_t *gmnalni)
{
+ int i;
+ int pid;
+
+ gmnalni->gmni_thread_shutdown = 0;
+ gmnalni->gmni_nrxthreads = 0;
+ atomic_set(&gmnalni->gmni_nthreads, 0);
- int threads = 0;
- int flag;
-
INIT_LIST_HEAD(&gmnalni->gmni_rxq);
spin_lock_init(&gmnalni->gmni_rxq_lock);
sema_init(&gmnalni->gmni_rxq_wait, 0);
CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
- CDEBUG(D_NET, "Starting caretaker thread\n");
- gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
- gmnalni->gmni_ctthread_pid =
- kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
- if (gmnalni->gmni_ctthread_pid <= 0) {
- CERROR("Caretaker thread failed to start\n");
- return -ENOMEM;
- }
-
- while (gmnalni->gmni_ctthread_flag != GMNAL_THREAD_RESET) {
- gmnal_yield(1);
- CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
+ pid = kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
+ if (pid < 0) {
+ CERROR("Caretaker thread failed to start: %d\n", pid);
+ return pid;
}
+ atomic_inc(&gmnalni->gmni_nthreads);
- CDEBUG(D_NET, "caretaker thread has started\n");
-
- /*
- * Now start a number of receiver threads
- * these treads get work to do from the caretaker (ct) thread
- */
- gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
- gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
-
- spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
- for (threads=0; threads<NRXTHREADS; threads++)
- gmnalni->gmni_rxthread_pid[threads] = -1;
+ for (i = 0; i < num_online_cpus(); i++) {
- /*
- * If the default number of receive threades isn't
- * modified at load time, then start one thread per cpu
- */
- if (num_rx_threads == -1)
- num_rx_threads = smp_num_cpus;
-
- CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
- for (threads=0; threads<num_rx_threads; threads++) {
- gmnalni->gmni_rxthread_pid[threads] =
- kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
- if (gmnalni->gmni_rxthread_pid[threads] <= 0) {
- CERROR("Receive thread failed to start\n");
- gmnal_stop_rxthread(gmnalni);
- gmnal_stop_ctthread(gmnalni);
- return -ENOMEM;
- }
- }
-
- for (;;) {
- spin_lock(&gmnalni->gmni_rxthread_flag_lock);
- flag = gmnalni->gmni_rxthread_flag;
- spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
- if (flag == GMNAL_RXTHREADS_STARTED)
- break;
+ pid = kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
+ if (pid < 0) {
+ CERROR("rx thread failed to start: %d\n", pid);
+ gmnal_stop_threads(gmnalni);
+ return pid;
+ }
- gmnal_yield(1);
+ atomic_inc(&gmnalni->gmni_nthreads);
+ gmnalni->gmni_rxthread_pid[i] = pid;
+ gmnalni->gmni_nrxthreads++;
}
- CDEBUG(D_NET, "receive threads seem to have started\n");
-
return 0;
}
while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0)
/* do nothing */;
- if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP)
+ if (gmnalni->gmni_thread_shutdown)
return NULL;
spin_lock(&gmnalni->gmni_rxq_lock);