+struct ptlrpc_hr_args {
+ int thread_index;
+ int cpu_index;
+ struct ptlrpc_hr_service *hrs;
+};
+
+static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
+ struct list_head *replies)
+{
+ int result;
+
+ spin_lock(&t->hrt_lock);
+ list_splice_init(&t->hrt_queue, replies);
+ result = test_bit(HRT_STOPPING, &t->hrt_flags) ||
+ !list_empty(replies);
+ spin_unlock(&t->hrt_lock);
+ return result;
+}
+
+static int ptlrpc_hr_main(void *arg)
+{
+ struct ptlrpc_hr_args * hr_args = arg;
+ struct ptlrpc_hr_service *hr = hr_args->hrs;
+ struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
+ char threadname[20];
+ CFS_LIST_HEAD(replies);
+
+ snprintf(threadname, sizeof(threadname),
+ "ptlrpc_hr_%d", hr_args->thread_index);
+
+ cfs_daemonize_ctxt(threadname);
+#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
+ set_cpus_allowed(cfs_current(),
+ node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
+#endif
+ set_bit(HRT_RUNNING, &t->hrt_flags);
+ cfs_waitq_signal(&t->hrt_wait);
+
+ while (!test_bit(HRT_STOPPING, &t->hrt_flags)) {
+
+ cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+ while (!list_empty(&replies)) {
+ struct ptlrpc_reply_state *rs;
+
+ rs = list_entry(replies.prev,
+ struct ptlrpc_reply_state,
+ rs_list);
+ list_del_init(&rs->rs_list);
+ ptlrpc_handle_rs(rs);
+ }
+ }
+
+ clear_bit(HRT_RUNNING, &t->hrt_flags);
+ complete(&t->hrt_completion);
+
+ return 0;
+}
+
+static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
+{
+ struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
+ struct ptlrpc_hr_args args;
+ int rc;
+ ENTRY;
+
+ args.thread_index = n;
+ args.cpu_index = cpu;
+ args.hrs = hr;
+
+ rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
+ CLONE_VM|CLONE_FILES);
+ if (rc < 0) {
+ complete(&t->hrt_completion);
+ GOTO(out, rc);
+ }
+ cfs_wait_event(t->hrt_wait, test_bit(HRT_RUNNING, &t->hrt_flags));
+ RETURN(0);
+ out:
+ return rc;
+}
+
+static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
+{
+ ENTRY;
+
+ set_bit(HRT_STOPPING, &t->hrt_flags);
+ cfs_waitq_signal(&t->hrt_wait);
+ wait_for_completion(&t->hrt_completion);
+
+ EXIT;
+}
+
+static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
+{
+ int n;
+ ENTRY;
+
+ for (n = 0; n < hrs->hr_n_threads; n++)
+ ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
+
+ EXIT;
+}
+
+static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
+{
+ int rc = -ENOMEM;
+ int n, cpu, threads_started = 0;
+ ENTRY;
+
+ LASSERT(hr != NULL);
+ LASSERT(hr->hr_n_threads > 0);
+
+ for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
+#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
+ while(!cpu_online(cpu)) {
+ cpu++;
+ if (cpu >= num_possible_cpus())
+ cpu = 0;
+ }
+#endif
+ rc = ptlrpc_start_hr_thread(hr, n, cpu);
+ if (rc != 0)
+ break;
+ threads_started++;
+ cpu++;
+ }
+ if (threads_started == 0) {
+ CERROR("No reply handling threads started\n");
+ RETURN(-ESRCH);
+ }
+ if (threads_started < hr->hr_n_threads) {
+ CWARN("Started only %d reply handling threads from %d\n",
+ threads_started, hr->hr_n_threads);
+ hr->hr_n_threads = threads_started;
+ }
+ RETURN(0);
+}
+