+out:
+ if (pc->pc_npartners > 0) {
+ LASSERT(pc->pc_partners != NULL);
+
+ OBD_FREE_PTR_ARRAY(pc->pc_partners, pc->pc_npartners);
+ pc->pc_partners = NULL;
+ }
+ pc->pc_npartners = 0;
+ pc->pc_error = 0;
+ EXIT;
+}
+
+static void ptlrpcd_fini(void)
+{
+ int i;
+ int j;
+ int ncpts;
+
+ ENTRY;
+
+ if (ptlrpcds != NULL) {
+ for (i = 0; i < ptlrpcds_num; i++) {
+ if (ptlrpcds[i] == NULL)
+ break;
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
+ OBD_FREE(ptlrpcds[i], ptlrpcds[i]->pd_size);
+ ptlrpcds[i] = NULL;
+ }
+ OBD_FREE_PTR_ARRAY(ptlrpcds, ptlrpcds_num);
+ }
+ ptlrpcds_num = 0;
+
+ ptlrpcd_stop(&ptlrpcd_rcv, 0);
+ ptlrpcd_free(&ptlrpcd_rcv);
+
+ if (ptlrpcds_cpt_idx != NULL) {
+ ncpts = cfs_cpt_number(cfs_cpt_tab);
+ OBD_FREE_PTR_ARRAY(ptlrpcds_cpt_idx, ncpts);
+ ptlrpcds_cpt_idx = NULL;
+ }
+
+ EXIT;
+}
+
+static int ptlrpcd_init(void)
+{
+ int nthreads;
+ int groupsize;
+ int size;
+ int i;
+ int j;
+ int rc = 0;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
+ struct ptlrpcd *pd;
+
+ ENTRY;
+
+ /*
+ * Determine the CPTs that ptlrpcd threads will run on.
+ */
+ cptable = cfs_cpt_tab;
+ ncpts = cfs_cpt_number(cptable);
+ if (ptlrpcd_cpts != NULL) {
+ struct cfs_expr_list *el;
+
+ size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
+ OBD_ALLOC(ptlrpcds_cpt_idx, size);
+ if (ptlrpcds_cpt_idx == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ rc = cfs_expr_list_parse(ptlrpcd_cpts,
+ strlen(ptlrpcd_cpts),
+ 0, ncpts - 1, &el);
+ if (rc != 0) {
+ CERROR("%s: invalid CPT pattern string: %s",
+ "ptlrpcd_cpts", ptlrpcd_cpts);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ rc = cfs_expr_list_values(el, ncpts, &cpts);
+ cfs_expr_list_free(el);
+ if (rc <= 0) {
+ CERROR("%s: failed to parse CPT array %s: %d\n",
+ "ptlrpcd_cpts", ptlrpcd_cpts, rc);
+ if (rc == 0)
+ rc = -EINVAL;
+ GOTO(out, rc);
+ }
+
+ /*
+ * Create the cpt-to-index map. When there is no match
+ * in the cpt table, pick a cpt at random. This could
+ * be changed to take the topology of the system into
+ * account.
+ */
+ for (cpt = 0; cpt < ncpts; cpt++) {
+ for (i = 0; i < rc; i++)
+ if (cpts[i] == cpt)
+ break;
+ if (i >= rc)
+ i = cpt % rc;
+ ptlrpcds_cpt_idx[cpt] = i;
+ }
+
+ cfs_expr_list_values_free(cpts, rc);
+ ncpts = rc;
+ }
+ ptlrpcds_num = ncpts;
+
+ size = ncpts * sizeof(ptlrpcds[0]);
+ OBD_ALLOC(ptlrpcds, size);
+ if (ptlrpcds == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ /*
+ * The max_ptlrpcds parameter is obsolete, but do something
+ * sane if it has been tuned, and complain if
+ * ptlrpcd_per_cpt_max has also been tuned.
+ */
+ if (max_ptlrpcds != 0) {
+ CWARN("max_ptlrpcds is obsolete.\n");
+ if (ptlrpcd_per_cpt_max == 0) {
+ ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
+ /* Round up if there is a remainder. */
+ if (max_ptlrpcds % ncpts != 0)
+ ptlrpcd_per_cpt_max++;
+ CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
+ ptlrpcd_per_cpt_max);
+ } else {
+ CWARN("ptlrpd_per_cpt_max is also set!\n");
+ }
+ }
+
+ /*
+ * The ptlrpcd_bind_policy parameter is obsolete, but do
+ * something sane if it has been tuned, and complain if
+ * ptlrpcd_partner_group_size is also tuned.
+ */
+ if (ptlrpcd_bind_policy != 0) {
+ CWARN("ptlrpcd_bind_policy is obsolete.\n");
+ if (ptlrpcd_partner_group_size == 0) {
+ switch (ptlrpcd_bind_policy) {
+ case 1: /* PDB_POLICY_NONE */
+ case 2: /* PDB_POLICY_FULL */
+ ptlrpcd_partner_group_size = 1;
+ break;
+ case 3: /* PDB_POLICY_PAIR */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ case 4: /* PDB_POLICY_NEIGHBOR */
+#ifdef CONFIG_NUMA
+ ptlrpcd_partner_group_size = -1; /* CPT */
+#else
+ ptlrpcd_partner_group_size = 3; /* Triplets */
+#endif
+ break;
+ default: /* Illegal value, use the default. */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ }
+ CWARN("Setting ptlrpcd_partner_group_size = %d\n",
+ ptlrpcd_partner_group_size);
+ } else {
+ CWARN("ptlrpcd_partner_group_size is also set!\n");
+ }
+ }
+
+ if (ptlrpcd_partner_group_size == 0)
+ ptlrpcd_partner_group_size = 2;
+ else if (ptlrpcd_partner_group_size < 0)
+ ptlrpcd_partner_group_size = -1;
+ else if (ptlrpcd_per_cpt_max > 0 &&
+ ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
+ ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
+
+ /*
+ * Start the recovery thread first.
+ */
+ set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
+ ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
+ rc = ptlrpcd_start(&ptlrpcd_rcv);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ for (i = 0; i < ncpts; i++) {
+ if (cpts == NULL)
+ cpt = i;
+ else
+ cpt = cpts[i];
+
+ nthreads = cfs_cpt_weight(cptable, cpt);
+ if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
+ nthreads = ptlrpcd_per_cpt_max;
+ if (nthreads < 2)
+ nthreads = 2;
+
+ if (ptlrpcd_partner_group_size <= 0) {
+ groupsize = nthreads;
+ } else if (nthreads <= ptlrpcd_partner_group_size) {
+ groupsize = nthreads;
+ } else {
+ groupsize = ptlrpcd_partner_group_size;
+ if (nthreads % groupsize != 0)
+ nthreads += groupsize - (nthreads % groupsize);
+ }
+
+ size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+ OBD_CPT_ALLOC(pd, cptable, cpt, size);
+
+ if (!pd)
+ GOTO(out, rc = -ENOMEM);
+ pd->pd_size = size;
+ pd->pd_index = i;
+ pd->pd_cpt = cpt;
+ pd->pd_cursor = 0;
+ pd->pd_nthreads = nthreads;
+ pd->pd_groupsize = groupsize;
+ ptlrpcds[i] = pd;
+
+ /*
+ * The ptlrpcd threads in a partner group can access
+ * each other's struct ptlrpcd_ctl, so these must be
+ * initialized before any thead is started.
+ */
+ for (j = 0; j < nthreads; j++) {
+ ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
+ rc = ptlrpcd_partners(pd, j);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+
+ /* XXX: We start nthreads ptlrpc daemons on this cpt.
+ * Each of them can process any non-recovery
+ * async RPC to improve overall async RPC
+ * efficiency.
+ *
+ * But there are some issues with async I/O RPCs
+ * and async non-I/O RPCs processed in the same
+ * set under some cases. The ptlrpcd may be
+ * blocked by some async I/O RPC(s), then will
+ * cause other async non-I/O RPC(s) can not be
+ * processed in time.
+ *
+ * Maybe we should distinguish blocked async RPCs
+ * from non-blocked async RPCs, and process them
+ * in different ptlrpcd sets to avoid unnecessary
+ * dependency. But how to distribute async RPCs
+ * load among all the ptlrpc daemons becomes
+ * another trouble.
+ */
+ for (j = 0; j < nthreads; j++) {
+ rc = ptlrpcd_start(&pd->pd_threads[j]);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+ }
+out:
+ if (rc != 0)
+ ptlrpcd_fini();