*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_RPC
-#ifdef __KERNEL__
-# include <libcfs/libcfs.h>
-#else /* __KERNEL__ */
-# include <liblustre.h>
-# include <ctype.h>
-#endif
-
+#include <linux/kthread.h>
+#include <libcfs/libcfs.h>
#include <lustre_net.h>
-# include <lustre_lib.h>
-
+#include <lustre_lib.h>
#include <lustre_ha.h>
#include <obd_class.h> /* for obd_zombie */
#include <obd_support.h> /* for OBD_FAIL_CHECK */
#include "ptlrpc_internal.h"
+/* One of these per CPT. */
struct ptlrpcd {
- int pd_size;
- int pd_index;
- int pd_nthreads;
- struct ptlrpcd_ctl pd_thread_rcv;
- struct ptlrpcd_ctl pd_threads[0];
+ int pd_size; /* allocated size of this struct */
+ int pd_index; /* index of this entry in the ptlrpcds array */
+ int pd_cpt; /* CPT these threads are bound to */
+ int pd_cursor; /* round-robin cursor for selecting the next thread */
+ int pd_nthreads; /* number of threads in pd_threads[] */
+ int pd_groupsize; /* number of threads in a partner group */
+ struct ptlrpcd_ctl pd_threads[0]; /* one control struct per thread */
};
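+
+/*
+ * For example, on a system with two CPTs there will be two of these
+ * structures, each holding the ptlrpcd_ctl of every ptlrpcd thread
+ * bound to that CPT; the recovery thread (ptlrpcd_rcv below) is kept
+ * separate.
+ */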
-#ifdef __KERNEL__
+/*
+ * max_ptlrpcds is obsolete, but retained to ensure that the kernel
+ * module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_per_cpt_max.
+ */
static int max_ptlrpcds;
-CFS_MODULE_PARM(max_ptlrpcds, "i", int, 0644,
- "Max ptlrpcd thread count to be started.");
+module_param(max_ptlrpcds, int, 0644);
+MODULE_PARM_DESC(max_ptlrpcds,
+ "Max ptlrpcd thread count to be started (obsolete).");
-static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
-CFS_MODULE_PARM(ptlrpcd_bind_policy, "i", int, 0644,
- "Ptlrpcd threads binding mode.");
-#endif
-static struct ptlrpcd *ptlrpcds;
+/*
+ * ptlrpcd_bind_policy is obsolete, but retained to ensure that
+ * the kernel module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_partner_group_size.
+ */
+static int ptlrpcd_bind_policy;
+module_param(ptlrpcd_bind_policy, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_bind_policy,
+ "Ptlrpcd threads binding mode (obsolete).");
+
+/*
+ * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run
+ * in a CPT.
+ */
+static int ptlrpcd_per_cpt_max;
+module_param(ptlrpcd_per_cpt_max, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
+ "Max ptlrpcd thread count to be started per CPT.");
+
+/*
+ * ptlrpcd_partner_group_size: The desired number of threads in each
+ * ptlrpcd partner thread group. Default is 2, corresponding to the
+ * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in
+ * a CPT partners of each other.
+ */
+static int ptlrpcd_partner_group_size;
+module_param(ptlrpcd_partner_group_size, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_partner_group_size,
+ "Number of ptlrpcd threads in a partner group.");
+
+/*
+ * ptlrpcd_cpts: A CPT string describing the CPU partitions that
+ * ptlrpcd threads should run on. Used to make ptlrpcd threads run on
+ * a subset of all CPTs.
+ *
+ * ptlrpcd_cpts=2
+ * ptlrpcd_cpts=[2]
+ * run ptlrpcd threads only on CPT 2.
+ *
+ * ptlrpcd_cpts=0-3
+ * ptlrpcd_cpts=[0-3]
+ * run ptlrpcd threads on CPTs 0, 1, 2, and 3.
+ *
+ * ptlrpcd_cpts=[0-3,5,7]
+ * run ptlrpcd threads on CPTs 0, 1, 2, 3, 5, and 7.
+ */
+static char *ptlrpcd_cpts;
+module_param(ptlrpcd_cpts, charp, 0644);
+MODULE_PARM_DESC(ptlrpcd_cpts,
+ "CPU partitions ptlrpcd threads should run in");
+
+/* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */
+static int *ptlrpcds_cpt_idx;
+
+/* ptlrpcds_num is the number of entries in the ptlrpcds array. */
+static int ptlrpcds_num;
+static struct ptlrpcd **ptlrpcds;
+
+/*
+ * In addition to the regular thread pool above, there is a single
+ * global recovery thread. Recovery isn't critical for performance,
+ * and doesn't block, but must always be able to proceed, and it is
+ * possible that all normal ptlrpcd threads are blocked. Hence the
+ * need for a dedicated thread.
+ */
+static struct ptlrpcd_ctl ptlrpcd_rcv;
struct mutex ptlrpcd_mutex;
static int ptlrpcd_users = 0;
void ptlrpcd_wake(struct ptlrpc_request *req)
{
- struct ptlrpc_request_set *rq_set = req->rq_set;
-
- LASSERT(rq_set != NULL);
+ struct ptlrpc_request_set *set = req->rq_set;
- wake_up(&rq_set->set_waitq);
+ LASSERT(set != NULL);
+ wake_up(&set->set_waitq);
}
EXPORT_SYMBOL(ptlrpcd_wake);
static struct ptlrpcd_ctl *
-ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
+ptlrpcd_select_pc(struct ptlrpc_request *req)
{
- int idx = 0;
-
- if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
- return &ptlrpcds->pd_thread_rcv;
-
-#ifdef __KERNEL__
- switch (policy) {
- case PDL_POLICY_SAME:
- idx = smp_processor_id() % ptlrpcds->pd_nthreads;
- break;
- case PDL_POLICY_LOCAL:
- /* Before CPU partition patches available, process it the same
- * as "PDL_POLICY_ROUND". */
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix this code to use new CPU partition APIs"
-# endif
- /* Fall through to PDL_POLICY_ROUND until the CPU
- * CPU partition patches are available. */
- index = -1;
- case PDL_POLICY_PREFERRED:
- if (index >= 0 && index < num_online_cpus()) {
- idx = index % ptlrpcds->pd_nthreads;
- break;
- }
- /* Fall through to PDL_POLICY_ROUND for bad index. */
- default:
- /* Fall through to PDL_POLICY_ROUND for unknown policy. */
- case PDL_POLICY_ROUND:
- /* We do not care whether it is strict load balance. */
- idx = ptlrpcds->pd_index + 1;
- if (idx == smp_processor_id())
- idx++;
- idx %= ptlrpcds->pd_nthreads;
- ptlrpcds->pd_index = idx;
- break;
- }
-#endif /* __KERNEL__ */
-
- return &ptlrpcds->pd_threads[idx];
+ struct ptlrpcd *pd;
+ int cpt;
+ int idx;
+
+ if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
+ return &ptlrpcd_rcv;
+
+ cpt = cfs_cpt_current(cfs_cpt_tab, 1);
+ if (ptlrpcds_cpt_idx == NULL)
+ idx = cpt;
+ else
+ idx = ptlrpcds_cpt_idx[cpt];
+ pd = ptlrpcds[idx];
+
+ /* We do not care whether the load is strictly balanced. */
+ idx = pd->pd_cursor;
+ if (++idx == pd->pd_nthreads)
+ idx = 0;
+ pd->pd_cursor = idx;
+
+ return &pd->pd_threads[idx];
}
/**
*/
void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
{
- cfs_list_t *tmp, *pos;
-#ifdef __KERNEL__
- struct ptlrpcd_ctl *pc;
- struct ptlrpc_request_set *new;
- int count, i;
-
- pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
- new = pc->pc_set;
-#endif
+ struct list_head *tmp, *pos;
+ struct ptlrpcd_ctl *pc;
+ struct ptlrpc_request_set *new;
+ int count, i;
- cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- cfs_list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
+ pc = ptlrpcd_select_pc(NULL);
+ new = pc->pc_set;
- LASSERT(req->rq_phase == RQ_PHASE_NEW);
-#ifdef __KERNEL__
- req->rq_set = new;
- req->rq_queued_time = cfs_time_current();
-#else
- cfs_list_del_init(&req->rq_set_chain);
- req->rq_set = NULL;
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
- cfs_atomic_dec(&set->set_remaining);
-#endif
- }
+ list_for_each_safe(pos, tmp, &set->set_requests) {
+ struct ptlrpc_request *req =
+ list_entry(pos, struct ptlrpc_request,
+ rq_set_chain);
+
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ req->rq_set = new;
+ req->rq_queued_time = ktime_get_seconds();
+ }
-#ifdef __KERNEL__
spin_lock(&new->set_new_req_lock);
- cfs_list_splice_init(&set->set_requests, &new->set_new_requests);
- i = cfs_atomic_read(&set->set_remaining);
- count = cfs_atomic_add_return(i, &new->set_new_count);
- cfs_atomic_set(&set->set_remaining, 0);
+ list_splice_init(&set->set_requests, &new->set_new_requests);
+ i = atomic_read(&set->set_remaining);
+ count = atomic_add_return(i, &new->set_new_count);
+ atomic_set(&set->set_remaining, 0);
spin_unlock(&new->set_new_req_lock);
if (count == i) {
wake_up(&new->set_waitq);
- /* XXX: It maybe unnecessary to wakeup all the partners. But to
+ /*
+ * XXX: It may be unnecessary to wake up all the partners. But to
* guarantee the async RPC can be processed ASAP, we have
- * no other better choice. It maybe fixed in future. */
+ * no better choice. This may be fixed in the future.
+ */
for (i = 0; i < pc->pc_npartners; i++)
wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
}
-#endif
}
-EXPORT_SYMBOL(ptlrpcd_add_rqset);
-#ifdef __KERNEL__
/**
* Return transferred RPCs count.
*/
static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
- struct ptlrpc_request_set *src)
+ struct ptlrpc_request_set *src)
{
- cfs_list_t *tmp, *pos;
- struct ptlrpc_request *req;
- int rc = 0;
+ struct ptlrpc_request *req;
+ int rc = 0;
spin_lock(&src->set_new_req_lock);
- if (likely(!cfs_list_empty(&src->set_new_requests))) {
- cfs_list_for_each_safe(pos, tmp, &src->set_new_requests) {
- req = cfs_list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
- req->rq_set = des;
- }
- cfs_list_splice_init(&src->set_new_requests,
- &des->set_requests);
- rc = cfs_atomic_read(&src->set_new_count);
- cfs_atomic_add(rc, &des->set_remaining);
- cfs_atomic_set(&src->set_new_count, 0);
- }
+ if (likely(!list_empty(&src->set_new_requests))) {
+ list_for_each_entry(req, &src->set_new_requests, rq_set_chain)
+ req->rq_set = des;
+
+ list_splice_init(&src->set_new_requests,
+ &des->set_requests);
+ rc = atomic_read(&src->set_new_count);
+ atomic_add(rc, &des->set_remaining);
+ atomic_set(&src->set_new_count, 0);
+ }
spin_unlock(&src->set_new_req_lock);
return rc;
}
-#endif
/**
* Requests that are added to the ptlrpcd queue are sent via
* ptlrpcd_check->ptlrpc_check_set().
*/
-void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
+void ptlrpcd_add_req(struct ptlrpc_request *req)
{
- struct ptlrpcd_ctl *pc;
+ struct ptlrpcd_ctl *pc;
if (req->rq_reqmsg)
lustre_msg_set_jobid(req->rq_reqmsg, NULL);
spin_lock(&req->rq_lock);
- if (req->rq_invalid_rqset) {
- struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
- back_to_sleep, NULL);
-
- req->rq_invalid_rqset = 0;
+ if (req->rq_invalid_rqset) {
+ req->rq_invalid_rqset = 0;
spin_unlock(&req->rq_lock);
- l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
- } else if (req->rq_set) {
- /* If we have a vaid "rq_set", just reuse it to avoid double
- * linked. */
- LASSERT(req->rq_phase == RQ_PHASE_NEW);
- LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
-
- /* ptlrpc_check_set will decrease the count */
- cfs_atomic_inc(&req->rq_set->set_remaining);
+ if (wait_event_idle_timeout(req->rq_set_waitq,
+ req->rq_set == NULL,
+ cfs_time_seconds(5)) == 0)
+ l_wait_event_abortable(req->rq_set_waitq,
+ req->rq_set == NULL);
+ } else if (req->rq_set) {
+ /*
+ * If we have a valid "rq_set", just reuse it to avoid being
+ * double linked.
+ */
+ LASSERT(req->rq_phase == RQ_PHASE_NEW);
+ LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);
+
+ /* ptlrpc_check_set will decrease the count */
+ atomic_inc(&req->rq_set->set_remaining);
spin_unlock(&req->rq_lock);
wake_up(&req->rq_set->set_waitq);
return;
} else {
spin_unlock(&req->rq_lock);
- }
+ }
- pc = ptlrpcd_select_pc(req, policy, idx);
+ pc = ptlrpcd_select_pc(req);
- DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
- req, pc->pc_name, pc->pc_index);
+ DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s+%d]",
+ req, pc->pc_name, pc->pc_index);
- ptlrpc_set_add_new_req(pc, req);
+ ptlrpc_set_add_new_req(pc, req);
}
EXPORT_SYMBOL(ptlrpcd_add_req);
static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
{
- cfs_atomic_inc(&set->set_refcount);
+ atomic_inc(&set->set_refcount);
}
/**
*/
static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
{
- cfs_list_t *tmp, *pos;
- struct ptlrpc_request *req;
- struct ptlrpc_request_set *set = pc->pc_set;
- int rc = 0;
- int rc2;
- ENTRY;
-
- if (cfs_atomic_read(&set->set_new_count)) {
+ struct ptlrpc_request *req, *tmp;
+ struct ptlrpc_request_set *set = pc->pc_set;
+ int rc = 0;
+ int rc2;
+
+ ENTRY;
+
+ if (atomic_read(&set->set_new_count)) {
spin_lock(&set->set_new_req_lock);
- if (likely(!cfs_list_empty(&set->set_new_requests))) {
- cfs_list_splice_init(&set->set_new_requests,
- &set->set_requests);
- cfs_atomic_add(cfs_atomic_read(&set->set_new_count),
- &set->set_remaining);
- cfs_atomic_set(&set->set_new_count, 0);
- /*
- * Need to calculate its timeout.
- */
- rc = 1;
- }
+ if (likely(!list_empty(&set->set_new_requests))) {
+ list_splice_init(&set->set_new_requests,
+ &set->set_requests);
+ atomic_add(atomic_read(&set->set_new_count),
+ &set->set_remaining);
+ atomic_set(&set->set_new_count, 0);
+ /*
+ * Need to calculate its timeout.
+ */
+ rc = 1;
+ }
spin_unlock(&set->set_new_req_lock);
- }
-
- /* We should call lu_env_refill() before handling new requests to make
- * sure that env key the requests depending on really exists.
- */
- rc2 = lu_env_refill(env);
- if (rc2 != 0) {
- /*
- * XXX This is very awkward situation, because
- * execution can neither continue (request
- * interpreters assume that env is set up), nor repeat
- * the loop (as this potentially results in a tight
- * loop of -ENOMEM's).
- *
- * Fortunately, refill only ever does something when
- * new modules are loaded, i.e., early during boot up.
- */
- CERROR("Failure to refill session: %d\n", rc2);
- RETURN(rc);
- }
-
- if (cfs_atomic_read(&set->set_remaining))
- rc |= ptlrpc_check_set(env, set);
-
- if (!cfs_list_empty(&set->set_requests)) {
- /*
- * XXX: our set never completes, so we prune the completed
- * reqs after each iteration. boy could this be smarter.
- */
- cfs_list_for_each_safe(pos, tmp, &set->set_requests) {
- req = cfs_list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
- if (req->rq_phase != RQ_PHASE_COMPLETE)
- continue;
-
- cfs_list_del_init(&req->rq_set_chain);
- req->rq_set = NULL;
- ptlrpc_req_finished(req);
- }
- }
-
- if (rc == 0) {
- /*
- * If new requests have been added, make sure to wake up.
- */
- rc = cfs_atomic_read(&set->set_new_count);
-
-#ifdef __KERNEL__
- /* If we have nothing to do, check whether we can take some
- * work from our partner threads. */
- if (rc == 0 && pc->pc_npartners > 0) {
- struct ptlrpcd_ctl *partner;
- struct ptlrpc_request_set *ps;
- int first = pc->pc_cursor;
-
- do {
- partner = pc->pc_partners[pc->pc_cursor++];
- if (pc->pc_cursor >= pc->pc_npartners)
- pc->pc_cursor = 0;
- if (partner == NULL)
- continue;
+ }
+
+ /*
+ * We should call lu_env_refill() before handling new requests to make
+ * sure that the env keys the requests depend on really exist.
+ */
+ rc2 = lu_env_refill(env);
+ if (rc2 != 0) {
+ /*
+ * XXX This is very awkward situation, because
+ * execution can neither continue (request
+ * interpreters assume that env is set up), nor repeat
+ * the loop (as this potentially results in a tight
+ * loop of -ENOMEM's).
+ *
+ * Fortunately, refill only ever does something when
+ * new modules are loaded, i.e., early during boot up.
+ */
+ CERROR("Failure to refill session: %d\n", rc2);
+ RETURN(rc);
+ }
+
+ if (atomic_read(&set->set_remaining))
+ rc |= ptlrpc_check_set(env, set);
+
+ /*
+ * NB: ptlrpc_check_set() has already moved completed requests to the
+ * head of set::set_requests.
+ */
+ list_for_each_entry_safe(req, tmp, &set->set_requests, rq_set_chain) {
+ if (req->rq_phase != RQ_PHASE_COMPLETE)
+ break;
+
+ list_del_init(&req->rq_set_chain);
+ req->rq_set = NULL;
+ ptlrpc_req_finished(req);
+ }
+
+ if (rc == 0) {
+ /*
+ * If new requests have been added, make sure to wake up.
+ */
+ rc = atomic_read(&set->set_new_count);
+
+ /*
+ * If we have nothing to do, check whether we can take some
+ * work from our partner threads.
+ */
+ if (rc == 0 && pc->pc_npartners > 0) {
+ struct ptlrpcd_ctl *partner;
+ struct ptlrpc_request_set *ps;
+ int first = pc->pc_cursor;
+
+ do {
+ partner = pc->pc_partners[pc->pc_cursor++];
+ if (pc->pc_cursor >= pc->pc_npartners)
+ pc->pc_cursor = 0;
+ if (partner == NULL)
+ continue;
spin_lock(&partner->pc_lock);
ps = partner->pc_set;
ptlrpc_reqset_get(ps);
spin_unlock(&partner->pc_lock);
- if (cfs_atomic_read(&ps->set_new_count)) {
- rc = ptlrpcd_steal_rqset(set, ps);
- if (rc > 0)
- CDEBUG(D_RPCTRACE, "transfer %d"
- " async RPCs [%d->%d]\n",
- rc, partner->pc_index,
- pc->pc_index);
- }
- ptlrpc_reqset_put(ps);
- } while (rc == 0 && pc->pc_cursor != first);
- }
-#endif
- }
+ if (atomic_read(&ps->set_new_count)) {
+ rc = ptlrpcd_steal_rqset(set, ps);
+ if (rc > 0)
+ CDEBUG(D_RPCTRACE,
+ "transfer %d async RPCs [%d->%d]\n",
+ rc, partner->pc_index,
+ pc->pc_index);
+ }
+ ptlrpc_reqset_put(ps);
+ } while (rc == 0 && pc->pc_cursor != first);
+ }
+ }
- RETURN(rc);
+ RETURN(rc || test_bit(LIOD_STOP, &pc->pc_flags));
}
-#ifdef __KERNEL__
/**
* Main ptlrpcd thread.
* ptlrpc's code paths like to execute in process context, so we have this
* thread which spins on a set which contains the rpcs and sends them.
- *
*/
static int ptlrpcd(void *arg)
{
- struct ptlrpcd_ctl *pc = arg;
- struct ptlrpc_request_set *set = pc->pc_set;
- struct lu_env env = { .le_ses = NULL };
- int rc, exit = 0;
- ENTRY;
-
- unshare_fs_struct();
-#if defined(CONFIG_SMP)
- if (test_bit(LIOD_BIND, &pc->pc_flags)) {
- int index = pc->pc_index;
-
- if (index >= 0 && index < num_possible_cpus()) {
- while (!cpu_online(index)) {
- if (++index >= num_possible_cpus())
- index = 0;
- }
- set_cpus_allowed_ptr(current,
- cpumask_of_node(cpu_to_node(index)));
- }
+ struct ptlrpcd_ctl *pc = arg;
+ struct ptlrpc_request_set *set;
+ struct lu_context ses = { 0 };
+ struct lu_env env = { .le_ses = &ses };
+ int rc = 0;
+ int exit = 0;
+
+ ENTRY;
+ if (cfs_cpt_bind(cfs_cpt_tab, pc->pc_cpt) != 0)
+ CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
+
+ /*
+ * Allocate the request set after the thread has been bound
+ * above. This is safe because no requests will be queued
+ * until all ptlrpcd threads have confirmed that they have
+ * successfully started.
+ */
+ set = ptlrpc_prep_set();
+ if (set == NULL)
+ GOTO(failed, rc = -ENOMEM);
+ spin_lock(&pc->pc_lock);
+ pc->pc_set = set;
+ spin_unlock(&pc->pc_lock);
+
+ /* Both client and server (MDT/OST) may use the environment. */
+ rc = lu_context_init(&env.le_ctx, LCT_MD_THREAD |
+ LCT_DT_THREAD |
+ LCT_CL_THREAD |
+ LCT_REMEMBER |
+ LCT_NOREF);
+ if (rc != 0)
+ GOTO(failed, rc);
+ rc = lu_context_init(env.le_ses, LCT_SESSION |
+ LCT_REMEMBER |
+ LCT_NOREF);
+ if (rc != 0) {
+ lu_context_fini(&env.le_ctx);
+ GOTO(failed, rc);
}
-#endif
- /*
- * XXX So far only "client" ptlrpcd uses an environment. In
- * the future, ptlrpcd thread (or a thread-set) has to given
- * an argument, describing its "scope".
- */
- rc = lu_context_init(&env.le_ctx,
- LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
+
complete(&pc->pc_starting);
- if (rc != 0)
- RETURN(rc);
-
- /*
- * This mainloop strongly resembles ptlrpc_set_wait() except that our
- * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
- * there are requests in the set. New requests come in on the set's
- * new_req_list and ptlrpcd_check() moves them into the set.
- */
- do {
- struct l_wait_info lwi;
- int timeout;
-
- timeout = ptlrpc_set_next_timeout(set);
- lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
- ptlrpc_expired_set, set);
-
- lu_context_enter(&env.le_ctx);
- l_wait_event(set->set_waitq,
- ptlrpcd_check(&env, pc), &lwi);
- lu_context_exit(&env.le_ctx);
-
- /*
- * Abort inflight rpcs for forced stop case.
- */
+ /*
+ * This mainloop strongly resembles ptlrpc_set_wait() except that our
+ * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
+ * there are requests in the set. New requests come in on the set's
+ * new_req_list and ptlrpcd_check() moves them into the set.
+ */
+ do {
+ time64_t timeout;
+
+ timeout = ptlrpc_set_next_timeout(set);
+
+ lu_context_enter(&env.le_ctx);
+ lu_context_enter(env.le_ses);
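+ /*
+ * A zero timeout means nothing in the set has a deadline, so
+ * wait until ptlrpcd_check() reports work to do or the thread
+ * is asked to stop; otherwise wake up when the next request
+ * times out and expire it.
+ */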
+ if (timeout == 0)
+ wait_event_idle(set->set_waitq,
+ ptlrpcd_check(&env, pc));
+ else if (wait_event_idle_timeout(set->set_waitq,
+ ptlrpcd_check(&env, pc),
+ cfs_time_seconds(timeout))
+ == 0)
+ ptlrpc_expired_set(set);
+ lu_context_exit(&env.le_ctx);
+ lu_context_exit(env.le_ses);
+
+ /*
+ * Abort inflight rpcs for forced stop case.
+ */
if (test_bit(LIOD_STOP, &pc->pc_flags)) {
if (test_bit(LIOD_FORCE, &pc->pc_flags))
- ptlrpc_abort_set(set);
- exit++;
- }
-
- /*
- * Let's make one more loop to make sure that ptlrpcd_check()
- * copied all raced new rpcs into the set so we can kill them.
- */
- } while (exit < 2);
-
- /*
- * Wait for inflight requests to drain.
- */
- if (!cfs_list_empty(&set->set_requests))
- ptlrpc_set_wait(set);
- lu_context_fini(&env.le_ctx);
+ ptlrpc_abort_set(set);
+ exit++;
+ }
+
+ /*
+ * Let's make one more loop to make sure that ptlrpcd_check()
+ * copied all raced new rpcs into the set so we can kill them.
+ */
+ } while (exit < 2);
+
+ /*
+ * Wait for inflight requests to drain.
+ */
+ if (!list_empty(&set->set_requests))
+ ptlrpc_set_wait(&env, set);
+ lu_context_fini(&env.le_ctx);
+ lu_context_fini(env.le_ses);
complete(&pc->pc_finishing);
- return 0;
+ return 0;
+
+failed:
+ pc->pc_error = rc;
+ complete(&pc->pc_starting);
+ RETURN(rc);
}
-/* XXX: We want multiple CPU cores to share the async RPC load. So we start many
- * ptlrpcd threads. We also want to reduce the ptlrpcd overhead caused by
- * data transfer cross-CPU cores. So we bind ptlrpcd thread to specified
- * CPU core. But binding all ptlrpcd threads maybe cause response delay
- * because of some CPU core(s) busy with other loads.
- *
- * For example: "ls -l", some async RPCs for statahead are assigned to
- * ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy
- * with other non-ptlrpcd, like "ls -l" itself (we want to the "ls -l"
- * thread, statahead thread, and ptlrpcd thread can run in parallel), under
- * such case, the statahead async RPCs can not be processed in time, it is
- * unexpected. If ptlrpcd_0 can be re-scheduled on other CPU core, it may
- * be better. But it breaks former data transfer policy.
- *
- * So we shouldn't be blind for avoiding the data transfer. We make some
- * compromise: divide the ptlrpcd threds pool into two parts. One part is
- * for bound mode, each ptlrpcd thread in this part is bound to some CPU
- * core. The other part is for free mode, all the ptlrpcd threads in the
- * part can be scheduled on any CPU core. We specify some partnership
- * between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s),
- * and the async RPC load within the partners are shared.
- *
- * It can partly avoid data transfer cross-CPU (if the bound mode ptlrpcd
- * thread can be scheduled in time), and try to guarantee the async RPC
- * processed ASAP (as long as the free mode ptlrpcd thread can be scheduled
- * on any CPU core).
- *
- * As for how to specify the partnership between bound mode ptlrpcd
- * thread(s) and free mode ptlrpcd thread(s), the simplest way is to use
- * <free bound> pair. In future, we can specify some more complex
- * partnership based on the patches for CPU partition. But before such
- * patches are available, we prefer to use the simplest one.
- */
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
-# endif
-static int ptlrpcd_bind(int index, int max)
+static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
{
- struct ptlrpcd_ctl *pc;
- int rc = 0;
-#if defined(CONFIG_NUMA)
- cpumask_t mask;
-#endif
ENTRY;
- LASSERT(index <= max - 1);
- pc = &ptlrpcds->pd_threads[index];
- switch (ptlrpcd_bind_policy) {
- case PDB_POLICY_NONE:
- pc->pc_npartners = -1;
- break;
- case PDB_POLICY_FULL:
- pc->pc_npartners = 0;
- set_bit(LIOD_BIND, &pc->pc_flags);
- break;
- case PDB_POLICY_PAIR:
- LASSERT(max % 2 == 0);
- pc->pc_npartners = 1;
- break;
- case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
- {
- int i;
- mask = *cpumask_of_node(cpu_to_node(index));
- for (i = max; i < num_online_cpus(); i++)
- cpu_clear(i, mask);
- pc->pc_npartners = cpus_weight(mask) - 1;
- set_bit(LIOD_BIND, &pc->pc_flags);
+ pc->pc_index = index;
+ pc->pc_cpt = cpt;
+ init_completion(&pc->pc_starting);
+ init_completion(&pc->pc_finishing);
+ spin_lock_init(&pc->pc_lock);
+
+ if (index < 0) {
+ /* Recovery thread. */
+ snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
+ } else {
+ /* Regular thread. */
+ snprintf(pc->pc_name, sizeof(pc->pc_name),
+ "ptlrpcd_%02d_%02d", cpt, index);
}
-#else
- LASSERT(max >= 3);
- pc->pc_npartners = 2;
-#endif
- break;
- default:
- CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
- rc = -EINVAL;
- }
-
- if (rc == 0 && pc->pc_npartners > 0) {
- OBD_ALLOC(pc->pc_partners,
- sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
- if (pc->pc_partners == NULL) {
- pc->pc_npartners = 0;
- rc = -ENOMEM;
- } else {
- switch (ptlrpcd_bind_policy) {
- case PDB_POLICY_PAIR:
- if (index & 0x1) {
- set_bit(LIOD_BIND, &pc->pc_flags);
- pc->pc_partners[0] = &ptlrpcds->
- pd_threads[index - 1];
- ptlrpcds->pd_threads[index - 1].
- pc_partners[0] = pc;
- }
- break;
- case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
- {
- struct ptlrpcd_ctl *ppc;
- int i, pidx;
- /* partners are cores in the same NUMA node.
- * setup partnership only with ptlrpcd threads
- * that are already initialized
- */
- for (pidx = 0, i = 0; i < index; i++) {
- if (cpu_isset(i, mask)) {
- ppc = &ptlrpcds->pd_threads[i];
- pc->pc_partners[pidx++] = ppc;
- ppc->pc_partners[ppc->
- pc_npartners++] = pc;
- }
- }
- /* adjust number of partners to the number
- * of partnership really setup */
- pc->pc_npartners = pidx;
- }
-#else
- if (index & 0x1)
- set_bit(LIOD_BIND, &pc->pc_flags);
- if (index > 0) {
- pc->pc_partners[0] = &ptlrpcds->
- pd_threads[index - 1];
- ptlrpcds->pd_threads[index - 1].
- pc_partners[1] = pc;
- if (index == max - 1) {
- pc->pc_partners[1] =
- &ptlrpcds->pd_threads[0];
- ptlrpcds->pd_threads[0].
- pc_partners[0] = pc;
- }
- }
-#endif
- break;
- }
- }
- }
- RETURN(rc);
+ EXIT;
}
-#else /* !__KERNEL__ */
-
-/**
- * In liblustre we do not have separate threads, so this function
- * is called from time to time all across common code to see
- * if something needs to be processed on ptlrpcd set.
+/* XXX: We want multiple CPU cores to share the async RPC load. So we
+ * start many ptlrpcd threads. We also want to reduce the ptlrpcd
+ * overhead caused by data transfer cross-CPU cores. So we bind
+ * all ptlrpcd threads to a CPT, in the expectation that CPTs
+ * will be defined in a way that matches these boundaries. Within
+ * a CPT a ptlrpcd thread can be scheduled on any available core.
+ *
+ * Each ptlrpcd thread has its own request queue. This can cause
+ * response delay if the thread is already busy. To help with
+ * this we define partner threads: these are other threads bound
+ * to the same CPT which will check for work in each other's
+ * request queues if they have no work to do.
+ *
+ * The desired number of partner threads can be tuned by setting
+ * ptlrpcd_partner_group_size. The default is to create pairs of
+ * partner threads.
*/
-int ptlrpcd_check_async_rpcs(void *arg)
+static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
{
- struct ptlrpcd_ctl *pc = arg;
- int rc = 0;
-
- /*
- * Single threaded!!
- */
- pc->pc_recurred++;
-
- if (pc->pc_recurred == 1) {
- rc = lu_env_refill(&pc->pc_env);
- if (rc == 0) {
- lu_context_enter(&pc->pc_env.le_ctx);
- rc = ptlrpcd_check(&pc->pc_env, pc);
- if (!rc)
- ptlrpc_expired_set(pc->pc_set);
- /*
- * XXX: send replay requests.
- */
- if (test_bit(LIOD_RECOVERY, &pc->pc_flags))
- rc = ptlrpcd_check(&pc->pc_env, pc);
- lu_context_exit(&pc->pc_env.le_ctx);
- }
- }
-
- pc->pc_recurred--;
- return rc;
-}
+ struct ptlrpcd_ctl *pc;
+ struct ptlrpcd_ctl **ppc;
+ int first;
+ int i;
+ int rc = 0;
-int ptlrpcd_idle(void *arg)
-{
- struct ptlrpcd_ctl *pc = arg;
+ ENTRY;
- return (cfs_atomic_read(&pc->pc_set->set_new_count) == 0 &&
- cfs_atomic_read(&pc->pc_set->set_remaining) == 0);
-}
+ LASSERT(index >= 0 && index < pd->pd_nthreads);
+ pc = &pd->pd_threads[index];
+ pc->pc_npartners = pd->pd_groupsize - 1;
-#endif
+ if (pc->pc_npartners <= 0)
+ GOTO(out, rc);
+
+ OBD_CPT_ALLOC(pc->pc_partners, cfs_cpt_tab, pc->pc_cpt,
+ sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
+ if (pc->pc_partners == NULL) {
+ pc->pc_npartners = 0;
+ GOTO(out, rc = -ENOMEM);
+ }
-int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
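+ /*
+ * Partner groups are contiguous runs of pd_groupsize threads
+ * within this CPT; e.g. with pd_groupsize == 2, thread 5 is
+ * partnered with thread 4.
+ */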
+ first = index - index % pd->pd_groupsize;
+ ppc = pc->pc_partners;
+ for (i = first; i < first + pd->pd_groupsize; i++) {
+ if (i != index)
+ *ppc++ = &pd->pd_threads[i];
+ }
+out:
+ RETURN(rc);
+}
+
+int ptlrpcd_start(struct ptlrpcd_ctl *pc)
{
- int rc;
- ENTRY;
+ struct task_struct *task;
+ int rc = 0;
- /*
- * Do not allow start second thread for one pc.
- */
+ ENTRY;
+
+ /*
+ * Do not allow starting a second thread for one pc.
+ */
if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
CWARN("Starting second thread (%s) for same pc %p\n",
- name, pc);
+ pc->pc_name, pc);
RETURN(0);
}
- pc->pc_index = index;
- init_completion(&pc->pc_starting);
- init_completion(&pc->pc_finishing);
- spin_lock_init(&pc->pc_lock);
- strncpy(pc->pc_name, name, sizeof(pc->pc_name) - 1);
- pc->pc_set = ptlrpc_prep_set();
- if (pc->pc_set == NULL)
- GOTO(out, rc = -ENOMEM);
-
-#ifndef __KERNEL__
- pc->pc_wait_callback =
- liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
- &ptlrpcd_check_async_rpcs, pc);
- pc->pc_idle_callback =
- liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
- &ptlrpcd_idle, pc);
- RETURN(0);
-#else
- /*
- * So far only "client" ptlrpcd uses an environment. In the future,
- * ptlrpcd thread (or a thread-set) has to be given an argument,
- * describing its "scope".
- */
- rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
- if (rc != 0)
- GOTO(out_set, rc);
+ task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
+ if (IS_ERR(task))
+ GOTO(out_set, rc = PTR_ERR(task));
- {
- struct task_struct *task;
- if (index >= 0) {
- rc = ptlrpcd_bind(index, max);
- if (rc < 0)
- GOTO(out_env, rc);
- }
-
- task = kthread_run(ptlrpcd, pc, pc->pc_name);
- if (IS_ERR(task))
- GOTO(out_env, rc = PTR_ERR(task));
+ wait_for_completion(&pc->pc_starting);
+ rc = pc->pc_error;
+ if (rc != 0)
+ GOTO(out_set, rc);
- wait_for_completion(&pc->pc_starting);
- }
RETURN(0);
-out_env:
- lu_context_fini(&pc->pc_env.le_ctx);
-
out_set:
if (pc->pc_set != NULL) {
struct ptlrpc_request_set *set = pc->pc_set;
spin_unlock(&pc->pc_lock);
ptlrpc_set_destroy(set);
}
- clear_bit(LIOD_BIND, &pc->pc_flags);
-#endif
-out:
clear_bit(LIOD_START, &pc->pc_flags);
RETURN(rc);
}
void ptlrpcd_free(struct ptlrpcd_ctl *pc)
{
struct ptlrpc_request_set *set = pc->pc_set;
+
ENTRY;
if (!test_bit(LIOD_START, &pc->pc_flags)) {
goto out;
}
-#ifdef __KERNEL__
wait_for_completion(&pc->pc_finishing);
-#else
- liblustre_deregister_wait_callback(pc->pc_wait_callback);
- liblustre_deregister_idle_callback(pc->pc_idle_callback);
-#endif
- lu_context_fini(&pc->pc_env.le_ctx);
spin_lock(&pc->pc_lock);
pc->pc_set = NULL;
clear_bit(LIOD_START, &pc->pc_flags);
clear_bit(LIOD_STOP, &pc->pc_flags);
clear_bit(LIOD_FORCE, &pc->pc_flags);
- clear_bit(LIOD_BIND, &pc->pc_flags);
out:
-#ifdef __KERNEL__
- if (pc->pc_npartners > 0) {
- LASSERT(pc->pc_partners != NULL);
-
- OBD_FREE(pc->pc_partners,
- sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
- pc->pc_partners = NULL;
- }
- pc->pc_npartners = 0;
-#endif
- EXIT;
+ if (pc->pc_npartners > 0) {
+ LASSERT(pc->pc_partners != NULL);
+
+ OBD_FREE_PTR_ARRAY(pc->pc_partners, pc->pc_npartners);
+ pc->pc_partners = NULL;
+ }
+ pc->pc_npartners = 0;
+ pc->pc_error = 0;
+ EXIT;
}
static void ptlrpcd_fini(void)
{
- int i;
+ int i;
+ int j;
+ int ncpts;
+
ENTRY;
if (ptlrpcds != NULL) {
- for (i = 0; i < ptlrpcds->pd_nthreads; i++)
- ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
- for (i = 0; i < ptlrpcds->pd_nthreads; i++)
- ptlrpcd_free(&ptlrpcds->pd_threads[i]);
- ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
- ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
- OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
- ptlrpcds = NULL;
+ for (i = 0; i < ptlrpcds_num; i++) {
+ if (ptlrpcds[i] == NULL)
+ break;
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
+ OBD_FREE(ptlrpcds[i], ptlrpcds[i]->pd_size);
+ ptlrpcds[i] = NULL;
+ }
+ OBD_FREE_PTR_ARRAY(ptlrpcds, ptlrpcds_num);
+ }
+ ptlrpcds_num = 0;
+
+ ptlrpcd_stop(&ptlrpcd_rcv, 0);
+ ptlrpcd_free(&ptlrpcd_rcv);
+
+ if (ptlrpcds_cpt_idx != NULL) {
+ ncpts = cfs_cpt_number(cfs_cpt_tab);
+ OBD_FREE_PTR_ARRAY(ptlrpcds_cpt_idx, ncpts);
+ ptlrpcds_cpt_idx = NULL;
}
EXIT;
static int ptlrpcd_init(void)
{
- int nthreads = num_online_cpus();
- char name[16];
- int size, i = -1, j, rc = 0;
+ int nthreads;
+ int groupsize;
+ int size;
+ int i;
+ int j;
+ int rc = 0;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
+ struct ptlrpcd *pd;
+
ENTRY;
-#ifdef __KERNEL__
- if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
- nthreads = max_ptlrpcds;
- if (nthreads < 2)
- nthreads = 2;
- if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
- ptlrpcd_bind_policy = PDB_POLICY_PAIR;
- else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
- nthreads &= ~1; /* make sure it is even */
+ /*
+ * Determine the CPTs that ptlrpcd threads will run on.
+ */
+ cptable = cfs_cpt_tab;
+ ncpts = cfs_cpt_number(cptable);
+ if (ptlrpcd_cpts != NULL) {
+ struct cfs_expr_list *el;
+
+ size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
+ OBD_ALLOC(ptlrpcds_cpt_idx, size);
+ if (ptlrpcds_cpt_idx == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ rc = cfs_expr_list_parse(ptlrpcd_cpts,
+ strlen(ptlrpcd_cpts),
+ 0, ncpts - 1, &el);
+ if (rc != 0) {
+ CERROR("%s: invalid CPT pattern string: %s",
+ "ptlrpcd_cpts", ptlrpcd_cpts);
+ GOTO(out, rc = -EINVAL);
+ }
+
+ rc = cfs_expr_list_values(el, ncpts, &cpts);
+ cfs_expr_list_free(el);
+ if (rc <= 0) {
+ CERROR("%s: failed to parse CPT array %s: %d\n",
+ "ptlrpcd_cpts", ptlrpcd_cpts, rc);
+ if (rc == 0)
+ rc = -EINVAL;
+ GOTO(out, rc);
+ }
+
+ /*
+ * Create the cpt-to-index map. When a CPT does not appear
+ * in the list, map it to an entry chosen arbitrarily (cpt
+ * modulo the number of entries). This could be changed to
+ * take the topology of the system into account.
+ */
+ for (cpt = 0; cpt < ncpts; cpt++) {
+ for (i = 0; i < rc; i++)
+ if (cpts[i] == cpt)
+ break;
+ if (i >= rc)
+ i = cpt % rc;
+ ptlrpcds_cpt_idx[cpt] = i;
+ }
+
+ cfs_expr_list_values_free(cpts, rc);
+ ncpts = rc;
+ }
+ ptlrpcds_num = ncpts;
+
+ size = ncpts * sizeof(ptlrpcds[0]);
+ OBD_ALLOC(ptlrpcds, size);
+ if (ptlrpcds == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ /*
+ * The max_ptlrpcds parameter is obsolete, but do something
+ * sane if it has been tuned, and complain if
+ * ptlrpcd_per_cpt_max has also been tuned.
+ */
+ if (max_ptlrpcds != 0) {
+ CWARN("max_ptlrpcds is obsolete.\n");
+ if (ptlrpcd_per_cpt_max == 0) {
+ ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
+ /* Round up if there is a remainder. */
+ if (max_ptlrpcds % ncpts != 0)
+ ptlrpcd_per_cpt_max++;
+ CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
+ ptlrpcd_per_cpt_max);
+ } else {
+ CWARN("ptlrpd_per_cpt_max is also set!\n");
+ }
+ }
+
+ /*
+ * The ptlrpcd_bind_policy parameter is obsolete, but do
+ * something sane if it has been tuned, and complain if
+ * ptlrpcd_partner_group_size is also tuned.
+ */
+ if (ptlrpcd_bind_policy != 0) {
+ CWARN("ptlrpcd_bind_policy is obsolete.\n");
+ if (ptlrpcd_partner_group_size == 0) {
+ switch (ptlrpcd_bind_policy) {
+ case 1: /* PDB_POLICY_NONE */
+ case 2: /* PDB_POLICY_FULL */
+ ptlrpcd_partner_group_size = 1;
+ break;
+ case 3: /* PDB_POLICY_PAIR */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ case 4: /* PDB_POLICY_NEIGHBOR */
+#ifdef CONFIG_NUMA
+ ptlrpcd_partner_group_size = -1; /* CPT */
#else
- nthreads = 1;
+ ptlrpcd_partner_group_size = 3; /* Triplets */
#endif
+ break;
+ default: /* Illegal value, use the default. */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ }
+ CWARN("Setting ptlrpcd_partner_group_size = %d\n",
+ ptlrpcd_partner_group_size);
+ } else {
+ CWARN("ptlrpcd_partner_group_size is also set!\n");
+ }
+ }
+
+ if (ptlrpcd_partner_group_size == 0)
+ ptlrpcd_partner_group_size = 2;
+ else if (ptlrpcd_partner_group_size < 0)
+ ptlrpcd_partner_group_size = -1;
+ else if (ptlrpcd_per_cpt_max > 0 &&
+ ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
+ ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
+
+ /*
+ * Start the recovery thread first.
+ */
+ set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
+ ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
+ rc = ptlrpcd_start(&ptlrpcd_rcv);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ for (i = 0; i < ncpts; i++) {
+ if (cpts == NULL)
+ cpt = i;
+ else
+ cpt = cpts[i];
+
+ nthreads = cfs_cpt_weight(cptable, cpt);
+ if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
+ nthreads = ptlrpcd_per_cpt_max;
+ if (nthreads < 2)
+ nthreads = 2;
+
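+ /*
+ * If the partner group size does not evenly divide the thread
+ * count, round nthreads up so that every partner group in this
+ * CPT is fully populated.
+ */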
+ if (ptlrpcd_partner_group_size <= 0) {
+ groupsize = nthreads;
+ } else if (nthreads <= ptlrpcd_partner_group_size) {
+ groupsize = nthreads;
+ } else {
+ groupsize = ptlrpcd_partner_group_size;
+ if (nthreads % groupsize != 0)
+ nthreads += groupsize - (nthreads % groupsize);
+ }
- size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
- OBD_ALLOC(ptlrpcds, size);
- if (ptlrpcds == NULL)
- GOTO(out, rc = -ENOMEM);
-
- snprintf(name, 15, "ptlrpcd_rcv");
- set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
- rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
- if (rc < 0)
- GOTO(out, rc);
-
- /* XXX: We start nthreads ptlrpc daemons. Each of them can process any
- * non-recovery async RPC to improve overall async RPC efficiency.
- *
- * But there are some issues with async I/O RPCs and async non-I/O
- * RPCs processed in the same set under some cases. The ptlrpcd may
- * be blocked by some async I/O RPC(s), then will cause other async
- * non-I/O RPC(s) can not be processed in time.
- *
- * Maybe we should distinguish blocked async RPCs from non-blocked
- * async RPCs, and process them in different ptlrpcd sets to avoid
- * unnecessary dependency. But how to distribute async RPCs load
- * among all the ptlrpc daemons becomes another trouble. */
- for (i = 0; i < nthreads; i++) {
- snprintf(name, 15, "ptlrpcd_%d", i);
- rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
- if (rc < 0)
- GOTO(out, rc);
- }
-
- ptlrpcds->pd_size = size;
- ptlrpcds->pd_index = 0;
- ptlrpcds->pd_nthreads = nthreads;
+ size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+ OBD_CPT_ALLOC(pd, cptable, cpt, size);
+
+ if (!pd)
+ GOTO(out, rc = -ENOMEM);
+ pd->pd_size = size;
+ pd->pd_index = i;
+ pd->pd_cpt = cpt;
+ pd->pd_cursor = 0;
+ pd->pd_nthreads = nthreads;
+ pd->pd_groupsize = groupsize;
+ ptlrpcds[i] = pd;
+
+ /*
+ * The ptlrpcd threads in a partner group can access
+ * each other's struct ptlrpcd_ctl, so these must be
+ * initialized before any thread is started.
+ */
+ for (j = 0; j < nthreads; j++) {
+ ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
+ rc = ptlrpcd_partners(pd, j);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+ /* XXX: We start nthreads ptlrpc daemons on this cpt.
+ * Each of them can process any non-recovery
+ * async RPC to improve overall async RPC
+ * efficiency.
+ *
+ * But in some cases there are issues when async
+ * I/O RPCs and async non-I/O RPCs are processed
+ * in the same set. The ptlrpcd may be blocked by
+ * some async I/O RPC(s), which then prevents
+ * other async non-I/O RPC(s) from being
+ * processed in time.
+ *
+ * Maybe we should distinguish blocked async RPCs
+ * from non-blocked async RPCs, and process them
+ * in different ptlrpcd sets to avoid unnecessary
+ * dependencies. But how to distribute the async
+ * RPC load among all the ptlrpc daemons then
+ * becomes another problem.
+ */
+ for (j = 0; j < nthreads; j++) {
+ rc = ptlrpcd_start(&pd->pd_threads[j]);
+ if (rc < 0)
+ GOTO(out, rc);
+ }
+ }
out:
- if (rc != 0 && ptlrpcds != NULL) {
- for (j = 0; j <= i; j++)
- ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
- for (j = 0; j <= i; j++)
- ptlrpcd_free(&ptlrpcds->pd_threads[j]);
- ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
- ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
- OBD_FREE(ptlrpcds, size);
- ptlrpcds = NULL;
- }
+ if (rc != 0)
+ ptlrpcd_fini();
RETURN(rc);
}
int ptlrpcd_addref(void)
{
- int rc = 0;
- ENTRY;
+ int rc = 0;
+
+ ENTRY;
mutex_lock(&ptlrpcd_mutex);
- if (++ptlrpcd_users == 1) {
+ if (++ptlrpcd_users == 1) {
rc = ptlrpcd_init();
if (rc < 0)
ptlrpcd_users--;
}
mutex_unlock(&ptlrpcd_mutex);
- RETURN(rc);
+ RETURN(rc);
}
EXPORT_SYMBOL(ptlrpcd_addref);
void ptlrpcd_decref(void)
{
mutex_lock(&ptlrpcd_mutex);
- if (--ptlrpcd_users == 0)
- ptlrpcd_fini();
+ if (--ptlrpcd_users == 0)
+ ptlrpcd_fini();
mutex_unlock(&ptlrpcd_mutex);
}
EXPORT_SYMBOL(ptlrpcd_decref);