Whamcloud - gitweb
LU-56 ptlrpc: svc thread starting/stopping cleanup
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index c0960e3..e6fe4c7 100644 (file)
@@ -471,7 +471,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
        }
 
        /*
-        * NB: we will add some common at here for estimating, for example:
+        * NB: we will add some common code here for estimating, for example:
         * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
         * threads number based on:
         *     (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
@@ -482,7 +482,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
         * availability of service.
         *
         * Also, we will need to validate threads number at here for
-        * CPT affinity service (CPU ParTiion) in the future.
+        * CPT affinity service (CPU ParTion) in the future.
         * A service can have percpt thread-pool instead of a global thread
         * pool for each service, which means user might not always get the
         * threads number they want even they set it in conf::tc_nthrs_user,
@@ -2172,8 +2172,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
  */
 static int ptlrpc_main(void *arg)
 {
-       struct ptlrpc_svc_data          *data = (struct ptlrpc_svc_data *)arg;
-       struct ptlrpc_thread            *thread = data->thread;
+       struct ptlrpc_thread            *thread = (struct ptlrpc_thread *)arg;
        struct ptlrpc_service_part      *svcpt = thread->t_svcpt;
        struct ptlrpc_service           *svc = svcpt->scp_service;
        struct ptlrpc_reply_state       *rs;
@@ -2185,7 +2184,7 @@ static int ptlrpc_main(void *arg)
         ENTRY;
 
         thread->t_pid = cfs_curproc_pid();
-        cfs_daemonize_ctxt(data->name);
+        cfs_daemonize_ctxt(thread->t_name);
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
         /* we need to do this before any per-thread allocation is done so that
@@ -2250,6 +2249,7 @@ static int ptlrpc_main(void *arg)
        LASSERT(thread_is_starting(thread));
        thread_clear_flags(thread, SVC_STARTING);
 
+       LASSERT(svcpt->scp_nthrs_starting == 1);
        svcpt->scp_nthrs_starting--;
 
        /* SVC_STOPPING may already be set here if someone else is trying
@@ -2260,10 +2260,7 @@ static int ptlrpc_main(void *arg)
        svcpt->scp_nthrs_running++;
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       /*
-        * wake up our creator. Note: @data is invalid after this point,
-        * because it's allocated on ptlrpc_start_thread() stack.
-        */
+       /* wake up our creator in case he's still waiting. */
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
@@ -2286,7 +2283,7 @@ static int ptlrpc_main(void *arg)
 
                if (ptlrpc_threads_need_create(svcpt)) {
                        /* Ignore return code - we tried... */
-                       ptlrpc_start_thread(svcpt);
+                       ptlrpc_start_thread(svcpt, 0);
                 }
 
                /* Process all incoming reqs before handling any */
@@ -2495,29 +2492,53 @@ static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
         RETURN(0);
 }
 
-static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
-                              struct ptlrpc_thread *thread)
+static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 {
-       struct l_wait_info lwi = { 0 };
+       struct l_wait_info      lwi = { 0 };
+       struct ptlrpc_thread    *thread;
+       CFS_LIST_HEAD           (zombie);
+
        ENTRY;
 
-       CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
-              thread, thread->t_pid);
+       CDEBUG(D_INFO, "Stopping threads for service %s\n",
+              svcpt->scp_service->srv_name);
 
        cfs_spin_lock(&svcpt->scp_lock);
        /* let the thread know that we would like it to stop asap */
-       thread_add_flags(thread, SVC_STOPPING);
-       cfs_spin_unlock(&svcpt->scp_lock);
+       list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
+               CDEBUG(D_INFO, "Stopping thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               thread_add_flags(thread, SVC_STOPPING);
+       }
 
        cfs_waitq_broadcast(&svcpt->scp_waitq);
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_stopped(thread), &lwi);
 
-       cfs_spin_lock(&svcpt->scp_lock);
-       cfs_list_del(&thread->t_link);
+       while (!cfs_list_empty(&svcpt->scp_threads)) {
+               thread = cfs_list_entry(svcpt->scp_threads.next,
+                                       struct ptlrpc_thread, t_link);
+               if (thread_is_stopped(thread)) {
+                       cfs_list_del(&thread->t_link);
+                       cfs_list_add(&thread->t_link, &zombie);
+                       continue;
+               }
+               cfs_spin_unlock(&svcpt->scp_lock);
+
+               CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
+                      svcpt->scp_service->srv_thread_name, thread->t_id);
+               l_wait_event(thread->t_ctl_waitq,
+                            thread_is_stopped(thread), &lwi);
+
+               cfs_spin_lock(&svcpt->scp_lock);
+       }
+
        cfs_spin_unlock(&svcpt->scp_lock);
 
-       OBD_FREE_PTR(thread);
+       while (!cfs_list_empty(&zombie)) {
+               thread = cfs_list_entry(zombie.next,
+                                       struct ptlrpc_thread, t_link);
+               cfs_list_del(&thread->t_link);
+               OBD_FREE_PTR(thread);
+       }
        EXIT;
 }
 
@@ -2526,23 +2547,10 @@ static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
-       struct ptlrpc_service_part      *svcpt = svc->srv_part;
-       struct ptlrpc_thread            *thread;
        ENTRY;
 
-       LASSERT(svcpt != NULL);
-
-       cfs_spin_lock(&svcpt->scp_lock);
-       while (!cfs_list_empty(&svcpt->scp_threads)) {
-               thread = cfs_list_entry(svcpt->scp_threads.next,
-                                       struct ptlrpc_thread, t_link);
-
-               cfs_spin_unlock(&svcpt->scp_lock);
-               ptlrpc_stop_thread(svcpt, thread);
-               cfs_spin_lock(&svcpt->scp_lock);
-       }
-
-       cfs_spin_unlock(&svcpt->scp_lock);
+       if (svc != NULL && svc->srv_part != NULL)
+               ptlrpc_svcpt_stop_threads(svc->srv_part);
        EXIT;
 }
 
@@ -2555,7 +2563,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-               rc = ptlrpc_start_thread(svc->srv_part);
+               rc = ptlrpc_start_thread(svc->srv_part, 1);
                 /* We have enough threads, don't start more.  b=15759 */
                 if (rc == -EMFILE) {
                         rc = 0;
@@ -2571,13 +2579,11 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
        struct l_wait_info      lwi = { 0 };
-       struct ptlrpc_svc_data  d;
        struct ptlrpc_thread    *thread;
        struct ptlrpc_service   *svc = svcpt->scp_service;
-       char                    name[32];
        int                     rc;
        ENTRY;
 
@@ -2587,6 +2593,7 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
               svc->srv_name, svcpt->scp_nthrs_running,
               svc->srv_threads_min, svc->srv_threads_max);
 
+ again:
        if (unlikely(svc->srv_is_stopping))
                RETURN(-ESRCH);
 
@@ -2607,6 +2614,24 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                RETURN(-EMFILE);
        }
 
+       if (svcpt->scp_nthrs_starting != 0) {
+               /* serialize starting because some modules (obdfilter)
+                * might require unique and contiguous t_id */
+               LASSERT(svcpt->scp_nthrs_starting == 1);
+               cfs_spin_unlock(&svcpt->scp_lock);
+               OBD_FREE_PTR(thread);
+               if (wait) {
+                       CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
+                              svc->srv_thread_name, svcpt->scp_thr_nextid);
+                       cfs_schedule();
+                       goto again;
+               }
+
+               CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
+                      svc->srv_thread_name, svcpt->scp_thr_nextid);
+               RETURN(-EAGAIN);
+       }
+
        svcpt->scp_nthrs_starting++;
        thread->t_id = svcpt->scp_thr_nextid++;
        thread_add_flags(thread, SVC_STARTING);
@@ -2615,20 +2640,18 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
        cfs_list_add(&thread->t_link, &svcpt->scp_threads);
        cfs_spin_unlock(&svcpt->scp_lock);
 
-        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
-        d.svc = svc;
-        d.name = name;
-        d.thread = thread;
-
-        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
-         */
-        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("cannot start thread '%s': rc %d\n", name, rc);
+       snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
+                "%s_%02d", svc->srv_thread_name, thread->t_id);
 
+       CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
+       /*
+        * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+        * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
+        */
+       rc = cfs_create_thread(ptlrpc_main, thread, CFS_DAEMON_FLAGS);
+       if (rc < 0) {
+               CERROR("cannot start thread '%s': rc %d\n",
+                      thread->t_name, rc);
                cfs_spin_lock(&svcpt->scp_lock);
                cfs_list_del(&thread->t_link);
                --svcpt->scp_nthrs_starting;
@@ -2637,6 +2660,10 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
+
+       if (!wait)
+               RETURN(0);
+
         l_wait_event(thread->t_ctl_waitq,
                      thread_is_running(thread) || thread_is_stopped(thread),
                      &lwi);