Whamcloud - gitweb
LU-56 ptlrpc: svc thread starting/stopping cleanup
author: Liang Zhen <liang@whamcloud.com>
Fri, 25 May 2012 12:40:24 +0000 (20:40 +0800)
committer: Andreas Dilger <adilger@whamcloud.com>
Tue, 26 Jun 2012 16:04:24 +0000 (12:04 -0400)
This patch covered two things:
- serialize creation of ptlrpc service thread
  In current version we can parallel create service threads, so there
  could be "hole" of thread ID if one creation failed, it could be
  problematic because some modules require thread ID to be strictly
  contiguous and unique. Serializing thread creation can resolve this
  issue.
- code cleanup for stopping of ptlrpc service threads, this is
  just for the next step work of partitioned ptlrpc service.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: Ied8ad89003aa9d53fa73a4e5166a2c8d07a1aae9
Reviewed-on: http://review.whamcloud.com/2912
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: wangdi <di.wang@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/lustre_net.h
lustre/ptlrpc/pinger.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/service.c

index 64b5878..039effa 100644 (file)
@@ -967,6 +967,7 @@ enum {
         SVC_SIGNAL      = 1 << 5,
 };
 
         SVC_SIGNAL      = 1 << 5,
 };
 
+#define PTLRPC_THR_NAME_LEN            32
 /**
  * Definition of server service thread structure
  */
 /**
  * Definition of server service thread structure
  */
@@ -998,6 +999,7 @@ struct ptlrpc_thread {
        struct ptlrpc_service_part      *t_svcpt;
        cfs_waitq_t                     t_ctl_waitq;
        struct lu_env                   *t_env;
        struct ptlrpc_service_part      *t_svcpt;
        cfs_waitq_t                     t_ctl_waitq;
        struct lu_env                   *t_env;
+       char                            t_name[PTLRPC_THR_NAME_LEN];
 };
 
 static inline int thread_is_init(struct ptlrpc_thread *thread)
 };
 
 static inline int thread_is_init(struct ptlrpc_thread *thread)
@@ -1668,11 +1670,6 @@ void ptlrpc_hr_fini(void);
 # define ptlrpc_hr_fini() do {} while(0)
 #endif
 
 # define ptlrpc_hr_fini() do {} while(0)
 #endif
 
-struct ptlrpc_svc_data {
-        char *name;
-        struct ptlrpc_service *svc;
-        struct ptlrpc_thread *thread;
-};
 /** @} */
 
 /* ptlrpc/import.c */
 /** @} */
 
 /* ptlrpc/import.c */
index be520f6..2d48a43 100644 (file)
@@ -262,11 +262,10 @@ static void ptlrpc_pinger_process_import(struct obd_import *imp,
 
 static int ptlrpc_pinger_main(void *arg)
 {
 
 static int ptlrpc_pinger_main(void *arg)
 {
-        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
-        struct ptlrpc_thread *thread = data->thread;
-        ENTRY;
+        struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
+       ENTRY;
 
 
-        cfs_daemonize(data->name);
+        cfs_daemonize(thread->t_name);
 
         /* Record that the thread is running */
         thread_set_flags(thread, SVC_RUNNING);
 
         /* Record that the thread is running */
         thread_set_flags(thread, SVC_RUNNING);
@@ -343,7 +342,6 @@ static struct ptlrpc_thread *pinger_thread = NULL;
 int ptlrpc_start_pinger(void)
 {
         struct l_wait_info lwi = { 0 };
 int ptlrpc_start_pinger(void)
 {
         struct l_wait_info lwi = { 0 };
-        struct ptlrpc_svc_data d;
         int rc;
 #ifndef ENABLE_PINGER
         return 0;
         int rc;
 #ifndef ENABLE_PINGER
         return 0;
@@ -359,12 +357,12 @@ int ptlrpc_start_pinger(void)
         cfs_waitq_init(&pinger_thread->t_ctl_waitq);
         cfs_waitq_init(&suspend_timeouts_waitq);
 
         cfs_waitq_init(&pinger_thread->t_ctl_waitq);
         cfs_waitq_init(&suspend_timeouts_waitq);
 
-        d.name = "ll_ping";
-        d.thread = pinger_thread;
+       strcpy(pinger_thread->t_name, "ll_ping");
 
 
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in cfs_daemonize_ctxt() right away. */
-        rc = cfs_create_thread(ptlrpc_pinger_main, &d, CFS_DAEMON_FLAGS);
+       /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+        * just drop the VM and FILES in cfs_daemonize_ctxt() right away. */
+        rc = cfs_create_thread(ptlrpc_pinger_main,
+                              pinger_thread, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("cannot start thread: %d\n", rc);
                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
         if (rc < 0) {
                 CERROR("cannot start thread: %d\n", rc);
                 OBD_FREE(pinger_thread, sizeof(*pinger_thread));
index 2282bf8..ff5be4e 100644 (file)
@@ -47,7 +47,7 @@ struct ldlm_res_id;
 struct ptlrpc_request_set;
 extern int test_req_buffer_pressure;
 
 struct ptlrpc_request_set;
 extern int test_req_buffer_pressure;
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt);
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
 /* ptlrpcd.c */
 int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
 
 /* ptlrpcd.c */
 int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
 
index c0960e3..e6fe4c7 100644 (file)
@@ -471,7 +471,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
        }
 
        /*
        }
 
        /*
-        * NB: we will add some common at here for estimating, for example:
+        * NB: we will add some common code here for estimating, for example:
         * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
         * threads number based on:
         *     (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
         * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
         * threads number based on:
         *     (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
@@ -482,7 +482,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
         * availability of service.
         *
         * Also, we will need to validate threads number at here for
         * availability of service.
         *
         * Also, we will need to validate threads number at here for
-        * CPT affinity service (CPU ParTiion) in the future.
+        * CPT affinity service (CPU ParTion) in the future.
         * A service can have percpt thread-pool instead of a global thread
         * pool for each service, which means user might not always get the
         * threads number they want even they set it in conf::tc_nthrs_user,
         * A service can have percpt thread-pool instead of a global thread
         * pool for each service, which means user might not always get the
         * threads number they want even they set it in conf::tc_nthrs_user,
@@ -2172,8 +2172,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
  */
 static int ptlrpc_main(void *arg)
 {
  */
 static int ptlrpc_main(void *arg)
 {
-       struct ptlrpc_svc_data          *data = (struct ptlrpc_svc_data *)arg;
-       struct ptlrpc_thread            *thread = data->thread;
+       struct ptlrpc_thread            *thread = (struct ptlrpc_thread *)arg;
        struct ptlrpc_service_part      *svcpt = thread->t_svcpt;
        struct ptlrpc_service           *svc = svcpt->scp_service;
        struct ptlrpc_reply_state       *rs;
        struct ptlrpc_service_part      *svcpt = thread->t_svcpt;
        struct ptlrpc_service           *svc = svcpt->scp_service;
        struct ptlrpc_reply_state       *rs;
@@ -2185,7 +2184,7 @@ static int ptlrpc_main(void *arg)
         ENTRY;
 
         thread->t_pid = cfs_curproc_pid();
         ENTRY;
 
         thread->t_pid = cfs_curproc_pid();
-        cfs_daemonize_ctxt(data->name);
+        cfs_daemonize_ctxt(thread->t_name);
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
         /* we need to do this before any per-thread allocation is done so that
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
         /* we need to do this before any per-thread allocation is done so that
@@ -2250,6 +2249,7 @@ static int ptlrpc_main(void *arg)
        LASSERT(thread_is_starting(thread));
        thread_clear_flags(thread, SVC_STARTING);
 
        LASSERT(thread_is_starting(thread));
        thread_clear_flags(thread, SVC_STARTING);
 
+       LASSERT(svcpt->scp_nthrs_starting == 1);
        svcpt->scp_nthrs_starting--;
 
        /* SVC_STOPPING may already be set here if someone else is trying
        svcpt->scp_nthrs_starting--;
 
        /* SVC_STOPPING may already be set here if someone else is trying
@@ -2260,10 +2260,7 @@ static int ptlrpc_main(void *arg)
        svcpt->scp_nthrs_running++;
        cfs_spin_unlock(&svcpt->scp_lock);
 
        svcpt->scp_nthrs_running++;
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       /*
-        * wake up our creator. Note: @data is invalid after this point,
-        * because it's allocated on ptlrpc_start_thread() stack.
-        */
+       /* wake up our creator in case he's still waiting. */
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
@@ -2286,7 +2283,7 @@ static int ptlrpc_main(void *arg)
 
                if (ptlrpc_threads_need_create(svcpt)) {
                        /* Ignore return code - we tried... */
 
                if (ptlrpc_threads_need_create(svcpt)) {
                        /* Ignore return code - we tried... */
-                       ptlrpc_start_thread(svcpt);
+                       ptlrpc_start_thread(svcpt, 0);
                 }
 
                /* Process all incoming reqs before handling any */
                 }
 
                /* Process all incoming reqs before handling any */
@@ -2495,29 +2492,53 @@ static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
-                              struct ptlrpc_thread *thread)
+static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 {
 {
-       struct l_wait_info lwi = { 0 };
+       struct l_wait_info      lwi = { 0 };
+       struct ptlrpc_thread    *thread;
+       CFS_LIST_HEAD           (zombie);
+
        ENTRY;
 
        ENTRY;
 
-       CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
-              thread, thread->t_pid);
+       CDEBUG(D_INFO, "Stopping threads for service %s\n",
+              svcpt->scp_service->srv_name);
 
        cfs_spin_lock(&svcpt->scp_lock);
        /* let the thread know that we would like it to stop asap */
 
        cfs_spin_lock(&svcpt->scp_lock);
        /* let the thread know that we would like it to stop asap */
-       thread_add_flags(thread, SVC_STOPPING);
-       cfs_spin_unlock(&svcpt->scp_lock);
+       list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
+               CDEBUG(D_INFO, "Stopping thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               thread_add_flags(thread, SVC_STOPPING);
+       }
 
        cfs_waitq_broadcast(&svcpt->scp_waitq);
 
        cfs_waitq_broadcast(&svcpt->scp_waitq);
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_stopped(thread), &lwi);
 
 
-       cfs_spin_lock(&svcpt->scp_lock);
-       cfs_list_del(&thread->t_link);
+       while (!cfs_list_empty(&svcpt->scp_threads)) {
+               thread = cfs_list_entry(svcpt->scp_threads.next,
+                                       struct ptlrpc_thread, t_link);
+               if (thread_is_stopped(thread)) {
+                       cfs_list_del(&thread->t_link);
+                       cfs_list_add(&thread->t_link, &zombie);
+                       continue;
+               }
+               cfs_spin_unlock(&svcpt->scp_lock);
+
+               CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_stopped(thread), &lwi);
+
+               cfs_spin_lock(&svcpt->scp_lock);
+       }
+
        cfs_spin_unlock(&svcpt->scp_lock);
 
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       OBD_FREE_PTR(thread);
+       while (!cfs_list_empty(&zombie)) {
+               thread = cfs_list_entry(zombie.next,
+                                       struct ptlrpc_thread, t_link);
+               cfs_list_del(&thread->t_link);
+               OBD_FREE_PTR(thread);
+       }
        EXIT;
 }
 
        EXIT;
 }
 
@@ -2526,23 +2547,10 @@ static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
-       struct ptlrpc_service_part      *svcpt = svc->srv_part;
-       struct ptlrpc_thread            *thread;
        ENTRY;
 
        ENTRY;
 
-       LASSERT(svcpt != NULL);
-
-       cfs_spin_lock(&svcpt->scp_lock);
-       while (!cfs_list_empty(&svcpt->scp_threads)) {
-               thread = cfs_list_entry(svcpt->scp_threads.next,
-                                       struct ptlrpc_thread, t_link);
-
-               cfs_spin_unlock(&svcpt->scp_lock);
-               ptlrpc_stop_thread(svcpt, thread);
-               cfs_spin_lock(&svcpt->scp_lock);
-       }
-
-       cfs_spin_unlock(&svcpt->scp_lock);
+       if (svc != NULL && svc->srv_part != NULL)
+               ptlrpc_svcpt_stop_threads(svc->srv_part);
        EXIT;
 }
 
        EXIT;
 }
 
@@ -2555,7 +2563,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-               rc = ptlrpc_start_thread(svc->srv_part);
+               rc = ptlrpc_start_thread(svc->srv_part, 1);
                 /* We have enough threads, don't start more.  b=15759 */
                 if (rc == -EMFILE) {
                         rc = 0;
                 /* We have enough threads, don't start more.  b=15759 */
                 if (rc == -EMFILE) {
                         rc = 0;
@@ -2571,13 +2579,11 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
        struct l_wait_info      lwi = { 0 };
 {
        struct l_wait_info      lwi = { 0 };
-       struct ptlrpc_svc_data  d;
        struct ptlrpc_thread    *thread;
        struct ptlrpc_service   *svc = svcpt->scp_service;
        struct ptlrpc_thread    *thread;
        struct ptlrpc_service   *svc = svcpt->scp_service;
-       char                    name[32];
        int                     rc;
        ENTRY;
 
        int                     rc;
        ENTRY;
 
@@ -2587,6 +2593,7 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
               svc->srv_name, svcpt->scp_nthrs_running,
               svc->srv_threads_min, svc->srv_threads_max);
 
               svc->srv_name, svcpt->scp_nthrs_running,
               svc->srv_threads_min, svc->srv_threads_max);
 
+ again:
        if (unlikely(svc->srv_is_stopping))
                RETURN(-ESRCH);
 
        if (unlikely(svc->srv_is_stopping))
                RETURN(-ESRCH);
 
@@ -2607,6 +2614,24 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                RETURN(-EMFILE);
        }
 
                RETURN(-EMFILE);
        }
 
+       if (svcpt->scp_nthrs_starting != 0) {
+               /* serialize starting because some modules (obdfilter)
+                * might require unique and contiguous t_id */
+               LASSERT(svcpt->scp_nthrs_starting == 1);
+               cfs_spin_unlock(&svcpt->scp_lock);
+               OBD_FREE_PTR(thread);
+               if (wait) {
+                       CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
+                              svc->srv_thread_name, svcpt->scp_thr_nextid);
+                       cfs_schedule();
+                       goto again;
+               }
+
+               CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
+                      svc->srv_thread_name, svcpt->scp_thr_nextid);
+               RETURN(-EAGAIN);
+       }
+
        svcpt->scp_nthrs_starting++;
        thread->t_id = svcpt->scp_thr_nextid++;
        thread_add_flags(thread, SVC_STARTING);
        svcpt->scp_nthrs_starting++;
        thread->t_id = svcpt->scp_thr_nextid++;
        thread_add_flags(thread, SVC_STARTING);
@@ -2615,20 +2640,18 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
        cfs_list_add(&thread->t_link, &svcpt->scp_threads);
        cfs_spin_unlock(&svcpt->scp_lock);
 
        cfs_list_add(&thread->t_link, &svcpt->scp_threads);
        cfs_spin_unlock(&svcpt->scp_lock);
 
-        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
-        d.svc = svc;
-        d.name = name;
-        d.thread = thread;
-
-        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
-         */
-        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("cannot start thread '%s': rc %d\n", name, rc);
+       snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
+                "%s_%02d", svc->srv_thread_name, thread->t_id);
 
 
+       CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
+       /*
+        * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+        * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
+        */
+       rc = cfs_create_thread(ptlrpc_main, thread, CFS_DAEMON_FLAGS);
+       if (rc < 0) {
+               CERROR("cannot start thread '%s': rc %d\n",
+                      thread->t_name, rc);
                cfs_spin_lock(&svcpt->scp_lock);
                cfs_list_del(&thread->t_link);
                --svcpt->scp_nthrs_starting;
                cfs_spin_lock(&svcpt->scp_lock);
                cfs_list_del(&thread->t_link);
                --svcpt->scp_nthrs_starting;
@@ -2637,6 +2660,10 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
+
+       if (!wait)
+               RETURN(0);
+
         l_wait_event(thread->t_ctl_waitq,
                      thread_is_running(thread) || thread_is_stopped(thread),
                      &lwi);
         l_wait_event(thread->t_ctl_waitq,
                      thread_is_running(thread) || thread_is_stopped(thread),
                      &lwi);