}
/*
- * NB: we will add some common at here for estimating, for example:
+ * NB: we will add some common code here for estimating, for example:
* add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
* threads number based on:
* (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
* availability of service.
*
* Also, we will need to validate threads number at here for
- * CPT affinity service (CPU ParTiion) in the future.
+ * CPT affinity service (CPU ParTition) in the future.
* A service can have percpt thread-pool instead of a global thread
* pool for each service, which means user might not always get the
* threads number they want even they set it in conf::tc_nthrs_user,
*/
static int ptlrpc_main(void *arg)
{
- struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
- struct ptlrpc_thread *thread = data->thread;
+ struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
struct ptlrpc_service_part *svcpt = thread->t_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_reply_state *rs;
ENTRY;
thread->t_pid = cfs_curproc_pid();
- cfs_daemonize_ctxt(data->name);
+ cfs_daemonize_ctxt(thread->t_name);
#if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
/* we need to do this before any per-thread allocation is done so that
LASSERT(thread_is_starting(thread));
thread_clear_flags(thread, SVC_STARTING);
+ LASSERT(svcpt->scp_nthrs_starting == 1);
svcpt->scp_nthrs_starting--;
/* SVC_STOPPING may already be set here if someone else is trying
svcpt->scp_nthrs_running++;
cfs_spin_unlock(&svcpt->scp_lock);
- /*
- * wake up our creator. Note: @data is invalid after this point,
- * because it's allocated on ptlrpc_start_thread() stack.
- */
+ /* wake up our creator in case it is still waiting on t_ctl_waitq. */
cfs_waitq_signal(&thread->t_ctl_waitq);
thread->t_watchdog = lc_watchdog_add(ptlrpc_server_get_timeout(svcpt),
if (ptlrpc_threads_need_create(svcpt)) {
/* Ignore return code - we tried... */
- ptlrpc_start_thread(svcpt);
+ ptlrpc_start_thread(svcpt, 0);
}
/* Process all incoming reqs before handling any */
RETURN(0);
}
-static void ptlrpc_stop_thread(struct ptlrpc_service_part *svcpt,
- struct ptlrpc_thread *thread)
+static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
{
- struct l_wait_info lwi = { 0 };
+ struct l_wait_info lwi = { 0 };
+ struct ptlrpc_thread *thread;
+ CFS_LIST_HEAD (zombie);
+
ENTRY;
- CDEBUG(D_RPCTRACE, "Stopping thread [ %p : %u ]\n",
- thread, thread->t_pid);
+ CDEBUG(D_INFO, "Stopping threads for service %s\n",
+ svcpt->scp_service->srv_name);
cfs_spin_lock(&svcpt->scp_lock);
/* let the thread know that we would like it to stop asap */
- thread_add_flags(thread, SVC_STOPPING);
- cfs_spin_unlock(&svcpt->scp_lock);
+ list_for_each_entry(thread, &svcpt->scp_threads, t_link) {
+ CDEBUG(D_INFO, "Stopping thread %s #%u\n",
+ svcpt->scp_service->srv_thread_name, thread->t_id);
+ thread_add_flags(thread, SVC_STOPPING);
+ }
cfs_waitq_broadcast(&svcpt->scp_waitq);
- l_wait_event(thread->t_ctl_waitq,
- thread_is_stopped(thread), &lwi);
- cfs_spin_lock(&svcpt->scp_lock);
- cfs_list_del(&thread->t_link);
+ while (!cfs_list_empty(&svcpt->scp_threads)) {
+ thread = cfs_list_entry(svcpt->scp_threads.next,
+ struct ptlrpc_thread, t_link);
+ if (thread_is_stopped(thread)) {
+ cfs_list_del(&thread->t_link);
+ cfs_list_add(&thread->t_link, &zombie);
+ continue;
+ }
+ cfs_spin_unlock(&svcpt->scp_lock);
+
+ CDEBUG(D_INFO, "waiting for stopping-thread %s #%u\n",
+ svcpt->scp_service->srv_thread_name, thread->t_id);
+ l_wait_event(thread->t_ctl_waitq,
+ thread_is_stopped(thread), &lwi);
+
+ cfs_spin_lock(&svcpt->scp_lock);
+ }
+
cfs_spin_unlock(&svcpt->scp_lock);
- OBD_FREE_PTR(thread);
+ while (!cfs_list_empty(&zombie)) {
+ thread = cfs_list_entry(zombie.next,
+ struct ptlrpc_thread, t_link);
+ cfs_list_del(&thread->t_link);
+ OBD_FREE_PTR(thread);
+ }
EXIT;
}
*/
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
- struct ptlrpc_service_part *svcpt = svc->srv_part;
- struct ptlrpc_thread *thread;
ENTRY;
- LASSERT(svcpt != NULL);
-
- cfs_spin_lock(&svcpt->scp_lock);
- while (!cfs_list_empty(&svcpt->scp_threads)) {
- thread = cfs_list_entry(svcpt->scp_threads.next,
- struct ptlrpc_thread, t_link);
-
- cfs_spin_unlock(&svcpt->scp_lock);
- ptlrpc_stop_thread(svcpt, thread);
- cfs_spin_lock(&svcpt->scp_lock);
- }
-
- cfs_spin_unlock(&svcpt->scp_lock);
+ if (svc != NULL && svc->srv_part != NULL)
+ ptlrpc_svcpt_stop_threads(svc->srv_part);
EXIT;
}
ptlrpc_server_handle_request */
LASSERT(svc->srv_threads_min >= 2);
for (i = 0; i < svc->srv_threads_min; i++) {
- rc = ptlrpc_start_thread(svc->srv_part);
+ rc = ptlrpc_start_thread(svc->srv_part, 1);
/* We have enough threads, don't start more. b=15759 */
if (rc == -EMFILE) {
rc = 0;
RETURN(rc);
}
-int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt)
+int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
{
struct l_wait_info lwi = { 0 };
- struct ptlrpc_svc_data d;
struct ptlrpc_thread *thread;
struct ptlrpc_service *svc = svcpt->scp_service;
- char name[32];
int rc;
ENTRY;
svc->srv_name, svcpt->scp_nthrs_running,
svc->srv_threads_min, svc->srv_threads_max);
+ again:
if (unlikely(svc->srv_is_stopping))
RETURN(-ESRCH);
RETURN(-EMFILE);
}
+ if (svcpt->scp_nthrs_starting != 0) {
+ /* serialize starting because some modules (obdfilter)
+ * might require unique and contiguous t_id */
+ LASSERT(svcpt->scp_nthrs_starting == 1);
+ cfs_spin_unlock(&svcpt->scp_lock);
+ OBD_FREE_PTR(thread);
+ if (wait) {
+ CDEBUG(D_INFO, "Waiting for creating thread %s #%d\n",
+ svc->srv_thread_name, svcpt->scp_thr_nextid);
+ cfs_schedule();
+ goto again;
+ }
+
+ CDEBUG(D_INFO, "Creating thread %s #%d race, retry later\n",
+ svc->srv_thread_name, svcpt->scp_thr_nextid);
+ RETURN(-EAGAIN);
+ }
+
svcpt->scp_nthrs_starting++;
thread->t_id = svcpt->scp_thr_nextid++;
thread_add_flags(thread, SVC_STARTING);
cfs_list_add(&thread->t_link, &svcpt->scp_threads);
cfs_spin_unlock(&svcpt->scp_lock);
- sprintf(name, "%s_%02d", svc->srv_thread_name, thread->t_id);
- d.svc = svc;
- d.name = name;
- d.thread = thread;
-
- CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
-
- /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
- * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
- */
- rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
- if (rc < 0) {
- CERROR("cannot start thread '%s': rc %d\n", name, rc);
+ snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
+ "%s_%02d", svc->srv_thread_name, thread->t_id);
+ CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
+ /*
+ * CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+ * just drop the VM and FILES in cfs_daemonize_ctxt() right away.
+ */
+ rc = cfs_create_thread(ptlrpc_main, thread, CFS_DAEMON_FLAGS);
+ if (rc < 0) {
+ CERROR("cannot start thread '%s': rc %d\n",
+ thread->t_name, rc);
cfs_spin_lock(&svcpt->scp_lock);
cfs_list_del(&thread->t_link);
--svcpt->scp_nthrs_starting;
OBD_FREE(thread, sizeof(*thread));
RETURN(rc);
}
+
+ if (!wait)
+ RETURN(0);
+
l_wait_event(thread->t_ctl_waitq,
thread_is_running(thread) || thread_is_stopped(thread),
&lwi);