+ if (ptlrpcd_partner_group_size == 0)
+ ptlrpcd_partner_group_size = 2;
+ else if (ptlrpcd_partner_group_size < 0)
+ ptlrpcd_partner_group_size = -1;
+ else if (ptlrpcd_per_cpt_max > 0 &&
+ ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
+ ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
+
+ /*
+ * Start the recovery thread first.
+ */
+ set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
+ ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
+ rc = ptlrpcd_start(&ptlrpcd_rcv);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ for (i = 0; i < ncpts; i++) {
+ if (cpts == NULL)
+ cpt = i;
+ else
+ cpt = cpts[i];
+
+ nthreads = cfs_cpt_weight(cptable, cpt);
+ if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
+ nthreads = ptlrpcd_per_cpt_max;
+ if (nthreads < 2)
+ nthreads = 2;
+
+ if (ptlrpcd_partner_group_size <= 0) {
+ groupsize = nthreads;
+ } else if (nthreads <= ptlrpcd_partner_group_size) {
+ groupsize = nthreads;
+ } else {
+ groupsize = ptlrpcd_partner_group_size;
+ if (nthreads % groupsize != 0)
+ nthreads += groupsize - (nthreads % groupsize);
+ }
+
+ size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+ OBD_CPT_ALLOC(pd, cptable, cpt, size);
+ if (!pd)
+ GOTO(out, rc = -ENOMEM);
+ pd->pd_size = size;
+ pd->pd_index = i;
+ pd->pd_cpt = cpt;
+ pd->pd_cursor = 0;
+ pd->pd_nthreads = nthreads;
+ pd->pd_groupsize = groupsize;
+ ptlrpcds[i] = pd;
+
+ /*
+ * The ptlrpcd threads in a partner group can access
+ * each other's struct ptlrpcd_ctl, so these must be
+ * initialized before any thead is started.
+ */
+ for (j = 0; j < nthreads; j++) {
+ ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
+ rc = ptlrpcd_partners(pd, j);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+
+ /* XXX: We start nthreads ptlrpc daemons on this cpt.
+ * Each of them can process any non-recovery
+ * async RPC to improve overall async RPC
+ * efficiency.
+ *
+ * But there are some issues with async I/O RPCs
+ * and async non-I/O RPCs processed in the same
+ * set under some cases. The ptlrpcd may be
+ * blocked by some async I/O RPC(s), then will
+ * cause other async non-I/O RPC(s) can not be
+ * processed in time.
+ *
+ * Maybe we should distinguish blocked async RPCs
+ * from non-blocked async RPCs, and process them
+ * in different ptlrpcd sets to avoid unnecessary
+ * dependency. But how to distribute async RPCs
+ * load among all the ptlrpc daemons becomes
+ * another trouble.
+ */
+ for (j = 0; j < nthreads; j++) {
+ rc = ptlrpcd_start(&pd->pd_threads[j]);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+ }