Whamcloud - gitweb
* simplified gmnal thread startup/shutdown
authoreeb <eeb>
Mon, 22 Aug 2005 11:42:01 +0000 (11:42 +0000)
committereeb <eeb>
Mon, 22 Aug 2005 11:42:01 +0000 (11:42 +0000)
lnet/klnds/gmlnd/gmlnd.h
lnet/klnds/gmlnd/gmlnd_api.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/gmlnd/gmlnd_module.c
lnet/klnds/gmlnd/gmlnd_utils.c

index 4c7b1b8..47d71eb 100644 (file)
@@ -162,29 +162,17 @@ typedef struct gmnal_ni {
         lib_nal_t       *gmni_libnal;
         struct gm_port  *gmni_port;
         spinlock_t       gmni_gm_lock;          /* serialise GM calls */
+        atomic_t         gmni_nthreads;
+        int              gmni_nrxthreads;
         long             gmni_rxthread_pid[NRXTHREADS];
-        int              gmni_rxthread_stop_flag;
-        spinlock_t       gmni_rxthread_flag_lock;
-        long             gmni_rxthread_flag;
-        long             gmni_ctthread_pid;
-        int              gmni_ctthread_flag;
         gm_alarm_t       gmni_ctthread_alarm;
+        int              gmni_thread_shutdown;
         int              gmni_msg_size;
         struct list_head gmni_rxq;
         spinlock_t       gmni_rxq_lock;
         struct semaphore gmni_rxq_wait;
 } gmnal_ni_t;
 
-/*
- *      Flags to start/stop and check status of threads
- *      each rxthread sets 1 bit (any bit) of the flag on startup
- *      and clears 1 bit when exiting
- */
-#define GMNAL_THREAD_RESET      0
-#define GMNAL_THREAD_STOP       666
-#define GMNAL_CTTHREAD_STARTED  333
-#define GMNAL_RXTHREADS_STARTED ( (1<<num_rx_threads)-1)
-
 
 /*
  * for ioctl get pid
@@ -225,14 +213,13 @@ gmnal_tx_t *gmnal_get_tx(gmnal_ni_t *gmnalni, int block);
 void gmnal_return_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx);
 int gmnal_alloc_rxs(gmnal_ni_t *gmnalni);
 void gmnal_free_rxs(gmnal_ni_t *gmnalni);
-void gmnal_stop_rxthread(gmnal_ni_t *gmnalni);
-void gmnal_stop_ctthread(gmnal_ni_t *gmnalni);
 char *gmnal_gmstatus2str(gm_status_t status);
 char *gmnal_rxevent2str(gm_recv_event_t *ev);
 void gmnal_yield(int delay);
 int gmnal_enqueue_rx(gmnal_ni_t *gmnalni, gm_recv_t *recv);
 gmnal_rx_t *gmnal_dequeue_rx(gmnal_ni_t *gmnalni);
-int gmnal_start_kernel_threads(gmnal_ni_t *gmnalni);
+void gmnal_stop_threads(gmnal_ni_t *gmnalni);
+int gmnal_start_threads(gmnal_ni_t *gmnalni);
 
 /* gmnal_comm.c */
 void gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
@@ -244,7 +231,6 @@ ptl_err_t gmnal_post_tx(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
                         lib_msg_t *libmsg, ptl_nid_t nid, int nob);
 
 /* Module Parameters */
-extern  int num_rx_threads;
 extern  int num_txds;
 extern  int gm_port_id;
 
index 18e4976..6597cb5 100644 (file)
@@ -130,8 +130,7 @@ gmnal_api_shutdown(nal_t *nal)
         libcfs_nal_cmd_unregister(GMNAL);
 
         /* stop processing messages */
-       gmnal_stop_ctthread(gmnalni);
-       gmnal_stop_rxthread(gmnalni);
+        gmnal_stop_threads(gmnalni);
 
        gm_close(gmnalni->gmni_port);
        gm_finalize();
@@ -255,7 +254,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        /* Now that we have initialised the portals library, start receive
         * threads, we do this to avoid processing messages before we can parse
         * them */
-       rc = gmnal_start_kernel_threads(gmnalni);
+       rc = gmnal_start_threads(gmnalni);
         if (rc != 0) {
                 CERROR("Can't start threads: %d\n", rc);
                 goto failed_3;
@@ -271,8 +270,7 @@ gmnal_api_startup(nal_t *nal, ptl_pid_t requested_pid,
        return PTL_OK;
 
  failed_4:
-       gmnal_stop_rxthread(gmnalni);
-       gmnal_stop_ctthread(gmnalni);
+       gmnal_stop_threads(gmnalni);
 
  failed_3:
         gm_close(gmnalni->gmni_port);
index 2b77ea3..3b4baa0 100644 (file)
@@ -133,31 +133,25 @@ gmnal_ct_thread(void *arg)
        sprintf(current->comm, "gmnal_ct");
        kportal_daemonize("gmnalctd");
 
-       gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
-
-       while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+       while(!gmnalni->gmni_thread_shutdown) {
 
                 spin_lock(&gmnalni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
                 spin_unlock(&gmnalni->gmni_gm_lock);
 
-               if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
-                       CDEBUG(D_NET, "time to exit\n");
-                       break;
-               }
-
                CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
 
                if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
                         recv = (gm_recv_t*)&rxevent->recv;
                         gmnal_enqueue_rx(gmnalni, recv);
-                } else {
-                        gm_unknown(gmnalni->gmni_port, rxevent);
-               }
+                        continue;
+                }
+
+                gm_unknown(gmnalni->gmni_port, rxevent);
        }
 
-       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
-       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+       CDEBUG(D_NET, "exiting\n");
+        atomic_dec(&gmnalni->gmni_nthreads);
        return 0;
 }
 
@@ -173,33 +167,19 @@ gmnal_rx_thread(void *arg)
        gmnal_rx_t    *rx;
        int            rank;
 
-       for (rank=0; rank<num_rx_threads; rank++)
+       for (rank = 0; rank < gmnalni->gmni_nrxthreads; rank++)
                if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
                        break;
 
        snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
        kportal_daemonize(name);
 
-       /*
-        *      set 1 bit for each thread started
-        *      doesn't matter which bit
-        */
-       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-       if (gmnalni->gmni_rxthread_flag)
-               gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
-       else
-               gmnalni->gmni_rxthread_flag = 1;
-       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
-       while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
-               CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
+       while(!gmnalni->gmni_thread_shutdown) {
 
                rx = gmnal_dequeue_rx(gmnalni);
-               if (rx == NULL) {
-                       CDEBUG(D_NET, "Receive thread time to exit\n");
+               if (rx == NULL)
                        break;
-               }
-                
+
                 /* We're connectionless: simply ignore packets on error */
                 
                 if (gmnal_unpack_msg(gmnalni, rx) == 0) {
@@ -213,11 +193,8 @@ gmnal_rx_thread(void *arg)
                 gmnal_post_rx(gmnalni, rx);
        }
 
-       spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-       gmnalni->gmni_rxthread_flag /= 2;
-       spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
-       CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
+       CDEBUG(D_NET, "exiting\n");
+        atomic_dec(&gmnalni->gmni_nthreads);
        return 0;
 }
 
index 446f265..449c331 100644 (file)
 #include "gmnal.h"
 
 
-/*
- *      -1 indicates default value.
- *      This is 1 thread per cpu
- *      See start_kernel_threads
- */
-int num_rx_threads = -1;
 int num_txds = 5;
 int gm_port_id = 4;
 
index b4a1a99..00bedf5 100644 (file)
@@ -31,11 +31,11 @@ int
 gmnal_is_rxthread(gmnal_ni_t *gmnalni)
 {
        int i;
-       for (i=0; i<num_rx_threads; i++) {
+
+       for (i = 0; i < gmnalni->gmni_nrxthreads; i++)
                if (gmnalni->gmni_rxthread_pid[i] == current->pid)
-                       return(1);
-       }
-       return(0);
+                       return 1;
+       return 0;
 }
 
 gmnal_tx_t *
@@ -344,44 +344,29 @@ gmnal_free_rxs(gmnal_ni_t *gmnalni)
 }
 
 void
-gmnal_stop_rxthread(gmnal_ni_t *gmnalni)
-{
-       int     count = 2;
-        int     i;
-       
-       gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_STOP;
-
-        for (i = 0; i < num_rx_threads; i++)
-                up(&gmnalni->gmni_rxq_wait);
-
-       while (gmnalni->gmni_rxthread_flag != GMNAL_THREAD_RESET) {
-               CDEBUG(D_NET, "gmnal_stop_rxthread sleeping\n");
-                gmnal_yield(1);
-
-                count++;
-                if ((count & (count - 1)) == 0)
-                        CWARN("Waiting for rxthreads to stop\n");
-       }
-}
-
-void
-gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
+gmnal_stop_threads(gmnal_ni_t *gmnalni)
 {
         int count = 2;
+        int i;
 
-       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_STOP;
+        gmnalni->gmni_thread_shutdown = 1;
 
+        /* wake ctthread with an alarm */
        spin_lock(&gmnalni->gmni_gm_lock);
-       gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 10, 
-                    NULL, NULL);
+       gm_set_alarm(gmnalni->gmni_port, &gmnalni->gmni_ctthread_alarm, 
+                     0, NULL, NULL);
        spin_unlock(&gmnalni->gmni_gm_lock);
 
-       while (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
-               CDEBUG(D_NET, "gmnal_stop_ctthread sleeping\n");
-                gmnal_yield(1);
+        /* wake each rxthread */
+        for (i = 0; i < num_online_cpus(); i++)
+                up(&gmnalni->gmni_rxq_wait);
+        
+       while (atomic_read(&gmnalni->gmni_nthreads) != 0) {
                 count++;
                 if ((count & (count - 1)) == 0)
-                        CWARN("Waiting for ctthread to stop\n");
+                        CWARN("Waiting for %d threads to stop\n",
+                              atomic_read(&gmnalni->gmni_nthreads));
+                gmnal_yield(1);
        }
 }
 
@@ -393,12 +378,15 @@ gmnal_stop_ctthread(gmnal_ni_t *gmnalni)
  *     callback events or sleeps.
  */
 int
-gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
+gmnal_start_threads(gmnal_ni_t *gmnalni)
 {
+        int     i;
+        int     pid;
+
+        gmnalni->gmni_thread_shutdown = 0;
+        gmnalni->gmni_nrxthreads = 0;
+        atomic_set(&gmnalni->gmni_nthreads, 0);
 
-       int     threads = 0;
-        int     flag;
-        
         INIT_LIST_HEAD(&gmnalni->gmni_rxq);
        spin_lock_init(&gmnalni->gmni_rxq_lock);
        sema_init(&gmnalni->gmni_rxq_wait, 0);
@@ -410,65 +398,27 @@ gmnal_start_kernel_threads(gmnal_ni_t *gmnalni)
        CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
        gm_initialize_alarm(&gmnalni->gmni_ctthread_alarm);
 
-       CDEBUG(D_NET, "Starting caretaker thread\n");
-       gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
-       gmnalni->gmni_ctthread_pid = 
-                kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
-       if (gmnalni->gmni_ctthread_pid <= 0) {
-               CERROR("Caretaker thread failed to start\n");
-               return -ENOMEM;
-       }
-
-       while (gmnalni->gmni_ctthread_flag != GMNAL_THREAD_RESET) {
-               gmnal_yield(1);
-               CDEBUG(D_NET, "Waiting for caretaker thread signs of life\n");
+        pid = kernel_thread(gmnal_ct_thread, (void*)gmnalni, 0);
+       if (pid < 0) {
+               CERROR("Caretaker thread failed to start: %d\n", pid);
+               return pid;
        }
+        atomic_inc(&gmnalni->gmni_nthreads);
 
-       CDEBUG(D_NET, "caretaker thread has started\n");
-
-       /*
-        *      Now start a number of receiver threads
-        *      these treads get work to do from the caretaker (ct) thread
-        */
-       gmnalni->gmni_rxthread_flag = GMNAL_THREAD_RESET;
-       gmnalni->gmni_rxthread_stop_flag = GMNAL_THREAD_RESET;
-
-       spin_lock_init(&gmnalni->gmni_rxthread_flag_lock);
-       for (threads=0; threads<NRXTHREADS; threads++)
-               gmnalni->gmni_rxthread_pid[threads] = -1;
+       for (i = 0; i < num_online_cpus(); i++) {
 
-        /*
-         *      If the default number of receive threades isn't
-         *      modified at load time, then start one thread per cpu
-         */
-        if (num_rx_threads == -1)
-                num_rx_threads = smp_num_cpus;
-
-       CDEBUG(D_NET, "Starting [%d] receive threads\n", num_rx_threads);
-       for (threads=0; threads<num_rx_threads; threads++) {
-               gmnalni->gmni_rxthread_pid[threads] = 
-                      kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
-               if (gmnalni->gmni_rxthread_pid[threads] <= 0) {
-                       CERROR("Receive thread failed to start\n");
-                       gmnal_stop_rxthread(gmnalni);
-                       gmnal_stop_ctthread(gmnalni);
-                       return -ENOMEM;
-               }
-       }
-
-       for (;;) {
-               spin_lock(&gmnalni->gmni_rxthread_flag_lock);
-                flag = gmnalni->gmni_rxthread_flag;
-               spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-                
-               if (flag == GMNAL_RXTHREADS_STARTED)
-                        break;
+                pid = kernel_thread(gmnal_rx_thread, (void*)gmnalni, 0);
+                if (pid < 0) {
+                        CERROR("rx thread failed to start: %d\n", pid);
+                        gmnal_stop_threads(gmnalni);
+                        return pid;
+                }
 
-               gmnal_yield(1);
+                atomic_inc(&gmnalni->gmni_nthreads);
+               gmnalni->gmni_rxthread_pid[i] = pid;
+                gmnalni->gmni_nrxthreads++;
        }
 
-       CDEBUG(D_NET, "receive threads seem to have started\n");
-
        return 0;
 }
 
@@ -760,7 +710,7 @@ gmnal_dequeue_rx(gmnal_ni_t *gmnalni)
                while(down_interruptible(&gmnalni->gmni_rxq_wait) != 0)
                         /* do nothing */;
 
-               if (gmnalni->gmni_rxthread_stop_flag == GMNAL_THREAD_STOP)
+               if (gmnalni->gmni_thread_shutdown)
                        return NULL;
 
                spin_lock(&gmnalni->gmni_rxq_lock);