X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fptlrpc%2Fservice.c;h=e6fe4c718f1cc2f605dba15eb8eb36fdb3ddcde6;hp=c0960e3cc9a2f4c928fac60bc960aa90ac9d1dde;hb=ff0c89a73e141ce019ee2a94e5d01a8a37dd830a;hpb=12b6c0b993bca79091d035cbe4cf1805d8adcc47

diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index c0960e3..e6fe4c7 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -471,7 +471,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
         }
 
         /*
-         * NB: we will add some common at here for estimating, for example:
+         * NB: we will add some common code here for estimating, for example:
          * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
          * threads number based on:
          * (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
@@ -482,7 +482,7 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
          * availability of service.
          *
          * Also, we will need to validate threads number at here for
-         * CPT affinity service (CPU ParTiion) in the future.
+         * CPT affinity service (CPU ParTion) in the future.
          * A service can have percpt thread-pool instead of a global thread
          * pool for each service, which means user might not always get the
          * threads number they want even they set it in conf::tc_nthrs_user,
@@ -2172,8 +2172,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
  */
 static int ptlrpc_main(void *arg)
 {
-        struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
-        struct ptlrpc_thread *thread = data->thread;
+        struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
         struct ptlrpc_service_part *svcpt = thread->t_svcpt;
         struct ptlrpc_service *svc = svcpt->scp_service;
         struct ptlrpc_reply_state *rs;
@@ -2185,7 +2184,7 @@ static int ptlrpc_main(void *arg)
         ENTRY;
 
         thread->t_pid = cfs_curproc_pid();
-        cfs_daemonize_ctxt(data->name);
+        cfs_daemonize_ctxt(thread->t_name);
 
 #if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
         /* we need to do this before any per-thread allocation is done so that
@@ -2250,6 +2249,7 @@ static int ptlrpc_main(void *arg)
         LASSERT(thread_is_starting(thread));
         thread_clear_flags(thread, SVC_STARTING);
 
+        LASSERT(svcpt->scp_nthrs_starting == 1);
         svcpt->scp_nthrs_starting--;
 
         /* SVC_STOPPING may already be set here if someone else is trying
@@ -2260,10 +2260,7 @@ static int ptlrpc_main(void *arg)
         svcpt->scp_nthrs_running++;
         cfs_spin_unlock(&svcpt->scp_lock);
 
-        /*
-         * wake up our creator. Note: @data is invalid after this point,
-         * because it's allocated on ptlrpc_start_thread() stack.
-         */
+        /* wake up our creator in case he's still waiting. */
         cfs_waitq_signal(&thread->t_ctl_waitq);
 
         thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
@@ -2286,7 +2283,7 @@ static int ptlrpc_main(void *arg)
 
                 if (ptlrpc_threads_need_create(svcpt)) {
                         /* Ignore return code - we tried... */
-                        ptlrpc_start_thread(svcpt);
+                        ptlrpc_start_thread(svcpt, 0);
                 }
 
                 /* Process all incoming reqs before handling any */
@@ -2495,29 +2492,53 @@ static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
         RETURN(0);
 }
 
-static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
-                               struct ptlrpc_thread *thread)
+static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
 {
-        struct l_wait_info lwi = { 0 };
+        struct l_wait_info lwi = { 0 };
+        struct ptlrpc_thread *thread;
+        CFS_LIST_HEAD (zombie);
+
         ENTRY;
 
-        CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
-               thread, thread->t_pid);
+        CDEBUG(D_INFO, "Stopping threads for service %s\n",
+               svcpt->scp_service->srv_name);
 
         cfs_spin_lock(&svcpt->scp_lock);
         /* let the thread know that we would like it to stop asap */
-        thread_add_flags(thread, SVC_STOPPING);
-        cfs_spin_unlock(&svcpt->scp_lock);
+        list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
+                CDEBUG(D_INFO, "Stopping thread %s #%u\n",
+                       svcpt->scp_service->srv_thread_name, thread->t_id);
+                thread_add_flags(thread, SVC_STOPPING);
+        }
 
         cfs_waitq_broadcast(&svcpt->scp_waitq);
 
-        l_wait_event(thread->t_ctl_waitq,
-                     thread_is_stopped(thread), &lwi);
-        cfs_spin_lock(&svcpt->scp_lock);
-        cfs_list_del(&thread->t_link);
+        while (!cfs_list_empty(&svcpt->scp_threads)) {
+                thread = cfs_list_entry(svcpt->scp_threads.next,
+                                        struct ptlrpc_thread, t_link);
+                if (thread_is_stopped(thread)) {
+                        cfs_list_del(&thread->t_link);
+                        cfs_list_add(&thread->t_link, &zombie);
+                        continue;
+                }
+                cfs_spin_unlock(&svcpt->scp_lock);
+
+                CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
+                       svcpt->scp_service->srv_thread_name, thread->t_id);
+                l_wait_event(thread->t_ctl_waitq,
+                             thread_is_stopped(thread), &lwi);
+
+                cfs_spin_lock(&svcpt->scp_lock);
+        }
+
         cfs_spin_unlock(&svcpt->scp_lock);
 
-        OBD_FREE_PTR(thread);
+        while (!cfs_list_empty(&zombie)) {
+                thread = cfs_list_entry(zombie.next,
+                                        struct ptlrpc_thread, t_link);
+                cfs_list_del(&thread->t_link);
+                OBD_FREE_PTR(thread);
+        }
         EXIT;
 }
 
@@ -2526,23 +2547,10 @@ static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
-        struct ptlrpc_service_part *svcpt = svc->srv_part;
-        struct ptlrpc_thread *thread;
         ENTRY;
 
-        LASSERT(svcpt != NULL);
-
-        cfs_spin_lock(&svcpt->scp_lock);
-        while (!cfs_list_empty(&svcpt->scp_threads)) {
-                thread = cfs_list_entry(svcpt->scp_threads.next,
-                                        struct ptlrpc_thread, t_link);
-
-                cfs_spin_unlock(&svcpt->scp_lock);
-                ptlrpc_stop_thread(svcpt, thread);
-                cfs_spin_lock(&svcpt->scp_lock);
-        }
-
-        cfs_spin_unlock(&svcpt->scp_lock);
+        if (svc != NULL && svc->srv_part != NULL)
+                ptlrpc_svcpt_stop_threads(svc->srv_part);
 
         EXIT;
 }
@@ -2555,7 +2563,7 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
            ptlrpc_server_handle_request */
         LASSERT(svc->srv_threads_min >= 2);
         for (i = 0; i < svc->srv_threads_min; i++) {
-                rc = ptlrpc_start_thread(svc->srv_part);
+                rc = ptlrpc_start_thread(svc->srv_part, 1);
                 /* We have enough threads, don't start more. b=15759 */
                 if (rc == -EMFILE) {
                         rc = 0;
@@ -2571,13 +2579,11 @@ int ptlrpc_start_threads(struct ptlrpc_service *svc)
         RETURN(rc);
 }
 
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 {
         struct l_wait_info lwi = { 0 };
-        struct ptlrpc_svc_data d;
         struct ptlrpc_thread *thread;
         struct ptlrpc_service *svc = svcpt->scp_service;
-        char name[32];
         int rc;
         ENTRY;
 
@@ -2587,6 +2593,7 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                svc->srv_name, svcpt->scp_nthrs_running,
                svc->srv_threads_min, svc->srv_threads_max);
 
+ again:
         if (unlikely(svc->srv_is_stopping))
                 RETURN(-ESRCH);
 
@@ -2607,6 +2614,24 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                 RETURN(-EMFILE);
         }
 
+        if (svcpt->scp_nthrs_starting != 0) {
+                /* serialize starting because some modules (obdfilter)
+                 * might require unique and contiguous t_id */
+                LASSERT(svcpt->scp_nthrs_starting == 1);
+                cfs_spin_unlock(&svcpt->scp_lock);
+                OBD_FREE_PTR(thread);
+                if (wait) {
+                        CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
+                               svc->srv_thread_name, svcpt->scp_thr_nextid);
+                        cfs_schedule();
+                        goto again;
+                }
+
+                CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
+                       svc->srv_thread_name, svcpt->scp_thr_nextid);
+                RETURN(-EAGAIN);
+        }
+
         svcpt->scp_nthrs_starting++;
         thread->t_id = svcpt->scp_thr_nextid++;
         thread_add_flags(thread, SVC_STARTING);
@@ -2615,20 +2640,18 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
         cfs_list_add(&thread->t_link, &svcpt->scp_threads);
         cfs_spin_unlock(&svcpt->scp_lock);
 
-        sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
-        d.svc = svc;
-        d.name = name;
-        d.thread = thread;
-
-        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
-         */
-        rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                CERROR("cannot start thread '%s': rc %d\n", name, rc);
+        snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
+                 "%s_%02d", svc->srv_thread_name, thread->t_id);
+        CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
+        /*
+         * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+         * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
+         */
+        rc = cfs_create_thread(ptlrpc_main, thread, CFS_DAEMON_FLAGS);
+        if (rc < 0) {
+                CERROR("cannot start thread '%s': rc %d\n",
+                       thread->t_name, rc);
 
                 cfs_spin_lock(&svcpt->scp_lock);
                 cfs_list_del(&thread->t_link);
                 --svcpt->scp_nthrs_starting;
@@ -2637,6 +2660,10 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
                 OBD_FREE(thread, sizeof(*thread));
                 RETURN(rc);
         }
+
+        if (!wait)
+                RETURN(0);
+
         l_wait_event(thread->t_ctl_waitq,
                      thread_is_running(thread) || thread_is_stopped(thread),
                      &lwi);
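
The comment touched by the first two hunks sketches a possible future heuristic for sizing a service's thread pool: (online_cpus * conf::tc_factor) + conf::tc_nthrs_base. The small userspace sketch below works that formula through once and clamps the result; the struct, the tc_nthrs_min/tc_nthrs_max fields and the clamping bounds are illustrative assumptions, not code from this patch.

/*
 * Standalone sketch of the thread-count estimate described in the
 * patch comment: nthrs = online_cpus * tc_factor + tc_nthrs_base,
 * clamped to an assumed minimum and maximum.  All names here are
 * illustrative only.
 */
#include <stdio.h>
#include <unistd.h>

struct thr_conf_sketch {
        unsigned int tc_nthrs_base;     /* base number of threads */
        unsigned int tc_factor;         /* extra threads per online CPU */
        unsigned int tc_nthrs_min;      /* assumed lower bound */
        unsigned int tc_nthrs_max;      /* assumed upper bound */
};

static unsigned int estimate_nthrs(const struct thr_conf_sketch *tc)
{
        long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
        unsigned int nthrs;

        if (ncpus < 1)
                ncpus = 1;

        nthrs = (unsigned int)ncpus * tc->tc_factor + tc->tc_nthrs_base;
        if (nthrs < tc->tc_nthrs_min)
                nthrs = tc->tc_nthrs_min;
        if (nthrs > tc->tc_nthrs_max)
                nthrs = tc->tc_nthrs_max;
        return nthrs;
}

int main(void)
{
        struct thr_conf_sketch tc = {
                .tc_nthrs_base = 8, .tc_factor = 4,
                .tc_nthrs_min = 2, .tc_nthrs_max = 128,
        };

        printf("estimated service threads: %u\n", estimate_nthrs(&tc));
        return 0;
}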
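
The ptlrpc_main() hunks change what the new thread receives as its argument: instead of a pointer to a struct ptlrpc_svc_data living on ptlrpc_start_thread()'s stack, which becomes invalid once the creator stops waiting, the thread is handed its own long-lived struct ptlrpc_thread, with the name kept in thread->t_name. The pthreads sketch below illustrates the same idea under assumed names (worker_thread, start_worker); pthread_create() merely stands in for cfs_create_thread() and nothing here is the patch's actual code.

/*
 * Sketch: pass the heap-allocated thread descriptor itself to the new
 * thread, so it never dereferences memory on its creator's stack.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define THR_NAME_LEN 32

struct worker_thread {
        pthread_t wt_handle;
        int       wt_id;
        char      wt_name[THR_NAME_LEN];
};

static void *worker_main(void *arg)
{
        /* the argument is the long-lived descriptor, not the creator's stack */
        struct worker_thread *thr = arg;

        printf("thread '%s' (#%d) running\n", thr->wt_name, thr->wt_id);
        return NULL;
}

static struct worker_thread *start_worker(int id)
{
        struct worker_thread *thr = calloc(1, sizeof(*thr));

        if (thr == NULL)
                return NULL;

        thr->wt_id = id;
        snprintf(thr->wt_name, sizeof(thr->wt_name), "svc_%02d", id);

        if (pthread_create(&thr->wt_handle, NULL, worker_main, thr) != 0) {
                free(thr);
                return NULL;
        }
        return thr;
}

int main(void)
{
        struct worker_thread *thr = start_worker(0);

        if (thr != NULL) {
                pthread_join(thr->wt_handle, NULL);
                free(thr);
        }
        return 0;
}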
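
ptlrpc_svcpt_stop_threads() follows a specific ordering: flag every thread under scp_lock, wake them all with cfs_waitq_broadcast(), move threads that have already stopped onto a private zombie list while waiting for the rest, and free the descriptors only after the lock is dropped. The sketch below shows that ordering with plain pthreads and a fixed array standing in for the svcpt thread list; all names are illustrative and this is not the patch's implementation.

/*
 * Sketch of the shutdown ordering: mark all workers under the lock,
 * broadcast a wakeup, then reap and free descriptors with no lock held.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

#define NWORKERS 4

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  wake = PTHREAD_COND_INITIALIZER;
static bool stopping;

struct worker {
        pthread_t w_handle;
        int       w_id;
};

static void *worker_main(void *arg)
{
        struct worker *w = arg;

        pthread_mutex_lock(&lock);
        while (!stopping)                 /* idle until told to stop */
                pthread_cond_wait(&wake, &lock);
        pthread_mutex_unlock(&lock);

        printf("worker #%d exiting\n", w->w_id);
        return NULL;
}

int main(void)
{
        struct worker *workers[NWORKERS];
        int i;

        for (i = 0; i < NWORKERS; i++) {
                workers[i] = calloc(1, sizeof(**workers));
                if (workers[i] == NULL)
                        return 1;
                workers[i]->w_id = i;
                pthread_create(&workers[i]->w_handle, NULL, worker_main,
                               workers[i]);
        }

        /* 1) mark every thread as stopping while holding the lock */
        pthread_mutex_lock(&lock);
        stopping = true;
        /* 2) wake them all, as cfs_waitq_broadcast() does in the patch */
        pthread_cond_broadcast(&wake);
        pthread_mutex_unlock(&lock);

        /* 3) wait for each thread and free its descriptor with no lock
         *    held -- the role of the zombie list in the patch */
        for (i = 0; i < NWORKERS; i++) {
                pthread_join(workers[i]->w_handle, NULL);
                free(workers[i]);
        }
        return 0;
}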