- struct ptlrpcd_ctl *pc = arg;
- struct ptlrpc_request_set *set = pc->pc_set;
- struct lu_env env = { .le_ses = NULL };
- int rc, exit = 0;
- ENTRY;
-
- cfs_daemonize_ctxt(pc->pc_name);
-#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
- if (cfs_test_bit(LIOD_BIND, &pc->pc_flags)) {
- int index = pc->pc_index;
-
- if (index >= 0 && index < cfs_num_possible_cpus()) {
- while (!cfs_cpu_online(index)) {
- if (++index >= cfs_num_possible_cpus())
- index = 0;
- }
- cfs_set_cpus_allowed(cfs_current(),
- node_to_cpumask(cpu_to_node(index)));
- }
- }
-#endif
- /*
- * XXX So far only "client" ptlrpcd uses an environment. In
- * the future, ptlrpcd thread (or a thread-set) has to given
- * an argument, describing its "scope".
- */
- rc = lu_context_init(&env.le_ctx,
- LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
- cfs_complete(&pc->pc_starting);
-
- if (rc != 0)
- RETURN(rc);
-
- /*
- * This mainloop strongly resembles ptlrpc_set_wait() except that our
- * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
- * there are requests in the set. New requests come in on the set's
- * new_req_list and ptlrpcd_check() moves them into the set.
- */
- do {
- struct l_wait_info lwi;
- int timeout;
-
- rc = lu_env_refill(&env);
- if (rc != 0) {
- /*
- * XXX This is very awkward situation, because
- * execution can neither continue (request
- * interpreters assume that env is set up), nor repeat
- * the loop (as this potentially results in a tight
- * loop of -ENOMEM's).
- *
- * Fortunately, refill only ever does something when
- * new modules are loaded, i.e., early during boot up.
- */
- CERROR("Failure to refill session: %d\n", rc);
- continue;
- }
-
- timeout = ptlrpc_set_next_timeout(set);
- lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
- ptlrpc_expired_set, set);
-
- lu_context_enter(&env.le_ctx);
- l_wait_event(set->set_waitq,
- ptlrpcd_check(&env, pc), &lwi);
- lu_context_exit(&env.le_ctx);
-
- /*
- * Abort inflight rpcs for forced stop case.
- */
- if (cfs_test_bit(LIOD_STOP, &pc->pc_flags)) {
- if (cfs_test_bit(LIOD_FORCE, &pc->pc_flags))
- ptlrpc_abort_set(set);
- exit++;
- }
-
- /*
- * Let's make one more loop to make sure that ptlrpcd_check()
- * copied all raced new rpcs into the set so we can kill them.
- */
- } while (exit < 2);
-
- /*
- * Wait for inflight requests to drain.
- */
- if (!cfs_list_empty(&set->set_requests))
- ptlrpc_set_wait(set);
- lu_context_fini(&env.le_ctx);
- cfs_complete(&pc->pc_finishing);
-
- cfs_clear_bit(LIOD_START, &pc->pc_flags);
- cfs_clear_bit(LIOD_STOP, &pc->pc_flags);
- cfs_clear_bit(LIOD_FORCE, &pc->pc_flags);
- cfs_clear_bit(LIOD_BIND, &pc->pc_flags);
- return 0;
+ struct ptlrpcd_ctl *pc = arg;
+ struct ptlrpc_request_set *set;
+ struct lu_context ses = { 0 };
+ struct lu_env env = { .le_ses = &ses };
+ int rc = 0;
+ int exit = 0;
+
+ ENTRY;
+ if (cfs_cpt_bind(cfs_cpt_tab, pc->pc_cpt) != 0)
+ CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
+
+ /*
+ * Allocate the request set after the thread has been bound
+ * above. This is safe because no requests will be queued
+ * until all ptlrpcd threads have confirmed that they have
+ * successfully started.
+ */
+ set = ptlrpc_prep_set();
+ if (set == NULL)
+ GOTO(failed, rc = -ENOMEM);
+ spin_lock(&pc->pc_lock);
+ pc->pc_set = set;
+ spin_unlock(&pc->pc_lock);
+
+ /* Both client and server (MDT/OST) may use the environment. */
+ rc = lu_context_init(&env.le_ctx, LCT_MD_THREAD |
+ LCT_DT_THREAD |
+ LCT_CL_THREAD |
+ LCT_REMEMBER |
+ LCT_NOREF);
+ if (rc != 0)
+ GOTO(failed, rc);
+ rc = lu_context_init(env.le_ses, LCT_SESSION |
+ LCT_REMEMBER |
+ LCT_NOREF);
+ if (rc != 0) {
+ lu_context_fini(&env.le_ctx);
+ GOTO(failed, rc);
+ }
+
+ complete(&pc->pc_starting);
+
+ /*
+ * This mainloop strongly resembles ptlrpc_set_wait() except that our
+ * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
+ * there are requests in the set. New requests come in on the set's
+ * new_req_list and ptlrpcd_check() moves them into the set.
+ */
+ do {
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ time64_t timeout;
+
+ timeout = ptlrpc_set_next_timeout(set);
+
+ lu_context_enter(&env.le_ctx);
+ lu_context_enter(env.le_ses);
+
+ add_wait_queue(&set->set_waitq, &wait);
+ while (!ptlrpcd_check(&env, pc)) {
+ int ret;
+
+ if (timeout == 0)
+ ret = wait_woken(&wait, TASK_IDLE,
+ MAX_SCHEDULE_TIMEOUT);
+ else
+ ret = wait_woken(&wait, TASK_IDLE,
+ cfs_time_seconds(timeout));
+ if (ret != 0)
+ continue;
+ /* Timed out */
+ ptlrpc_expired_set(set);
+ break;
+ }
+ remove_wait_queue(&set->set_waitq, &wait);
+
+ lu_context_exit(&env.le_ctx);
+ lu_context_exit(env.le_ses);
+
+ /*
+ * Abort inflight rpcs for forced stop case.
+ */
+ if (test_bit(LIOD_STOP, &pc->pc_flags)) {
+ if (test_bit(LIOD_FORCE, &pc->pc_flags))
+ ptlrpc_abort_set(set);
+ exit++;
+ }
+
+ /*
+ * Let's make one more loop to make sure that ptlrpcd_check()
+ * copied all raced new rpcs into the set so we can kill them.
+ */
+ } while (exit < 2);
+
+ /*
+ * Wait for inflight requests to drain.
+ */
+ if (!list_empty(&set->set_requests))
+ ptlrpc_set_wait(&env, set);
+ lu_context_fini(&env.le_ctx);
+ lu_context_fini(env.le_ses);
+
+ complete(&pc->pc_finishing);
+
+ return 0;
+
+failed:
+ pc->pc_error = rc;
+ complete(&pc->pc_starting);
+ RETURN(rc);