+static void ptlrpcd_fini(void)
+{
+ int i;
+ ENTRY;
+
+ if (ptlrpcds != NULL) {
+ for (i = 0; i < ptlrpcds->pd_nthreads; i++)
+ ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
+ for (i = 0; i < ptlrpcds->pd_nthreads; i++)
+ ptlrpcd_free(&ptlrpcds->pd_threads[i]);
+ ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+ ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
+ OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
+ ptlrpcds = NULL;
+ }
+
+ EXIT;
+}
+
+static int ptlrpcd_init(void)
+{
+ int nthreads = num_online_cpus();
+ char name[16];
+ int size, i = -1, j, rc = 0;
+ ENTRY;
+
+ if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
+ nthreads = max_ptlrpcds;
+ if (nthreads < 2)
+ nthreads = 2;
+ if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
+ ptlrpcd_bind_policy = PDB_POLICY_PAIR;
+ else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
+ nthreads &= ~1; /* make sure it is even */
+
+ size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+ OBD_ALLOC(ptlrpcds, size);
+ if (ptlrpcds == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ snprintf(name, 15, "ptlrpcd_rcv");
+ set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
+ rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* XXX: We start nthreads ptlrpc daemons. Each of them can process any
+ * non-recovery async RPC to improve overall async RPC efficiency.
+ *
+ * But there are some issues with async I/O RPCs and async non-I/O
+ * RPCs processed in the same set under some cases. The ptlrpcd may
+ * be blocked by some async I/O RPC(s), then will cause other async
+ * non-I/O RPC(s) can not be processed in time.
+ *
+ * Maybe we should distinguish blocked async RPCs from non-blocked
+ * async RPCs, and process them in different ptlrpcd sets to avoid
+ * unnecessary dependency. But how to distribute async RPCs load
+ * among all the ptlrpc daemons becomes another trouble. */
+ for (i = 0; i < nthreads; i++) {
+ snprintf(name, 15, "ptlrpcd_%d", i);
+ rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+
+ ptlrpcds->pd_size = size;
+ ptlrpcds->pd_index = 0;
+ ptlrpcds->pd_nthreads = nthreads;
+
+out:
+ if (rc != 0 && ptlrpcds != NULL) {
+ for (j = 0; j <= i; j++)
+ ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
+ for (j = 0; j <= i; j++)
+ ptlrpcd_free(&ptlrpcds->pd_threads[j]);
+ ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
+ ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
+ OBD_FREE(ptlrpcds, size);
+ ptlrpcds = NULL;
+ }
+
+ RETURN(rc);
+}
+