int cfs_expr_list_match(__u32 value, struct cfs_expr_list *expr_list);
int cfs_expr_list_values(struct cfs_expr_list *expr_list,
int max, __u32 **values);
+static inline void
+cfs_expr_list_values_free(__u32 *values, int num)
+{
+ /* This array is allocated by LIBCFS_ALLOC(), so it shouldn't be freed
+ * by OBD_FREE() if it's called by a module other than libcfs & LNet,
+ * otherwise we will see a fake memory leak */
+ LIBCFS_FREE(values, num * sizeof(values[0]));
+}
+
void cfs_expr_list_free(struct cfs_expr_list *expr_list);
void cfs_expr_list_print(struct cfs_expr_list *expr_list);
int cfs_expr_list_parse(char *str, int len, unsigned min, unsigned max,
if (ni->ni_tx_queues != NULL)
cfs_percpt_free(ni->ni_tx_queues);
- if (ni->ni_cpts != NULL) {
- LIBCFS_FREE(ni->ni_cpts,
- sizeof(ni->ni_cpts[0] * ni->ni_ncpts));
- }
+ if (ni->ni_cpts != NULL)
+ cfs_expr_list_values_free(ni->ni_cpts, ni->ni_ncpts);
#ifndef __KERNEL__
# ifdef HAVE_LIBPTHREAD
# endif
#endif /* __KERNEL__ */
-#define PTLRPC_NTHRS_MIN 2
+#define PTLRPC_NTHRS_INIT 2
/**
- * The following constants determine how memory is used to buffer incoming
- * service requests.
+ * Buffer Constants
+ *
+ * Constants determine how memory is used to buffer incoming service requests.
*
* ?_NBUFS # buffers to allocate when growing the pool
* ?_BUFSIZE # bytes in a single request buffer
* Messages larger than ?_MAXREQSIZE are dropped. Request buffers are
* considered full when less than ?_MAXREQSIZE is left in them.
*/
-#define LDLM_THREADS_AUTO_MIN (2)
-#define LDLM_THREADS_AUTO_MAX min_t(unsigned, cfs_num_online_cpus() * \
- cfs_num_online_cpus() * 32, 128)
-#define LDLM_BL_THREADS LDLM_THREADS_AUTO_MIN
+/**
+ * Thread Constants
+ *
+ * Constants determine how threads are created for ptlrpc service.
+ *
+ * ?_NTHRS_INIT # threads to create for each service partition on
+ * initializing. If it's non-affinity service and
+ * there is only one partition, it's the overall #
+ * threads for the service while initializing.
+ * ?_NTHRS_BASE # threads should be created at least for each
+ * ptlrpc partition to keep the service healthy.
+ * It's the low-water mark of threads upper-limit
+ * for each partition.
+ * ?_THR_FACTOR # threads can be added on threads upper-limit for
+ * each CPU core. This factor is only for reference,
+ * we might decrease value of factor if number of cores
+ * per CPT is above a limit.
+ * ?_NTHRS_MAX # overall threads can be created for a service,
+ * it's a soft limit because if service is running
+ * on machine with hundreds of cores and tens of
+ * CPU partitions, we need to guarantee each partition
+ * has ?_NTHRS_BASE threads, which means total threads
+ * will be ?_NTHRS_BASE * number_of_cpts which can
+ * exceed ?_NTHRS_MAX.
+ *
+ * Examples
+ *
+ * #define MDT_NTHRS_INIT 2
+ * #define MDT_NTHRS_BASE 64
+ * #define MDT_NTHRS_FACTOR 8
+ * #define MDT_NTHRS_MAX 1024
+ *
+ * Example 1):
+ * ---------------------------------------------------------------------
+ * Server(A) has 16 cores, user configured it to 4 partitions so each
+ * partition has 4 cores, then actual number of service threads on each
+ * partition is:
+ * MDT_NTHRS_BASE(64) + cores(4) * MDT_NTHRS_FACTOR(8) = 96
+ *
+ * Total number of threads for the service is:
+ * 96 * partitions(4) = 384
+ *
+ * Example 2):
+ * ---------------------------------------------------------------------
+ * Server(B) has 32 cores, user configured it to 4 partitions so each
+ * partition has 8 cores, then actual number of service threads on each
+ * partition is:
+ * MDT_NTHRS_BASE(64) + cores(8) * MDT_NTHRS_FACTOR(8) = 128
+ *
+ * Total number of threads for the service is:
+ * 128 * partitions(4) = 512
+ *
+ * Example 3):
+ * ---------------------------------------------------------------------
+ * Server(B) has 96 cores, user configured it to 8 partitions so each
+ * partition has 12 cores, then actual number of service threads on each
+ * partition is:
+ * MDT_NTHRS_BASE(64) + cores(12) * MDT_NTHRS_FACTOR(8) = 160
+ *
+ * Total number of threads for the service is:
+ * 160 * partitions(8) = 1280
+ *
+ * However, it's above the soft limit MDT_NTHRS_MAX, so we choose this number
+ * as upper limit of threads number for each partition:
+ * MDT_NTHRS_MAX(1024) / partitions(8) = 128
+ *
+ * Example 4):
+ * ---------------------------------------------------------------------
+ * Server(C) has a thousand cores and user configured it to 32 partitions
+ * MDT_NTHRS_BASE(64) * 32 = 2048
+ *
+ * which is already above soft limit MDT_NTHRS_MAX(1024), but we still need
+ * to guarantee that each partition has at least MDT_NTHRS_BASE(64) threads
+ * to keep service healthy, so total number of threads will just be 2048.
+ *
+ * NB: we don't suggest choosing a server with that many cores because backend
+ * filesystem itself, buffer cache, or underlying network stack might
+ * have some SMP scalability issues at that large scale.
+ *
+ * If user already has a fat machine with hundreds or thousands of cores,
+ * there are two choices for configuration:
+ * a) create CPU table from subset of all CPUs and run Lustre on
+ * top of this subset
+ * b) bind service threads on a few partitions, see modparameters of
+ * MDS and OSS for details
+ *
+ * NB: these calculations (and examples above) are simplified to help
+ * understanding, the real implementation is a little more complex,
+ * please see ptlrpc_server_nthreads_check() for details.
+ *
+ */
+
+ /*
+ * LDLM threads constants:
+ *
+ * Given 8 as factor and 24 as base threads number
+ *
+ * example 1)
+ * On 4-core machine we will have 24 + 8 * 4 = 56 threads.
+ *
+ * example 2)
+ * On 8-core machine with 2 partitions we will have 24 + 4 * 8 = 56
+ * threads for each partition and total threads number will be 112.
+ *
+ * example 3)
+ * On 64-core machine with 8 partitions we will need LDLM_NTHRS_BASE(24)
+ * threads for each partition to keep service healthy, so total threads
+ * number should be 24 * 8 = 192.
+ *
+ * So with these constants, threads number will be at the similar level
+ * of old versions, unless target machine has over a hundred cores
+ */
+#define LDLM_THR_FACTOR 8
+#define LDLM_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define LDLM_NTHRS_BASE 24
+#define LDLM_NTHRS_MAX (cfs_num_online_cpus() == 1 ? 64 : 128)
+
+#define LDLM_BL_THREADS LDLM_NTHRS_INIT
#define LDLM_NBUFS (64 * cfs_num_online_cpus())
#define LDLM_BUFSIZE (8 * 1024)
#define LDLM_MAXREQSIZE (5 * 1024)
#define LDLM_MAXREPSIZE (1024)
-/** Absolute limits */
+ /*
+ * MDS threads constants:
+ *
+ * Please see examples in "Thread Constants", MDS threads number will be at
+ * the comparable level of old versions, unless the server has many cores.
+ */
#ifndef MDT_MAX_THREADS
-#define MDT_MIN_THREADS PTLRPC_NTHRS_MIN
-#define MDT_MAX_THREADS 512UL
+#define MDT_MAX_THREADS 1024
+#define MDT_MAX_OTHR_THREADS 256
+
+#else /* MDT_MAX_THREADS */
+#if MDT_MAX_THREADS < PTLRPC_NTHRS_INIT
+#undef MDT_MAX_THREADS
+#define MDT_MAX_THREADS PTLRPC_NTHRS_INIT
+#endif
+#define MDT_MAX_OTHR_THREADS max(PTLRPC_NTHRS_INIT, MDT_MAX_THREADS / 2)
#endif
-#define MDS_NBUFS (64 * cfs_num_online_cpus())
+
+/* default service */
+#define MDT_THR_FACTOR 8
+#define MDT_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define MDT_NTHRS_MAX MDT_MAX_THREADS
+#define MDT_NTHRS_BASE min(64, MDT_NTHRS_MAX)
+
+/* read-page service */
+#define MDT_RDPG_THR_FACTOR 4
+#define MDT_RDPG_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define MDT_RDPG_NTHRS_MAX MDT_MAX_OTHR_THREADS
+#define MDT_RDPG_NTHRS_BASE min(48, MDT_RDPG_NTHRS_MAX)
+
+/* these should be removed when we remove setattr service in the future */
+#define MDT_SETA_THR_FACTOR 4
+#define MDT_SETA_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define MDT_SETA_NTHRS_MAX MDT_MAX_OTHR_THREADS
+#define MDT_SETA_NTHRS_BASE min(48, MDT_SETA_NTHRS_MAX)
+
+/* non-affinity threads */
+#define MDT_OTHR_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define MDT_OTHR_NTHRS_MAX MDT_MAX_OTHR_THREADS
+
+#define MDS_NBUFS (64 * cfs_num_online_cpus())
/**
* Assume file name length = FNAME_MAX = 256 (true for ext3).
* path name length = PATH_MAX = 4096
#define SEQ_MAXREPSIZE (152)
/** MGS threads must be >= 3, see bug 22458 comment #28 */
-#define MGS_THREADS_AUTO_MIN 3
-#define MGS_THREADS_AUTO_MAX 32
+#define MGS_NTHRS_INIT (PTLRPC_NTHRS_INIT + 1)
+#define MGS_NTHRS_MAX 32
+
#define MGS_NBUFS (64 * cfs_num_online_cpus())
#define MGS_BUFSIZE (8 * 1024)
#define MGS_MAXREQSIZE (7 * 1024)
#define MGS_MAXREPSIZE (9 * 1024)
-/** Absolute OSS limits */
-#define OSS_THREADS_MIN 3 /* difficult replies, HPQ, others */
-#define OSS_THREADS_MAX 512
+ /*
+ * OSS threads constants:
+ *
+ * Given 8 as factor and 64 as base threads number
+ *
+ * example 1):
+ * On 8-core server configured to 2 partitions, we will have
+ * 64 + 8 * 4 = 96 threads for each partition, 192 total threads.
+ *
+ * example 2):
+ * On 32-core machine configured to 4 partitions, we will have
+ * 64 + 8 * 8 = 128 threads for each partition, so total threads number
+ * will be 128 * 4 = 512.
+ *
+ * example 3):
+ * On 64-core machine configured to 4 partitions, we will have
+ * 64 + 16 * 8 = 192 threads for each partition, so total threads number
+ * will be 192 * 4 = 768 which is above limit OSS_NTHRS_MAX(512), so we
+ * cut off the value to OSS_NTHRS_MAX(512) / 4 which is 128 threads
+ * for each partition.
+ *
+ * So we can see that with these constants, threads number will be at the
+ * similar level of old versions, unless the server has many cores.
+ */
+ /* depress threads factor for VM with small memory size */
+#define OSS_THR_FACTOR min_t(int, 8, \
+ CFS_NUM_CACHEPAGES >> (28 - CFS_PAGE_SHIFT))
+#define OSS_NTHRS_INIT (PTLRPC_NTHRS_INIT + 1)
+#define OSS_NTHRS_BASE 64
+#define OSS_NTHRS_MAX 512
+
+/* threads for handling "create" request */
+#define OSS_CR_THR_FACTOR 1
+#define OSS_CR_NTHRS_INIT PTLRPC_NTHRS_INIT
+#define OSS_CR_NTHRS_BASE 8
+#define OSS_CR_NTHRS_MAX 64
+
#define OST_NBUFS (64 * cfs_num_online_cpus())
#define OST_BUFSIZE (8 * 1024)
char *srv_thread_name;
/** service thread list */
cfs_list_t srv_threads;
- /** threads to start at beginning of service */
- int srv_threads_min;
- /** thread upper limit */
- int srv_threads_max;
+ /** threads # should be created for each partition on initializing */
+ int srv_nthrs_cpt_init;
+ /** limit of threads number for each partition */
+ int srv_nthrs_cpt_limit;
/** Root of /proc dir tree for this service */
cfs_proc_dir_entry_t *srv_procroot;
/** Pointer to statistic data for this service */
__u32 srv_ctx_tags;
/** soft watchdog timeout multiplier */
int srv_watchdog_factor;
- /** bind threads to CPUs */
- unsigned srv_cpu_affinity:1;
/** under unregister_service */
unsigned srv_is_stopping:1;
+ /** max # request buffers in history per partition */
+ int srv_hist_nrqbds_cpt_max;
+ /** number of CPTs this service bound on */
+ int srv_ncpts;
+ /** CPTs array this service bound on */
+ __u32 *srv_cpts;
+ /** 2^srv_cpt_bits >= cfs_cpt_number(srv_cptable) */
+ int srv_cpt_bits;
+ /** CPT table this service is running over */
+ struct cfs_cpt_table *srv_cptable;
/**
- * max # request buffers in history, it needs to be convert into
- * per-partition value when we have multiple partitions
- */
- int srv_max_history_rqbds;
- /**
- * partition data for ptlrpc service, only one instance so far,
- * instance per CPT will come soon
+ * partition data for ptlrpc service
*/
- struct ptlrpc_service_part *srv_part;
+ struct ptlrpc_service_part *srv_parts[0];
};
/**
cfs_atomic_t scp_nreps_difficult;
};
+#define ptlrpc_service_for_each_part(part, i, svc) \
+ for (i = 0; \
+ i < (svc)->srv_ncpts && \
+ (svc)->srv_parts != NULL && \
+ ((part) = (svc)->srv_parts[i]) != NULL; i++)
+
/**
* Declaration of ptlrpcd control structure
*/
struct ptlrpc_service_thr_conf {
/* threadname should be 8 characters or less - 6 will be added on */
char *tc_thr_name;
- /* min number of service threads to start */
- unsigned int tc_nthrs_min;
- /* max number of service threads to start */
+ /* threads increasing factor for each CPU */
+ unsigned int tc_thr_factor;
+ /* service threads # to start on each partition while initializing */
+ unsigned int tc_nthrs_init;
+ /*
+ * low water of threads # upper-limit on each partition while running,
+ * service availability may be impacted if threads number is lower
+ * than this value. It can be ZERO if the service doesn't require
+ * CPU affinity or there is only one partition.
+ */
+ unsigned int tc_nthrs_base;
+ /* "soft" limit for total threads number */
unsigned int tc_nthrs_max;
/* user specified threads number, it will be validated due to
* other members of this structure. */
__u32 tc_ctx_tags;
};
+struct ptlrpc_service_cpt_conf {
+ struct cfs_cpt_table *cc_cptable;
+ /* string pattern to describe CPTs for a service */
+ char *cc_pattern;
+};
+
struct ptlrpc_service_conf {
/* service name */
char *psc_name;
struct ptlrpc_service_buf_conf psc_buf;
/* thread information */
struct ptlrpc_service_thr_conf psc_thr;
+ /* CPU partition information */
+ struct ptlrpc_service_cpt_conf psc_cpt;
/* function table */
struct ptlrpc_service_ops psc_ops;
};
cfs_atomic_t fo_r_in_flight;
cfs_atomic_t fo_w_in_flight;
- /*
- * per-filter pool of kiobuf's allocated by filter_common_setup() and
- * torn down by filter_cleanup(). Contains OST_NUM_THREADS elements of
- * which ->fo_iobuf_count were allocated.
- *
- * This pool contains kiobuf used by
- * filter_{prep,commit}rw_{read,write}() and is shared by all OST
- * threads.
- *
- * Locking: none, each OST thread uses only one element, determined by
- * its "ordinal number", ->t_id.
- */
- struct filter_iobuf **fo_iobuf_pool;
- int fo_iobuf_count;
+ /*
+ * per-filter pool of kiobuf's allocated by filter_common_setup() and
+ * torn down by filter_cleanup().
+ *
+ * This pool contains kiobuf used by
+ * filter_{prep,commit}rw_{read,write}() and is shared by all OST
+ * threads.
+ *
+ * Locking: protected by internal lock of cfs_hash, pool can be
+ * found from this hash table by t_id of ptlrpc_thread.
+ */
+ struct cfs_hash *fo_iobuf_hash;
cfs_list_t fo_llog_list;
cfs_spinlock_t fo_llog_list_lock;
CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444,
"number of DLM service threads to start");
+static char *ldlm_cpts;
+CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444,
+ "CPU partitions ldlm threads should run on");
+
extern cfs_mem_cache_t *ldlm_resource_slab;
extern cfs_mem_cache_t *ldlm_lock_slab;
static cfs_mutex_t ldlm_ref_mutex;
},
.psc_thr = {
.tc_thr_name = "ldlm_cb",
- .tc_nthrs_min = LDLM_THREADS_AUTO_MIN,
- .tc_nthrs_max = LDLM_THREADS_AUTO_MAX,
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
.tc_nthrs_user = ldlm_num_threads,
- .tc_ctx_tags = LCT_MD_THREAD | \
- LCT_DT_THREAD,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+ },
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
},
.psc_ops = {
.so_req_handler = ldlm_callback_handler,
},
.psc_thr = {
.tc_thr_name = "ldlm_cn",
- .tc_nthrs_min = LDLM_THREADS_AUTO_MIN,
- .tc_nthrs_max = LDLM_THREADS_AUTO_MAX,
+ .tc_thr_factor = LDLM_THR_FACTOR,
+ .tc_nthrs_init = LDLM_NTHRS_INIT,
+ .tc_nthrs_base = LDLM_NTHRS_BASE,
+ .tc_nthrs_max = LDLM_NTHRS_MAX,
.tc_nthrs_user = ldlm_num_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_MD_THREAD | \
LCT_DT_THREAD | \
LCT_CL_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = ldlm_cpts,
+ },
.psc_ops = {
.so_req_handler = ldlm_cancel_handler,
.so_hpreq_handler = ldlm_hpreq_handler,
#ifdef __KERNEL__
if (ldlm_num_threads == 0) {
- blp->blp_min_threads = LDLM_THREADS_AUTO_MIN;
- blp->blp_max_threads = LDLM_THREADS_AUTO_MAX;
+ blp->blp_min_threads = LDLM_NTHRS_INIT;
+ blp->blp_max_threads = LDLM_NTHRS_MAX;
} else {
blp->blp_min_threads = blp->blp_max_threads = \
- min_t(int, LDLM_THREADS_AUTO_MAX,
- max_t(int, LDLM_THREADS_AUTO_MIN,
- ldlm_num_threads));
+ min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
+ ldlm_num_threads));
}
- for (i = 0; i < blp->blp_min_threads; i++) {
- rc = ldlm_bl_thread_start(blp);
- if (rc < 0)
+ for (i = 0; i < blp->blp_min_threads; i++) {
+ rc = ldlm_bl_thread_start(blp);
+ if (rc < 0)
GOTO(out, rc);
- }
+ }
# ifdef HAVE_SERVER_SUPPORT
CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks);
cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
- if (rc < 0) {
- CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
+ if (rc < 0) {
+ CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
GOTO(out, rc);
- }
+ }
cfs_wait_event(expired_lock_thread.elt_waitq,
expired_lock_thread.elt_state == ELT_READY);
# endif /* HAVE_SERVER_SUPPORT */
- rc = ldlm_pools_init();
- if (rc)
+ rc = ldlm_pools_init();
+ if (rc) {
+ CERROR("Failed to initialize LDLM pools: %d\n", rc);
GOTO(out, rc);
+ }
#endif
- RETURN(0);
+ RETURN(0);
out:
ldlm_cleanup();
- return rc;
+ RETURN(rc);
}
static int ldlm_cleanup(void)
* Initialized in mdt_mod_init().
*/
static unsigned long mdt_num_threads;
+CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
+ "number of mdt service threads to start");
+
+static char *mdt_cpts;
+CFS_MODULE_PARM(mdt_cpts, "s", charp, 0444,
+ "CPU partitions MDT threads should run on");
+
+static unsigned long mdt_rdpg_num_threads;
+CFS_MODULE_PARM(mdt_rdpg_num_threads, "ul", ulong, 0444,
+ "number of mdt readpage service threads to start");
+
+static char *mdt_rdpg_cpts;
+CFS_MODULE_PARM(mdt_rdpg_cpts, "s", charp, 0444,
+ "CPU partitions MDT readpage threads should run on");
+
+/* NB: these two should be removed along with setattr service in the future */
+static unsigned long mdt_attr_num_threads;
+CFS_MODULE_PARM(mdt_attr_num_threads, "ul", ulong, 0444,
+ "number of mdt setattr service threads to start");
+
+static char *mdt_attr_cpts;
+CFS_MODULE_PARM(mdt_attr_cpts, "s", charp, 0444,
+ "CPU partitions MDT setattr threads should run on");
/* ptlrpc request handler for MDT. All handlers are
* grouped into several slices - struct mdt_opc_slice,
*/
.psc_thr = {
.tc_thr_name = LUSTRE_MDT_NAME,
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
+ .tc_thr_factor = MDT_THR_FACTOR,
+ .tc_nthrs_init = MDT_NTHRS_INIT,
+ .tc_nthrs_base = MDT_NTHRS_BASE,
+ .tc_nthrs_max = MDT_NTHRS_MAX,
.tc_nthrs_user = mdt_num_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_MD_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = mdt_cpts,
+ },
.psc_ops = {
.so_req_handler = mdt_regular_handle,
.so_req_printer = target_print_req,
},
.psc_thr = {
.tc_thr_name = "mdt_rdpg",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_thr_factor = MDT_RDPG_THR_FACTOR,
+ .tc_nthrs_init = MDT_RDPG_NTHRS_INIT,
+ .tc_nthrs_base = MDT_RDPG_NTHRS_BASE,
+ .tc_nthrs_max = MDT_RDPG_NTHRS_MAX,
+ .tc_nthrs_user = mdt_rdpg_num_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_MD_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = mdt_rdpg_cpts,
+ },
.psc_ops = {
.so_req_handler = mdt_readpage_handle,
.so_req_printer = target_print_req,
},
.psc_thr = {
.tc_thr_name = "mdt_attr",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_thr_factor = MDT_SETA_THR_FACTOR,
+ .tc_nthrs_init = MDT_SETA_NTHRS_INIT,
+ .tc_nthrs_base = MDT_SETA_NTHRS_BASE,
+ .tc_nthrs_max = MDT_SETA_NTHRS_MAX,
+ .tc_nthrs_user = mdt_attr_num_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_MD_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = mdt_attr_cpts,
+ },
.psc_ops = {
.so_req_handler = mdt_regular_handle,
.so_req_printer = target_print_req,
},
.psc_thr = {
.tc_thr_name = "mdt_mdsc",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
+ .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
.tc_ctx_tags = LCT_MD_THREAD,
},
.psc_ops = {
},
.psc_thr = {
.tc_thr_name = "mdt_mdss",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
+ .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
.tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
},
.psc_ops = {
},
.psc_thr = {
.tc_thr_name = "mdt_dtss",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
+ .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
.tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD
},
.psc_ops = {
},
.psc_thr = {
.tc_thr_name = "mdt_fld",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
+ .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
.tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD
},
.psc_ops = {
},
.psc_thr = {
.tc_thr_name = "mdt_mds",
- .tc_nthrs_min = MDT_MIN_THREADS,
- .tc_nthrs_max = MDT_MAX_THREADS,
- .tc_nthrs_user = mdt_num_threads,
+ .tc_nthrs_init = MDT_OTHR_NTHRS_INIT,
+ .tc_nthrs_max = MDT_OTHR_NTHRS_MAX,
.tc_ctx_tags = LCT_MD_THREAD,
},
.psc_ops = {
err_free:
lut_client_free(exp);
err:
- CERROR("%s: Error %d while initializing export\n",
+ CERROR("%s: Failed to initialize export: rc = %d\n",
exp->exp_obd->obd_name, rc);
return rc;
}
MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
MODULE_LICENSE("GPL");
-CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
- "number of mdt service threads to start");
-
cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);
},
.psc_thr = {
.tc_thr_name = "ll_mgs",
- .tc_nthrs_min = MGS_THREADS_AUTO_MIN,
- .tc_nthrs_max = MGS_THREADS_AUTO_MAX,
+ .tc_nthrs_init = MGS_NTHRS_INIT,
+ .tc_nthrs_max = MGS_NTHRS_MAX,
.tc_ctx_tags = LCT_MD_THREAD,
},
.psc_ops = {
* at the OST layer there are only (potentially) multiple obd_device of type
* unknown at the time of OST thread creation.
*
- * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool
- * field). This array has size OST_MAX_THREADS, so that each OST thread uses
- * it's very own iobuf.
+ * We create a cfs_hash for struct filter_obd (->fo_iobuf_hash field) on
+ * initializing, each OST thread will create it's own iobuf on the first
+ * access and insert it into ->fo_iobuf_hash with thread ID as key,
+ * so the iobuf can be found again by thread ID.
*
* Functions below
*
- * filter_kiobuf_pool_init()
+ * filter_iobuf_pool_init()
*
- * filter_kiobuf_pool_done()
+ * filter_iobuf_pool_done()
*
* filter_iobuf_get()
*
*/
static void filter_iobuf_pool_done(struct filter_obd *filter)
{
- struct filter_iobuf **pool;
- int i;
-
- ENTRY;
+ ENTRY;
- pool = filter->fo_iobuf_pool;
- if (pool != NULL) {
- for (i = 0; i < filter->fo_iobuf_count; ++ i) {
- if (pool[i] != NULL)
- filter_free_iobuf(pool[i]);
- }
- OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]);
- filter->fo_iobuf_pool = NULL;
- }
- EXIT;
+ if (filter->fo_iobuf_hash != NULL) {
+ cfs_hash_putref(filter->fo_iobuf_hash);
+ filter->fo_iobuf_hash = NULL;
+ }
+ EXIT;
}
static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
return 0;
}
-/*
- * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
- */
-static int filter_iobuf_pool_init(struct filter_obd *filter)
+static unsigned
+filter_iobuf_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
- void **pool;
+ __u64 val = *((__u64 *)key);
- ENTRY;
+ return cfs_hash_long(val, hs->hs_cur_bits);
+}
+static void *
+filter_iobuf_hop_key(cfs_hlist_node_t *hnode)
+{
+ struct filter_iobuf *pool;
- OBD_ALLOC_GFP(filter->fo_iobuf_pool, OSS_THREADS_MAX * sizeof(*pool),
- CFS_ALLOC_KERNEL);
- if (filter->fo_iobuf_pool == NULL)
- RETURN(-ENOMEM);
+ pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist);
+ return &pool->dr_hkey;
+}
- filter->fo_iobuf_count = OSS_THREADS_MAX;
+static int
+filter_iobuf_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+ struct filter_iobuf *pool;
- RETURN(0);
+ pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist);
+ return pool->dr_hkey == *((__u64 *)key);
}
-/* Return iobuf allocated for @thread_id. We don't know in advance how
- * many threads there will be so we allocate a large empty array and only
- * fill in those slots that are actually in use.
- * If we haven't allocated a pool entry for this thread before, do so now. */
-void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
+static void *
+filter_iobuf_hop_object(cfs_hlist_node_t *hnode)
{
- int thread_id = (oti && oti->oti_thread) ?
- oti->oti_thread->t_id : -1;
- struct filter_iobuf *pool = NULL;
- struct filter_iobuf **pool_place = NULL;
+ return cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist);
+}
- if (thread_id >= 0) {
- LASSERT(thread_id < filter->fo_iobuf_count);
- pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]);
- }
+static void
+filter_iobuf_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ /* dummy, required by cfs_hash */
+}
- if (unlikely(pool == NULL)) {
- pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE,
- PTLRPC_MAX_BRW_PAGES);
- if (pool_place != NULL)
- *pool_place = pool;
- }
+static void
+filter_iobuf_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ /* dummy, required by cfs_hash */
+}
+
+static void
+filter_iobuf_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ struct filter_iobuf *pool;
+
+ pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist);
+ filter_free_iobuf(pool);
+}
+
+static struct cfs_hash_ops filter_iobuf_hops = {
+ .hs_hash = filter_iobuf_hop_hash,
+ .hs_key = filter_iobuf_hop_key,
+ .hs_keycmp = filter_iobuf_hop_keycmp,
+ .hs_object = filter_iobuf_hop_object,
+ .hs_get = filter_iobuf_hop_get,
+ .hs_put_locked = filter_iobuf_hop_put_locked,
+ .hs_exit = filter_iobuf_hop_exit
+};
- return pool;
+#define FILTER_IOBUF_HASH_BITS 9
+#define FILTER_IOBUF_HBKT_BITS 4
+
+/*
+ * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write().
+ */
+static int filter_iobuf_pool_init(struct filter_obd *filter)
+{
+ filter->fo_iobuf_hash = cfs_hash_create("filter_iobuf",
+ FILTER_IOBUF_HASH_BITS,
+ FILTER_IOBUF_HASH_BITS,
+ FILTER_IOBUF_HBKT_BITS, 0,
+ CFS_HASH_MIN_THETA,
+ CFS_HASH_MAX_THETA,
+ &filter_iobuf_hops,
+ CFS_HASH_RW_BKTLOCK |
+ CFS_HASH_NO_ITEMREF);
+
+ return filter->fo_iobuf_hash != NULL ? 0 : -ENOMEM;
+}
+
+/* Return iobuf allocated for @thread_id.
+ * If we haven't allocated a pool entry for this thread before, do so now and
+ * insert it into fo_iobuf_hash, otherwise we can find it from fo_iobuf_hash */
+void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti)
+{
+ struct filter_iobuf *pool = NULL;
+ __u64 key = 0;
+ int thread_id;
+ int rc;
+
+ thread_id = (oti && oti->oti_thread) ? oti->oti_thread->t_id : -1;
+ if (thread_id >= 0) {
+ struct ptlrpc_service_part *svcpt;
+
+ svcpt = oti->oti_thread->t_svcpt;
+ LASSERT(svcpt != NULL);
+
+ key = (__u64)(svcpt->scp_cpt) << 32 | thread_id;
+ pool = cfs_hash_lookup(filter->fo_iobuf_hash, &key);
+ if (pool != NULL)
+ return pool;
+ }
+
+ pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, PTLRPC_MAX_BRW_PAGES);
+ if (pool == NULL)
+ return NULL;
+
+ if (thread_id >= 0) {
+ pool->dr_hkey = key;
+ rc = cfs_hash_add_unique(filter->fo_iobuf_hash,
+ &key, &pool->dr_hlist);
+ /* ptlrpc service should guarantee thread ID is unique */
+ LASSERT(rc != -EALREADY);
+ }
+
+ return pool;
}
/* mount the file system (secretly). lustre_cfg parameters are:
struct niobuf_remote *, struct inode *);
/* filter_io_*.c */
-struct filter_iobuf;
+struct filter_iobuf {
+ cfs_hlist_node_t dr_hlist;
+ __u64 dr_hkey;
+ /* number of reqs being processed */
+ cfs_atomic_t dr_numreqs;
+ cfs_waitq_t dr_wait;
+ int dr_max_pages;
+ int dr_npages;
+ int dr_error;
+ unsigned int dr_ignore_quota:1;
+ struct page **dr_pages;
+ unsigned long *dr_blocks;
+ struct filter_obd *dr_filter;
+};
+
int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
struct obd_ioobj *obj, struct niobuf_remote *, int,
struct niobuf_local *res, struct obd_trans_info *oti,
/* 512byte block min */
#define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512)
-struct filter_iobuf {
- cfs_atomic_t dr_numreqs; /* number of reqs being processed */
- cfs_waitq_t dr_wait;
- int dr_max_pages;
- int dr_npages;
- int dr_error;
- struct page **dr_pages;
- unsigned long *dr_blocks;
- unsigned int dr_ignore_quota:1;
- struct filter_obd *dr_filter;
-};
static void record_start_io(struct filter_iobuf *iobuf, int rw, int size,
struct obd_export *exp)
if (iobuf->dr_blocks == NULL)
goto failed_2;
+ CFS_INIT_HLIST_NODE(&iobuf->dr_hlist);
iobuf->dr_filter = filter;
cfs_waitq_init(&iobuf->dr_wait);
cfs_atomic_set(&iobuf->dr_numreqs, 0);
void filter_free_iobuf(struct filter_iobuf *iobuf)
{
- int num_pages = iobuf->dr_max_pages;
+ int num_pages = iobuf->dr_max_pages;
- filter_clear_iobuf(iobuf);
+ filter_clear_iobuf(iobuf);
+ LASSERT(cfs_hlist_unhashed(&iobuf->dr_hlist));
OBD_FREE(iobuf->dr_blocks,
MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*iobuf->dr_blocks));
OBD_FREE(iobuf->dr_pages,
return;
}
- LASSERTF(filter->fo_iobuf_pool[thread_id] == iobuf,
- "iobuf mismatch for thread %d: pool %p iobuf %p\n",
- thread_id, filter->fo_iobuf_pool[thread_id], iobuf);
filter_clear_iobuf(iobuf);
}
CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
"number of OSS create threads to start");
+static char *oss_cpts;
+CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
+ "CPU partitions OSS threads should run on");
+
+static char *oss_io_cpts;
+CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
+ "CPU partitions OSS IO threads should run on");
+
/**
* Do not return server-side uid/gid to remote client
*/
LASSERT(thread != NULL);
LASSERT(thread->t_data == NULL);
- LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id);
OBD_ALLOC_PTR(tls);
if (tls == NULL)
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
+static struct cfs_cpt_table *ost_io_cptable;
+
/* Sigh - really, this is an OSS, the _server_, not the _target_ */
static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
{
static struct ptlrpc_service_conf svc_conf;
struct ost_obd *ost = &obd->u.ost;
struct lprocfs_static_vars lvars;
- int oss_min_threads = OSS_THREADS_MIN;
- int oss_max_threads = OSS_THREADS_MAX;
+ nodemask_t *mask;
int rc;
ENTRY;
cfs_mutex_init(&ost->ost_health_mutex);
- if (oss_num_threads == 0) {
- /* Base min threads on memory and cpus */
- oss_min_threads =
- cfs_num_online_cpus() * CFS_NUM_CACHEPAGES >>
- (27 - CFS_PAGE_SHIFT);
- if (oss_min_threads < OSS_THREADS_MIN)
- oss_min_threads = OSS_THREADS_MIN;
- /* Insure a 4x range for dynamic threads */
- if (oss_min_threads > OSS_THREADS_MAX / 4)
- oss_min_threads = OSS_THREADS_MAX / 4;
- oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
- }
-
svc_conf = (typeof(svc_conf)) {
.psc_name = LUSTRE_OSS_NAME,
.psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR,
},
.psc_thr = {
.tc_thr_name = "ll_ost",
- .tc_nthrs_min = oss_min_threads,
- .tc_nthrs_max = oss_max_threads,
+ .tc_thr_factor = OSS_THR_FACTOR,
+ .tc_nthrs_init = OSS_NTHRS_INIT,
+ .tc_nthrs_base = OSS_NTHRS_BASE,
+ .tc_nthrs_max = OSS_NTHRS_MAX,
.tc_nthrs_user = oss_num_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_DT_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = oss_cpts,
+ },
.psc_ops = {
.so_req_handler = ost_handle,
.so_req_printer = target_print_req,
},
.psc_thr = {
.tc_thr_name = "ll_ost_create",
- .tc_nthrs_min = OSS_CR_THREADS_MIN,
- .tc_nthrs_max = OSS_CR_THREADS_MAX,
+ .tc_thr_factor = OSS_CR_THR_FACTOR,
+ .tc_nthrs_init = OSS_CR_NTHRS_INIT,
+ .tc_nthrs_base = OSS_CR_NTHRS_BASE,
+ .tc_nthrs_max = OSS_CR_NTHRS_MAX,
.tc_nthrs_user = oss_num_create_threads,
+ .tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_DT_THREAD,
},
+ .psc_cpt = {
+ .cc_pattern = oss_cpts,
+ },
.psc_ops = {
.so_req_handler = ost_handle,
.so_req_printer = target_print_req,
GOTO(out_service, rc);
}
+ mask = cfs_cpt_table->ctb_nodemask;
+ /* even if the CPT feature is disabled in libcfs by setting partition
+ * number to 1, we still want to set node affinity for io service */
+ if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
+ int cpt = 0;
+ int i;
+
+ ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
+ for_each_node_mask(i, *mask) {
+ if (ost_io_cptable == NULL) {
+ CWARN("OSS failed to create CPT table\n");
+ break;
+ }
+
+ rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
+ if (!rc) {
+ CWARN("OSS failed to set node %d for "
+ "IO CPT table\n", i);
+ cfs_cpt_table_free(ost_io_cptable);
+ ost_io_cptable = NULL;
+ break;
+ }
+ }
+ }
+
memset(&svc_conf, 0, sizeof(svc_conf));
svc_conf = (typeof(svc_conf)) {
.psc_name = "ost_io",
},
.psc_thr = {
.tc_thr_name = "ll_ost_io",
- .tc_nthrs_min = oss_min_threads,
- .tc_nthrs_max = oss_max_threads,
+ .tc_thr_factor = OSS_THR_FACTOR,
+ .tc_nthrs_init = OSS_NTHRS_INIT,
+ .tc_nthrs_base = OSS_NTHRS_BASE,
+ .tc_nthrs_max = OSS_NTHRS_MAX,
.tc_nthrs_user = oss_num_threads,
.tc_cpu_affinity = 1,
.tc_ctx_tags = LCT_DT_THREAD,
},
+ .psc_cpt = {
+ .cc_cptable = ost_io_cptable,
+ .cc_pattern = ost_io_cptable == NULL ?
+ oss_io_cpts : NULL,
+ },
.psc_ops = {
.so_thr_init = ost_thread_init,
.so_thr_done = ost_thread_done,
ost->ost_create_service = NULL;
ost->ost_io_service = NULL;
- cfs_mutex_unlock(&ost->ost_health_mutex);
+ cfs_mutex_unlock(&ost->ost_health_mutex);
- lprocfs_obd_cleanup(obd);
+ lprocfs_obd_cleanup(obd);
+
+ if (ost_io_cptable != NULL) {
+ cfs_cpt_table_free(ost_io_cptable);
+ ost_io_cptable = NULL;
+ }
- RETURN(err);
+ RETURN(err);
}
static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
-/* threads for handling "create" request */
-#define OSS_CR_THREADS_MIN 2UL
-#define OSS_CR_THREADS_MAX 16UL
-
/* Quota stuff */
extern quota_interface_t *quota_interface;
*
* it might not be precise but should be good enough.
*/
-#define REQS_ALL_BITS(svcpt) ((int)(sizeof((svcpt)->scp_hist_seq) * 8))
-#define REQS_SEC_BITS 32
-#define REQS_USEC_BITS 16
-/* will be replaced by bits for total service partition number soon */
-#define REQS_CPT_BITS(svcpt) 0
-#define REQS_SEQ_BITS(svcpt) (REQS_ALL_BITS(svcpt) - REQS_CPT_BITS(svcpt) -\
- REQS_SEC_BITS - REQS_USEC_BITS)
-
-#define REQS_SEQ_SHIFT(svcpt) (REQS_CPT_BITS(svcpt))
-#define REQS_USEC_SHIFT(svcpt) (REQS_SEQ_SHIFT(svcpt) + REQS_SEQ_BITS(svcpt))
-#define REQS_SEC_SHIFT(svcpt) (REQS_USEC_SHIFT(svcpt) + REQS_USEC_BITS)
+
+#define REQS_CPT_BITS(svcpt) ((svcpt)->scp_service->srv_cpt_bits)
+
+#define REQS_SEC_SHIFT 32
+#define REQS_USEC_SHIFT 16
+#define REQS_SEQ_SHIFT(svcpt) REQS_CPT_BITS(svcpt)
static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
/* set sequence ID for request and add it to history list,
* it must be called with hold svcpt::scp_lock */
- LASSERT(REQS_SEQ_BITS(svcpt) > 0);
-
- new_seq = (sec << REQS_SEC_SHIFT(svcpt)) |
- (usec << REQS_USEC_SHIFT(svcpt)) | svcpt->scp_cpt;
+ new_seq = (sec << REQS_SEC_SHIFT) |
+ (usec << REQS_USEC_SHIFT) | svcpt->scp_cpt;
if (new_seq > svcpt->scp_hist_seq) {
/* This handles the initial case of scp_hist_seq == 0 or
* we just jumped into a new time window */
svcpt->scp_hist_seq = new_seq;
} else {
+ LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT);
/* NB: increase sequence number in current usec bucket,
* however, it's possible that we used up all bits for
* sequence and jumped into the next usec bucket (future time),
* then we hope there will be less RPCs per bucket at some
* point, and sequence will catch up again */
- svcpt->scp_hist_seq += (1U << REQS_CPT_BITS(svcpt));
+ svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt));
new_seq = svcpt->scp_hist_seq;
}
int count, int *eof, void *data)
{
struct ptlrpc_service *svc = data;
+ struct ptlrpc_service_part *svcpt;
+ int total = 0;
+ int i;
*eof = 1;
- return snprintf(page, count, "%d\n", svc->srv_part->scp_hist_nrqbds);
+
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ total += svcpt->scp_hist_nrqbds;
+
+ return snprintf(page, count, "%d\n", total);
}
static int
ptlrpc_lprocfs_read_req_history_max(char *page, char **start, off_t off,
int count, int *eof, void *data)
{
- struct ptlrpc_service *svc = data;
+ struct ptlrpc_service *svc = data;
+ struct ptlrpc_service_part *svcpt;
+ int total = 0;
+ int i;
- *eof = 1;
- return snprintf(page, count, "%d\n", svc->srv_max_history_rqbds);
+ *eof = 1;
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ total += svc->srv_hist_nrqbds_cpt_max;
+
+ return snprintf(page, count, "%d\n", total);
}
static int
ptlrpc_lprocfs_write_req_history_max(struct file *file, const char *buffer,
unsigned long count, void *data)
{
- struct ptlrpc_service *svc = data;
- int bufpages;
- int val;
- int rc = lprocfs_write_helper(buffer, count, &val);
+ struct ptlrpc_service *svc = data;
+ int bufpages;
+ int val;
+ int rc;
+ rc = lprocfs_write_helper(buffer, count, &val);
if (rc < 0)
return rc;
return -ERANGE;
cfs_spin_lock(&svc->srv_lock);
- svc->srv_max_history_rqbds = val;
+
+ if (val == 0)
+ svc->srv_hist_nrqbds_cpt_max = 0;
+ else
+ svc->srv_hist_nrqbds_cpt_max = max(1, (val / svc->srv_ncpts));
+
cfs_spin_unlock(&svc->srv_lock);
return count;
static int
ptlrpc_lprocfs_rd_threads_min(char *page, char **start, off_t off,
- int count, int *eof, void *data)
+ int count, int *eof, void *data)
{
- struct ptlrpc_service *svc = data;
+ struct ptlrpc_service *svc = data;
- return snprintf(page, count, "%d\n", svc->srv_threads_min);
+ return snprintf(page, count, "%d\n",
+ svc->srv_nthrs_cpt_init * svc->srv_ncpts);
}
static int
ptlrpc_lprocfs_wr_threads_min(struct file *file, const char *buffer,
unsigned long count, void *data)
{
- struct ptlrpc_service *svc = data;
- int val;
- int rc = lprocfs_write_helper(buffer, count, &val);
+ struct ptlrpc_service *svc = data;
+ int val;
+ int rc = lprocfs_write_helper(buffer, count, &val);
- if (rc < 0)
- return rc;
+ if (rc < 0)
+ return rc;
- if (val < 2)
- return -ERANGE;
+ if (val / svc->srv_ncpts < PTLRPC_NTHRS_INIT)
+ return -ERANGE;
cfs_spin_lock(&svc->srv_lock);
- if (val > svc->srv_threads_max) {
+ if (val > svc->srv_nthrs_cpt_limit * svc->srv_ncpts) {
cfs_spin_unlock(&svc->srv_lock);
return -ERANGE;
}
- svc->srv_threads_min = val;
+ svc->srv_nthrs_cpt_init = val / svc->srv_ncpts;
+
cfs_spin_unlock(&svc->srv_lock);
return count;
int count, int *eof, void *data)
{
struct ptlrpc_service *svc = data;
+ struct ptlrpc_service_part *svcpt;
+ int total = 0;
+ int i;
- LASSERT(svc->srv_part != NULL);
- return snprintf(page, count, "%d\n",
- svc->srv_part->scp_nthrs_running);
+ LASSERT(svc->srv_parts != NULL);
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ total += svcpt->scp_nthrs_running;
+
+ return snprintf(page, count, "%d\n", total);
}
static int
ptlrpc_lprocfs_rd_threads_max(char *page, char **start, off_t off,
- int count, int *eof, void *data)
+ int count, int *eof, void *data)
{
- struct ptlrpc_service *svc = data;
+ struct ptlrpc_service *svc = data;
- return snprintf(page, count, "%d\n", svc->srv_threads_max);
+ return snprintf(page, count, "%d\n",
+ svc->srv_nthrs_cpt_limit * svc->srv_ncpts);
}
static int
ptlrpc_lprocfs_wr_threads_max(struct file *file, const char *buffer,
- unsigned long count, void *data)
+ unsigned long count, void *data)
{
- struct ptlrpc_service *svc = data;
- int val;
- int rc = lprocfs_write_helper(buffer, count, &val);
+ struct ptlrpc_service *svc = data;
+ int val;
+ int rc = lprocfs_write_helper(buffer, count, &val);
- if (rc < 0)
- return rc;
+ if (rc < 0)
+ return rc;
- if (val < 2)
- return -ERANGE;
+ if (val / svc->srv_ncpts < PTLRPC_NTHRS_INIT)
+ return -ERANGE;
cfs_spin_lock(&svc->srv_lock);
- if (val < svc->srv_threads_min) {
+ if (val < svc->srv_nthrs_cpt_init * svc->srv_ncpts) {
cfs_spin_unlock(&svc->srv_lock);
return -ERANGE;
}
- svc->srv_threads_max = val;
+ svc->srv_nthrs_cpt_limit = val / svc->srv_ncpts;
+
cfs_spin_unlock(&svc->srv_lock);
return count;
}
struct ptlrpc_srh_iterator {
- __u64 srhi_seq;
- struct ptlrpc_request *srhi_req;
+ int srhi_idx;
+ __u64 srhi_seq;
+ struct ptlrpc_request *srhi_req;
};
int
static void *
ptlrpc_lprocfs_svc_req_history_start(struct seq_file *s, loff_t *pos)
{
- struct ptlrpc_service *svc = s->private;
- struct ptlrpc_srh_iterator *srhi;
- int rc;
+ struct ptlrpc_service *svc = s->private;
+ struct ptlrpc_service_part *svcpt;
+ struct ptlrpc_srh_iterator *srhi;
+ int rc;
+ int i;
- OBD_ALLOC(srhi, sizeof(*srhi));
- if (srhi == NULL)
- return NULL;
+ OBD_ALLOC(srhi, sizeof(*srhi));
+ if (srhi == NULL)
+ return NULL;
- srhi->srhi_seq = 0;
- srhi->srhi_req = NULL;
+ srhi->srhi_seq = 0;
+ srhi->srhi_req = NULL;
- cfs_spin_lock(&svc->srv_part->scp_lock);
- rc = ptlrpc_lprocfs_svc_req_history_seek(svc->srv_part, srhi, *pos);
- cfs_spin_unlock(&svc->srv_part->scp_lock);
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ srhi->srhi_idx = i;
- if (rc == 0) {
- *pos = srhi->srhi_seq;
- return srhi;
- }
+ cfs_spin_lock(&svcpt->scp_lock);
+ rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos);
+ cfs_spin_unlock(&svcpt->scp_lock);
+ if (rc == 0) {
+ *pos = srhi->srhi_seq;
+ return srhi;
+ }
+ }
- OBD_FREE(srhi, sizeof(*srhi));
- return NULL;
+ OBD_FREE(srhi, sizeof(*srhi));
+ return NULL;
}
static void
void *iter, loff_t *pos)
{
struct ptlrpc_service *svc = s->private;
- struct ptlrpc_service_part *svcpt = svc->srv_part;
struct ptlrpc_srh_iterator *srhi = iter;
- int rc;
+ struct ptlrpc_service_part *svcpt;
+ int rc = 0;
+ int i;
- cfs_spin_lock(&svcpt->scp_lock);
- rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos + 1);
- cfs_spin_unlock(&svcpt->scp_lock);
+ for (i = srhi->srhi_idx; i < svc->srv_ncpts; i++) {
+ svcpt = svc->srv_parts[i];
+
+ srhi->srhi_idx = i;
+
+ cfs_spin_lock(&svcpt->scp_lock);
+ rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos + 1);
+ cfs_spin_unlock(&svcpt->scp_lock);
+ if (rc == 0)
+ break;
+ }
if (rc != 0) {
OBD_FREE(srhi, sizeof(*srhi));
static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
{
struct ptlrpc_service *svc = s->private;
- struct ptlrpc_service_part *svcpt = svc->srv_part;
struct ptlrpc_srh_iterator *srhi = iter;
+ struct ptlrpc_service_part *svcpt;
struct ptlrpc_request *req;
int rc;
+ LASSERT(srhi->srhi_idx < svc->srv_ncpts);
+
+ svcpt = svc->srv_parts[srhi->srhi_idx];
+
cfs_spin_lock(&svcpt->scp_lock);
rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, srhi->srhi_seq);
return 0;
}
+#define PTLRPC_AT_LINE_SIZE 128
+
/* See also lprocfs_rd_timeouts */
static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off,
int count, int *eof, void *data)
time_t worstt;
unsigned int cur;
unsigned int worst;
+ int nob = 0;
int rc = 0;
+ int cpt;
+ int i;
- svcpt = svc->srv_part;
- LASSERT(svcpt != NULL);
+ LASSERT(svc->srv_parts != NULL);
+
+ if (AT_OFF) {
+ rc += snprintf(page + rc, count - rc,
+ "adaptive timeouts off, using obd_timeout %u\n",
+ obd_timeout);
+ *eof = 1;
+ return rc;
+ }
+
+ cpt = ((unsigned)off) / PTLRPC_AT_LINE_SIZE;
+
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (i < cpt)
+ continue;
+
+ cur = at_get(&svcpt->scp_at_estimate);
+ worst = svcpt->scp_at_estimate.at_worst_ever;
+ worstt = svcpt->scp_at_estimate.at_worst_time;
+ s2dhms(&ts, cfs_time_current_sec() - worstt);
+
+ nob = snprintf(page + rc, count - rc,
+ "%10s : cur %3u worst %3u (at %ld, "
+ DHMS_FMT" ago) ", "service",
+ cur, worst, worstt, DHMS_VARS(&ts));
+
+ nob += lprocfs_at_hist_helper(page, count, rc + nob,
+ &svcpt->scp_at_estimate);
+ LASSERT(nob < PTLRPC_AT_LINE_SIZE);
+ /* fill the whole line with spaces, so we can locate
+ * partition by offset on the next call... */
+ memset(page + rc + nob, ' ', PTLRPC_AT_LINE_SIZE - nob);
+ page[rc + PTLRPC_AT_LINE_SIZE - 1] = '\n';
+ rc += PTLRPC_AT_LINE_SIZE;
+
+ if (count - rc < PTLRPC_AT_LINE_SIZE)
+ break;
+ }
+
+ if (i == svc->srv_ncpts - 1)
+ *eof = 1;
- *eof = 1;
- cur = at_get(&svcpt->scp_at_estimate);
- worst = svcpt->scp_at_estimate.at_worst_ever;
- worstt = svcpt->scp_at_estimate.at_worst_time;
- s2dhms(&ts, cfs_time_current_sec() - worstt);
- if (AT_OFF)
- rc += snprintf(page + rc, count - rc,
- "adaptive timeouts off, using obd_timeout %u\n",
- obd_timeout);
- rc += snprintf(page + rc, count - rc,
- "%10s : cur %3u worst %3u (at %ld, "DHMS_FMT" ago) ",
- "service", cur, worst, worstt,
- DHMS_VARS(&ts));
- rc = lprocfs_at_hist_helper(page, count, rc, &svcpt->scp_at_estimate);
return rc;
}
static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer,
unsigned long count, void *data)
{
- struct ptlrpc_service *svc = data;
- int rc, val;
+ struct ptlrpc_service *svc = data;
+ int rc;
+ int val;
- rc = lprocfs_write_helper(buffer, count, &val);
- if (rc < 0)
- return rc;
- if (val < 0)
- return -ERANGE;
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc < 0)
+ return rc;
+
+ if (val < 0)
+ return -ERANGE;
cfs_spin_lock(&svc->srv_lock);
svc->srv_hpreq_ratio = val;
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_RQBD))
return (-ENOMEM);
+ /* NB: We need to replace LNET_INS_AFTER with LNET_INS_LOCAL
+ * after LNet SMP patches landed */
rc = LNetMEAttach(service->srv_req_portal,
match_id, 0, ~0, LNET_UNLINK, LNET_INS_AFTER, &me_h);
if (rc != 0) {
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request_buffer_desc *rqbd;
- OBD_ALLOC_PTR(rqbd);
+ OBD_CPT_ALLOC_PTR(rqbd, svc->srv_cptable, svcpt->scp_cpt);
if (rqbd == NULL)
return NULL;
rqbd->rqbd_svcpt = svcpt;
- rqbd->rqbd_refcount = 0;
- rqbd->rqbd_cbid.cbid_fn = request_in_callback;
- rqbd->rqbd_cbid.cbid_arg = rqbd;
- CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
- OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
-
- if (rqbd->rqbd_buffer == NULL) {
- OBD_FREE_PTR(rqbd);
- return (NULL);
- }
+ rqbd->rqbd_refcount = 0;
+ rqbd->rqbd_cbid.cbid_fn = request_in_callback;
+ rqbd->rqbd_cbid.cbid_arg = rqbd;
+ CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
+ OBD_CPT_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_cptable,
+ svcpt->scp_cpt, svc->srv_buf_size);
+ if (rqbd->rqbd_buffer == NULL) {
+ OBD_FREE_PTR(rqbd);
+ return NULL;
+ }
cfs_spin_lock(&svcpt->scp_lock);
cfs_list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle);
}
int
-ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt)
+ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
{
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request_buffer_desc *rqbd;
rc = -ENOMEM;
break;
}
-
- if (ptlrpc_server_post_idle_rqbds(svcpt) < 0) {
- rc = -EAGAIN;
- break;
- }
}
CDEBUG(D_RPCTRACE,
"%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n",
- svc->srv_name, i, svc->srv_buf_size,
- svcpt->scp_nrqbds_posted, svcpt->scp_nrqbds_total, rc);
+ svc->srv_name, i, svc->srv_buf_size, svcpt->scp_nrqbds_posted,
+ svcpt->scp_nrqbds_total, rc);
+
+ if (post && rc == 0)
+ rc = ptlrpc_server_post_idle_rqbds(svcpt);
return rc;
}
}
static void
-ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
- int *min_p, int *max_p)
+ptlrpc_server_nthreads_check(struct ptlrpc_service *svc,
+ struct ptlrpc_service_conf *conf)
{
#ifdef __KERNEL__
struct ptlrpc_service_thr_conf *tc = &conf->psc_thr;
- int nthrs_min;
- int nthrs;
+ unsigned init;
+ unsigned total;
+ unsigned nthrs;
+ int weight;
- nthrs_min = PTLRPC_NTHRS_MIN + (conf->psc_ops.so_hpreq_handler != NULL);
- nthrs_min = max_t(int, nthrs_min, tc->tc_nthrs_min);
+	/*
+	 * Common code for estimating & validating the number of threads.
+	 * A CPT-affinity service can have a per-CPT thread pool instead
+	 * of a global thread pool, which means users might not always
+	 * get the number of threads they requested in conf::tc_nthrs_user,
+	 * even if they did set it.  This is because we must validate the
+	 * thread count for each CPT to guarantee that each pool has
+	 * enough threads to keep the service healthy.
+	 */
+ init = PTLRPC_NTHRS_INIT + (svc->srv_ops.so_hpreq_handler != NULL);
+ init = max_t(int, init, tc->tc_nthrs_init);
+
+ /* NB: please see comments in lustre_lnet.h for definition
+ * details of these members */
+ LASSERT(tc->tc_nthrs_max != 0);
+
+ if (tc->tc_nthrs_user != 0) {
+ /* In case there is a reason to test a service with many
+ * threads, we give a less strict check here, it can
+ * be up to 8 * nthrs_max */
+ total = min(tc->tc_nthrs_max * 8, tc->tc_nthrs_user);
+ nthrs = total / svc->srv_ncpts;
+ init = max(init, nthrs);
+ goto out;
+ }
- nthrs = tc->tc_nthrs_user;
- if (nthrs != 0) { /* validate it */
- nthrs = min_t(int, nthrs, tc->tc_nthrs_max);
- nthrs = max_t(int, nthrs, nthrs_min);
- *min_p = *max_p = nthrs;
- return;
+ total = tc->tc_nthrs_max;
+ if (tc->tc_nthrs_base == 0) {
+ /* don't care about base threads number per partition,
+ * this is most for non-affinity service */
+ nthrs = total / svc->srv_ncpts;
+ goto out;
}
- /*
- * NB: we will add some common code here for estimating, for example:
- * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate
- * threads number based on:
- * (online_cpus * conf::tc_factor) + conf::tc_nthrs_base.
- *
- * So we can remove code block like estimation in ost_setup, also,
- * we might estimate MDS threads number as well instead of using
- * absolute number, and have more threads on fat servers to improve
- * availability of service.
- *
- * Also, we will need to validate threads number at here for
- * CPT affinity service (CPU ParTion) in the future.
- * A service can have percpt thread-pool instead of a global thread
- * pool for each service, which means user might not always get the
- * threads number they want even they set it in conf::tc_nthrs_user,
- * because we need to adjust threads number for each CPT, instead of
- * just use (conf::tc_nthrs_user / NCPTS), to make sure each pool
- * will be healthy.
- */
- *max_p = tc->tc_nthrs_max;
- *min_p = nthrs_min;
-#else /* __KERNEL__ */
- *max_p = *min_p = 1; /* whatever */
+ nthrs = tc->tc_nthrs_base;
+ if (svc->srv_ncpts == 1) {
+ int i;
+
+		/* NB: increase the base number when there is only a single
+		 * partition and the total number of cores/HTs is larger
+		 * than or equal to 4; the result is always < 2 * nthrs_base */
+ weight = cfs_cpt_weight(svc->srv_cptable, CFS_CPT_ANY);
+ for (i = 1; (weight >> (i + 1)) != 0 && /* >= 4 cores/HTs */
+ (tc->tc_nthrs_base >> i) != 0; i++)
+ nthrs += tc->tc_nthrs_base >> i;
+ }
+
+ if (tc->tc_thr_factor != 0) {
+ int factor = tc->tc_thr_factor;
+ const int fade = 4;
+
+		/*
+		 * The user wants to increase the number of threads for
+		 * each CPU core/HT; most likely the factor is larger than
+		 * one thread per core because service threads are expected
+		 * to be blocked on locks or waiting for IO.
+		 */
+ /*
+ * Amdahl's law says that adding processors wouldn't give
+ * a linear increasing of parallelism, so it's nonsense to
+ * have too many threads no matter how many cores/HTs
+ * there are.
+ */
+ if (cfs_cpu_ht_nsiblings(0) > 1) { /* weight is # of HTs */
+ /* depress thread factor for hyper-thread */
+ factor = factor - (factor >> 1) + (factor >> 3);
+ }
+
+ weight = cfs_cpt_weight(svc->srv_cptable, 0);
+ LASSERT(weight > 0);
+
+ for (; factor > 0 && weight > 0; factor--, weight -= fade)
+ nthrs += min(weight, fade) * factor;
+ }
+
+ if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+ nthrs = max(tc->tc_nthrs_base,
+ tc->tc_nthrs_max / svc->srv_ncpts);
+ }
+ out:
+ nthrs = max(nthrs, tc->tc_nthrs_init);
+ svc->srv_nthrs_cpt_limit = nthrs;
+ svc->srv_nthrs_cpt_init = init;
+
+ if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+ LCONSOLE_WARN("%s: This service may have more threads (%d) "
+ "than the given soft limit (%d)\n",
+ svc->srv_name, nthrs * svc->srv_ncpts,
+ tc->tc_nthrs_max);
+ }
#endif
}
*/
static int
ptlrpc_service_part_init(struct ptlrpc_service *svc,
- struct ptlrpc_service_part *svcpt)
+ struct ptlrpc_service_part *svcpt, int cpt)
{
struct ptlrpc_at_array *array;
int size;
int index;
int rc;
+ svcpt->scp_cpt = cpt;
CFS_INIT_LIST_HEAD(&svcpt->scp_threads);
/* rqbd and incoming request queue */
array->paa_deadline = -1;
/* allocate memory for scp_at_array (ptlrpc_at_array) */
- OBD_ALLOC(array->paa_reqs_array, sizeof(cfs_list_t) * size);
+ OBD_CPT_ALLOC(array->paa_reqs_array,
+ svc->srv_cptable, cpt, sizeof(cfs_list_t) * size);
if (array->paa_reqs_array == NULL)
return -ENOMEM;
for (index = 0; index < size; index++)
CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
- OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
+ OBD_CPT_ALLOC(array->paa_reqs_count,
+ svc->srv_cptable, cpt, sizeof(__u32) * size);
if (array->paa_reqs_count == NULL)
goto failed;
/* assign this before call ptlrpc_grow_req_bufs */
svcpt->scp_service = svc;
/* Now allocate the request buffers, but don't post them now */
- rc = ptlrpc_grow_req_bufs(svcpt);
+ rc = ptlrpc_grow_req_bufs(svcpt, 0);
/* We shouldn't be under memory pressure at startup, so
* fail if we can't allocate all our buffers at this time. */
if (rc != 0)
ptlrpc_register_service(struct ptlrpc_service_conf *conf,
cfs_proc_dir_entry_t *proc_entry)
{
+ struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
struct ptlrpc_service *service;
+ struct ptlrpc_service_part *svcpt;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
int rc;
+ int i;
ENTRY;
LASSERT(conf->psc_buf.bc_nbufs > 0);
conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
LASSERT(conf->psc_thr.tc_ctx_tags != 0);
- OBD_ALLOC_PTR(service);
- if (service == NULL)
+ cptable = cconf->cc_cptable;
+ if (cptable == NULL)
+ cptable = cfs_cpt_table;
+
+ if (!conf->psc_thr.tc_cpu_affinity) {
+ ncpts = 1;
+ } else {
+ ncpts = cfs_cpt_number(cptable);
+		if (cconf->cc_pattern != NULL) {
+			struct cfs_expr_list *el;
+
+			rc = cfs_expr_list_parse(cconf->cc_pattern,
+						 strlen(cconf->cc_pattern),
+						 0, ncpts - 1, &el);
+			if (rc != 0) {
+				/* NB: newline added so the console message is
+				 * properly terminated, matching the CERROR
+				 * below */
+				CERROR("%s: invalid CPT pattern string: %s\n",
+				       conf->psc_name, cconf->cc_pattern);
+				RETURN(ERR_PTR(-EINVAL));
+			}
+
+			rc = cfs_expr_list_values(el, ncpts, &cpts);
+			cfs_expr_list_free(el);
+			if (rc <= 0) {
+				CERROR("%s: failed to parse CPT array %s: %d\n",
+				       conf->psc_name, cconf->cc_pattern, rc);
+				RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
+			}
+			ncpts = rc;
+		}
+ }
+
+ OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
+ if (service == NULL) {
+ if (cpts != NULL)
+ OBD_FREE(cpts, sizeof(*cpts) * ncpts);
RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ service->srv_cptable = cptable;
+ service->srv_cpts = cpts;
+ service->srv_ncpts = ncpts;
+
+ service->srv_cpt_bits = 0; /* it's zero already, easy to read... */
+ while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
+ service->srv_cpt_bits++;
/* public members */
cfs_spin_lock_init(&service->srv_lock);
CFS_INIT_LIST_HEAD(&service->srv_list); /* for safty of cleanup */
/* buffer configuration */
- service->srv_nbuf_per_group = test_req_buffer_pressure ?
- 1 : conf->psc_buf.bc_nbufs;
+ service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 :
+ max(conf->psc_buf.bc_nbufs /
+ service->srv_ncpts, 1U);
service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
SPTLRPC_MAX_PAYLOAD;
service->srv_buf_size = conf->psc_buf.bc_buf_size;
conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
service->srv_max_reply_size <<= 1;
- ptlrpc_server_nthreads_check(conf, &service->srv_threads_min,
- &service->srv_threads_max);
-
service->srv_thread_name = conf->psc_thr.tc_thr_name;
service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
- service->srv_cpu_affinity = !!conf->psc_thr.tc_cpu_affinity;
service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
service->srv_ops = conf->psc_ops;
- OBD_ALLOC_PTR(service->srv_part);
- if (service->srv_part == NULL)
- GOTO(failed, rc = -ENOMEM);
+ for (i = 0; i < ncpts; i++) {
+ if (!conf->psc_thr.tc_cpu_affinity)
+ cpt = CFS_CPT_ANY;
+ else
+ cpt = cpts != NULL ? cpts[i] : i;
- rc = ptlrpc_service_part_init(service, service->srv_part);
- if (rc != 0)
- GOTO(failed, rc);
+ OBD_CPT_ALLOC(svcpt, cptable, cpt, sizeof(*svcpt));
+ if (svcpt == NULL)
+ GOTO(failed, rc = -ENOMEM);
+
+ service->srv_parts[i] = svcpt;
+ rc = ptlrpc_service_part_init(service, svcpt, cpt);
+ if (rc != 0)
+ GOTO(failed, rc);
+ }
+
+ ptlrpc_server_nthreads_check(service, conf);
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT(rc == 0);
/* cull some history?
* I expect only about 1 or 2 rqbds need to be recycled here */
- while (svcpt->scp_hist_nrqbds > svc->srv_max_history_rqbds) {
+ while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
rqbd = cfs_list_entry(svcpt->scp_hist_rqbds.next,
struct ptlrpc_request_buffer_desc,
rqbd_list);
cfs_list_entry (tmp, struct ptlrpc_service, srv_list);
struct ptlrpc_service_part *svcpt;
- svcpt = svc->srv_part;
+ LASSERT(svc->srv_ncpts == 1);
+ svcpt = svc->srv_parts[0];
if (svcpt->scp_nthrs_running != 0) /* I've recursed */
continue;
* space. */
if (avail <= low_water)
- ptlrpc_grow_req_bufs(svcpt);
+ ptlrpc_grow_req_bufs(svcpt, 1);
if (svcpt->scp_service->srv_stats) {
lprocfs_counter_add(svcpt->scp_service->srv_stats,
ptlrpc_threads_increasable(struct ptlrpc_service_part *svcpt)
{
return svcpt->scp_nthrs_running +
- svcpt->scp_nthrs_starting < svcpt->scp_service->srv_threads_max;
+ svcpt->scp_nthrs_starting <
+ svcpt->scp_service->srv_nthrs_cpt_limit;
}
/**
thread->t_pid = cfs_curproc_pid();
cfs_daemonize_ctxt(thread->t_name);
-#if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
- /* we need to do this before any per-thread allocation is done so that
- * we get the per-thread allocations on local node. bug 7342 */
- if (svc->srv_cpu_affinity) {
- int cpu, num_cpu;
-
- for (cpu = 0, num_cpu = 0; cpu < cfs_num_possible_cpus();
- cpu++) {
- if (!cpu_online(cpu))
- continue;
- if (num_cpu == thread->t_id % cfs_num_online_cpus())
- break;
- num_cpu++;
- }
- cfs_set_cpus_allowed(cfs_current(),
- node_to_cpumask(cpu_to_node(cpu)));
- }
-#endif
+ /* NB: we will call cfs_cpt_bind() for all threads, because we
+ * might want to run lustre server only on a subset of system CPUs,
+ * in that case ->scp_cpt is CFS_CPT_ANY */
+ rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
+ if (rc != 0) {
+ CWARN("%s: failed to bind %s on CPT %d\n",
+ svc->srv_name, thread->t_name, svcpt->scp_cpt);
+ }
#ifdef WITH_GROUP_INFO
ginfo = cfs_groups_alloc(0);
env->le_ctx.lc_thread = thread;
env->le_ctx.lc_cookie = 0x6;
+ while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
+ rc = ptlrpc_server_post_idle_rqbds(svcpt);
+ if (rc >= 0)
+ continue;
+
+ CERROR("Failed to post rqbd for %s on CPT %d: %d\n",
+ svc->srv_name, svcpt->scp_cpt, rc);
+ goto out_srv_fini;
+ }
+
/* Alloc reply state structure for this one */
OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
if (!rs) {
*/
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
+ struct ptlrpc_service_part *svcpt;
+ int i;
ENTRY;
- if (svc != NULL && svc->srv_part != NULL)
- ptlrpc_svcpt_stop_threads(svc->srv_part);
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service != NULL)
+ ptlrpc_svcpt_stop_threads(svcpt);
+ }
+
EXIT;
}
int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
- int i, rc = 0;
- ENTRY;
+ int rc = 0;
+ int i;
+ int j;
+ ENTRY;
- /* We require 2 threads min - see note in
- ptlrpc_server_handle_request */
- LASSERT(svc->srv_threads_min >= 2);
- for (i = 0; i < svc->srv_threads_min; i++) {
- rc = ptlrpc_start_thread(svc->srv_part, 1);
- /* We have enough threads, don't start more. b=15759 */
- if (rc == -EMFILE) {
- rc = 0;
- break;
- }
- if (rc) {
- CERROR("cannot start %s thread #%d: rc %d\n",
- svc->srv_thread_name, i, rc);
- ptlrpc_stop_all_threads(svc);
- break;
- }
- }
- RETURN(rc);
+ /* We require 2 threads min, see note in ptlrpc_server_handle_request */
+ LASSERT(svc->srv_nthrs_cpt_init >= PTLRPC_NTHRS_INIT);
+
+ for (i = 0; i < svc->srv_ncpts; i++) {
+ for (j = 0; j < svc->srv_nthrs_cpt_init; j++) {
+ rc = ptlrpc_start_thread(svc->srv_parts[i], 1);
+ if (rc == 0)
+ continue;
+
+ if (rc != -EMFILE)
+ goto failed;
+ /* We have enough threads, don't start more. b=15759 */
+ break;
+ }
+ }
+
+ RETURN(0);
+ failed:
+ CERROR("cannot start %s thread #%d_%d: rc %d\n",
+ svc->srv_thread_name, i, j, rc);
+ ptlrpc_stop_all_threads(svc);
+ RETURN(rc);
}
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
LASSERT(svcpt != NULL);
- CDEBUG(D_RPCTRACE, "%s started %d min %d max %d\n",
- svc->srv_name, svcpt->scp_nthrs_running,
- svc->srv_threads_min, svc->srv_threads_max);
+ CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
+ svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
+ svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);
again:
if (unlikely(svc->srv_is_stopping))
if (!ptlrpc_threads_increasable(svcpt) ||
(OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
- svcpt->scp_nthrs_running == svc->srv_threads_min - 1))
+ svcpt->scp_nthrs_running == svc->srv_nthrs_cpt_init - 1))
RETURN(-EMFILE);
- OBD_ALLOC_PTR(thread);
+ OBD_CPT_ALLOC_PTR(thread, svc->srv_cptable, svcpt->scp_cpt);
if (thread == NULL)
RETURN(-ENOMEM);
cfs_waitq_init(&thread->t_ctl_waitq);
cfs_list_add(&thread->t_link, &svcpt->scp_threads);
cfs_spin_unlock(&svcpt->scp_lock);
- snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
- "%s_%02d", svc->srv_thread_name, thread->t_id);
+ if (svcpt->scp_cpt >= 0) {
+ snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s%02d_%03d",
+ svc->srv_thread_name, svcpt->scp_cpt, thread->t_id);
+ } else {
+ snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s_%04d",
+ svc->srv_thread_name, thread->t_id);
+ }
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
/*
static void
ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
{
- struct ptlrpc_service_part *svcpt;
+ struct ptlrpc_service_part *svcpt;
+ int i;
/* early disarm AT timer... */
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
- break;
-
- cfs_timer_disarm(&svcpt->scp_at_timer);
- } while (0);
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service != NULL)
+ cfs_timer_disarm(&svcpt->scp_at_timer);
+ }
}
static void
struct ptlrpc_request_buffer_desc *rqbd;
struct l_wait_info lwi;
int rc;
+ int i;
- /* All history will be culled when the next request buffer is
+ /* All history will be culled when the next request buffer is
* freed in ptlrpc_service_purge_all() */
- svc->srv_max_history_rqbds = 0;
+ svc->srv_hist_nrqbds_cpt_max = 0;
rc = LNetClearLazyPortal(svc->srv_req_portal);
LASSERT(rc == 0);
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* Unlink all the request buffers. This forces a 'final'
rc = LNetMDUnlink(rqbd->rqbd_md_h);
LASSERT(rc == 0 || rc == -ENOENT);
}
- } while (0);
+ }
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* Wait for the network to release any buffers
cfs_spin_lock(&svcpt->scp_lock);
}
cfs_spin_unlock(&svcpt->scp_lock);
- } while (0);
+ }
}
static void
struct ptlrpc_request_buffer_desc *rqbd;
struct ptlrpc_request *req;
struct ptlrpc_reply_state *rs;
+ int i;
- do { /* iterrate over multiple partitions in the future */
- /* schedule all outstanding replies to terminate them */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
cfs_spin_lock(&svcpt->scp_rep_lock);
cfs_list_del(&rs->rs_list);
OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
}
- } while (0);
+ }
}
static void
{
struct ptlrpc_service_part *svcpt;
struct ptlrpc_at_array *array;
+ int i;
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt == NULL || svcpt->scp_service == NULL)
+ ptlrpc_service_for_each_part(svcpt, i, svc) {
+ if (svcpt->scp_service == NULL)
break;
/* In case somebody rearmed this in the meantime */
sizeof(__u32) * array->paa_size);
array->paa_reqs_count = NULL;
}
- svcpt->scp_service = NULL;
- } while (0);
+ }
+
+ ptlrpc_service_for_each_part(svcpt, i, svc)
+ OBD_FREE_PTR(svcpt);
- do { /* iterrate over multiple partitions in the future */
- svcpt = svc->srv_part;
- if (svcpt != NULL)
- OBD_FREE_PTR(svcpt);
- } while (0);
+ if (svc->srv_cpts != NULL)
+ cfs_expr_list_values_free(svc->srv_cpts, svc->srv_ncpts);
- OBD_FREE_PTR(svc);
+ OBD_FREE(svc, offsetof(struct ptlrpc_service,
+ srv_parts[svc->srv_ncpts]));
}
int ptlrpc_unregister_service(struct ptlrpc_service *service)
* Right now, it just checks to make sure that requests aren't languishing
* in the queue. We'll use this health check to govern whether a node needs
* to be shot, so it's intentionally non-aggressive. */
-int ptlrpc_service_health_check(struct ptlrpc_service *svc)
+int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
{
- struct ptlrpc_service_part *svcpt;
struct ptlrpc_request *request;
struct timeval right_now;
long timediff;
- if (svc == NULL || svc->srv_part == NULL)
- return 0;
-
cfs_gettimeofday(&right_now);
- svcpt = svc->srv_part;
cfs_spin_lock(&svcpt->scp_req_lock);
if (!ptlrpc_server_request_pending(svcpt, 1)) {
cfs_spin_unlock(&svcpt->scp_req_lock);
return 0;
}
+
+/* Service-wide health check: a service is healthy only when every one of
+ * its partitions is healthy.  Returns 0 when healthy, otherwise the first
+ * non-zero status reported by ptlrpc_svcpt_health_check(). */
+int
+ptlrpc_service_health_check(struct ptlrpc_service *svc)
+{
+	struct ptlrpc_service_part	*svcpt;
+	int				i;
+
+	/* service may not be fully set up yet; treat that as healthy */
+	if (svc == NULL || svc->srv_parts == NULL)
+		return 0;
+
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		int rc = ptlrpc_svcpt_health_check(svcpt);
+
+		/* fail fast on the first unhealthy partition */
+		if (rc != 0)
+			return rc;
+	}
+	return 0;
+}