From: Liang Zhen Date: Sun, 17 Jun 2012 02:57:03 +0000 (+0800) Subject: LU-56 ptlrpc: partitioned ptlrpc service X-Git-Tag: 2.2.60~30 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=d800fc41a1abdaf7aaf6c0e3e7ddcdec489985a8 LU-56 ptlrpc: partitioned ptlrpc service The current ptlrpc service has only a single per-service instance; all service threads share the locks and request queue of this instance, which causes many performance issues such as heavy lock contention and data/thread migration between CPUs/NUMA nodes. This patch creates per-partition data for the ptlrpc service: each service has locks/request-queues on each partition, and the service will have CPT (CPU partition) affinity threads on each partition. Threads bound to a CPT will only access data on the local partition, which should decrease lock contention and data/thread migration and improve server-side performance. Another change is that we use a cfs_hash to replace the big array fo_iobuf_pool in obdfilter; a filter_iobuf can now be found in the hash by thread ID. We made this change because we removed the absolute limit on the number of ptlrpc threads, which means we have no idea how big fo_iobuf_pool should be. Also, even with an absolute limit on the thread count, it is still dangerous to use a pre-allocated array because it is difficult to guarantee that thread IDs stay contiguous if we shrink the thread pool in the future. Signed-off-by: Liang Zhen Change-Id: I5f8dce7bcf389f9f076f5ce2d4685a03f910260b Reviewed-on: http://review.whamcloud.com/3133 Tested-by: Hudson Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: Andreas Dilger --- diff --git a/libcfs/include/libcfs/libcfs_string.h b/libcfs/include/libcfs/libcfs_string.h index 76b7bec..e4b0c7d9 100644 --- a/libcfs/include/libcfs/libcfs_string.h +++ b/libcfs/include/libcfs/libcfs_string.h @@ -114,6 +114,15 @@ int cfs_range_expr_parse(struct cfs_lstr *src, unsigned min, unsigned max, int cfs_expr_list_match(__u32 value, struct cfs_expr_list *expr_list); int cfs_expr_list_values(struct cfs_expr_list *expr_list, int max, __u32 **values); +static inline void +cfs_expr_list_values_free(__u32 *values, int num) +{ + /* This array is allocated by LIBCFS_ALLOC(), so it shouldn't be freed + * by OBD_FREE() if it's called by a module other than libcfs & LNet, + * otherwise we will see a fake memory leak */ + LIBCFS_FREE(values, num * sizeof(values[0])); +} + void cfs_expr_list_free(struct cfs_expr_list *expr_list); void cfs_expr_list_print(struct cfs_expr_list *expr_list); int cfs_expr_list_parse(char *str, int len, unsigned min, unsigned max, diff --git a/lnet/lnet/config.c b/lnet/lnet/config.c index 0c9eb64..c3dd0a7 100644 --- a/lnet/lnet/config.c +++ b/lnet/lnet/config.c @@ -100,10 +100,8 @@ lnet_ni_free(struct lnet_ni *ni) if (ni->ni_tx_queues != NULL) cfs_percpt_free(ni->ni_tx_queues); - if (ni->ni_cpts != NULL) { - LIBCFS_FREE(ni->ni_cpts, - sizeof(ni->ni_cpts[0] * ni->ni_ncpts)); - } + if (ni->ni_cpts != NULL) + cfs_expr_list_values_free(ni->ni_cpts, ni->ni_ncpts); #ifndef __KERNEL__ # ifdef HAVE_LIBPTHREAD
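The heart of the change is visible in the lustre_net.h hunk that follows: struct ptlrpc_service grows a flexible srv_parts[0] array, allocated with offsetof(..., srv_parts[ncpts]) in ptlrpc_register_service(), and each ptlrpc_service_part carries its own lock and queues. A minimal userspace sketch of that layout, using hypothetical toy_* names rather than Lustre symbols (gcc accepts the same [0] flexible-array idiom the patch uses; build with cc -pthread):

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

/* Toy stand-ins for the patch's structures; toy_* names are hypothetical. */
struct toy_part {                       /* cf. ptlrpc_service_part */
        pthread_mutex_t  tp_lock;       /* cf. scp_lock */
        int              tp_nreqs;      /* per-partition queue depth */
};

struct toy_svc {                        /* cf. ptlrpc_service */
        int              ts_nparts;     /* cf. srv_ncpts */
        struct toy_part *ts_parts[0];   /* cf. srv_parts[0] */
};

static struct toy_svc *toy_svc_alloc(int nparts)
{
        /* cf. OBD_ALLOC(service, offsetof(..., srv_parts[ncpts])) */
        struct toy_svc *svc;
        int             i;

        svc = calloc(1, offsetof(struct toy_svc, ts_parts[nparts]));
        if (svc == NULL)
                return NULL;

        svc->ts_nparts = nparts;
        for (i = 0; i < nparts; i++) {
                svc->ts_parts[i] = calloc(1, sizeof(*svc->ts_parts[i]));
                if (svc->ts_parts[i] == NULL)
                        exit(1);        /* error paths elided in this sketch */
                pthread_mutex_init(&svc->ts_parts[i]->tp_lock, NULL);
        }
        return svc;
}

int main(void)
{
        struct toy_svc *svc = toy_svc_alloc(4);

        if (svc == NULL)
                return 1;

        /* a thread bound to partition 2 only touches partition-local state */
        pthread_mutex_lock(&svc->ts_parts[2]->tp_lock);
        svc->ts_parts[2]->tp_nreqs++;
        pthread_mutex_unlock(&svc->ts_parts[2]->tp_lock);

        printf("partition 2 queued: %d\n", svc->ts_parts[2]->tp_nreqs);
        return 0;
}

Keeping the partition pointers inline with the service makes partition lookup a single array index, which is what lets a thread bound to one CPT stay entirely on partition-local locks and queues.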
diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 039effa..85ea98d 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -106,11 +106,12 @@ # endif #endif /* __KERNEL__ */ -#define PTLRPC_NTHRS_MIN 2 +#define PTLRPC_NTHRS_INIT 2 /** - * The following constants determine how memory is used to buffer incoming - * service requests. + * Buffer Constants + * + * These constants determine how memory is used to buffer incoming service requests. * * ?_NBUFS # buffers to allocate when growing the pool * ?_BUFSIZE # bytes in a single request buffer * ?_MAXREQSIZE # maximum request service will receive * * When fewer than ?_NBUFS/2 buffers are posted for receive, another chunk * of ?_NBUFS is added to the pool. * * Messages larger than ?_MAXREQSIZE are dropped. Request buffers are * considered full when less than ?_MAXREQSIZE is left in them. */ -#define LDLM_THREADS_AUTO_MIN (2) -#define LDLM_THREADS_AUTO_MAX min_t(unsigned, cfs_num_online_cpus() * \ - cfs_num_online_cpus() * 32, 128) -#define LDLM_BL_THREADS LDLM_THREADS_AUTO_MIN +/** + * Thread Constants + * + * These constants determine how threads are created for a ptlrpc service. + * + * ?_NTHRS_INIT # threads to create for each service partition on + * initialization. If it's a non-affinity service with + * only one partition, it's the overall number of + * threads for the service while initializing. + * ?_NTHRS_BASE # threads that should be created at minimum for each + * ptlrpc partition to keep the service healthy. + * It's the low-water mark of the per-partition + * thread upper-limit. + * ?_THR_FACTOR # threads added to the per-partition thread upper-limit + * for each CPU core. This factor is only for reference; + * we might decrease it if the number of cores + * per CPT is above a limit. + * ?_NTHRS_MAX # overall threads that can be created for a service. + * It's a soft limit: if the service is running + * on a machine with hundreds of cores and tens of + * CPU partitions, we need to guarantee each partition + * has ?_NTHRS_BASE threads, so the total, + * ?_NTHRS_BASE * number_of_cpts, can + * exceed ?_NTHRS_MAX. + * + * Examples + * + * #define MDT_NTHRS_INIT 2 + * #define MDT_NTHRS_BASE 64 + * #define MDT_THR_FACTOR 8 + * #define MDT_NTHRS_MAX 1024 + * + * Example 1): + * --------------------------------------------------------------------- + * Server(A) has 16 cores and the user configured it into 4 partitions, so + * each partition has 4 cores; the actual number of service threads on each + * partition is: + * MDT_NTHRS_BASE(64) + cores(4) * MDT_THR_FACTOR(8) = 96 + * + * The total number of threads for the service is: + * 96 * partitions(4) = 384 + * + * Example 2): + * --------------------------------------------------------------------- + * Server(B) has 32 cores and the user configured it into 4 partitions, so + * each partition has 8 cores; the actual number of service threads on each + * partition is: + * MDT_NTHRS_BASE(64) + cores(8) * MDT_THR_FACTOR(8) = 128 + * + * The total number of threads for the service is: + * 128 * partitions(4) = 512 + * + * Example 3): + * --------------------------------------------------------------------- + * Server(C) has 96 cores and the user configured it into 8 partitions, so + * each partition has 12 cores; the actual number of service threads on each + * partition would be: + * MDT_NTHRS_BASE(64) + cores(12) * MDT_THR_FACTOR(8) = 160 + * + * The total number of threads for the service would be: + * 160 * partitions(8) = 1280 + * + * However, this is above the soft limit MDT_NTHRS_MAX, so we instead choose + * as the per-partition thread upper-limit: + * MDT_NTHRS_MAX(1024) / partitions(8) = 128 + * + * Example 4): + * --------------------------------------------------------------------- + * Server(D) has a thousand cores and the user configured it into 32 partitions: + * MDT_NTHRS_BASE(64) * 32 = 2048 + * + * which is already above the soft limit MDT_NTHRS_MAX(1024), but we still need + * to guarantee that each partition has at least MDT_NTHRS_BASE(64) threads + * to keep the service healthy, so the
total number of threads will just be 2048. + * + * NB: we don't suggest choosing a server with that many cores, because the + * backend filesystem itself, the buffer cache, or the underlying network + * stack might have SMP scalability issues at that large scale. + * + * If the user already has a fat machine with hundreds or thousands of cores, + * there are two choices for configuration: + * a) create a CPU table from a subset of all CPUs and run Lustre on + * top of this subset + * b) bind service threads to a few partitions; see the module parameters + * of MDS and OSS for details + * + * NB: these calculations (and the examples below) are simplified to aid + * understanding; the real implementation is a little more complex, + * please see ptlrpc_server_nthreads_check() for details. + * + */ + + /* + * LDLM threads constants: + * + * Given a factor of 8 and a base thread count of 24 + * + * example 1) + * On a 4-core machine we will have 24 + 8 * 4 = 56 threads. + * + * example 2) + * On an 8-core machine with 2 partitions we will have 24 + 4 * 8 = 56 + * threads for each partition, and the total thread count will be 112. + * + * example 3) + * On a 64-core machine with 8 partitions we will need LDLM_NTHRS_BASE(24) + * threads for each partition to keep the service healthy, so the total + * thread count should be 24 * 8 = 192. + * + * So with these constants, the thread count will be at a similar level + * to old versions, unless the target machine has over a hundred cores + */ +#define LDLM_THR_FACTOR 8 +#define LDLM_NTHRS_INIT PTLRPC_NTHRS_INIT +#define LDLM_NTHRS_BASE 24 +#define LDLM_NTHRS_MAX (cfs_num_online_cpus() == 1 ? 64 : 128) + +#define LDLM_BL_THREADS LDLM_NTHRS_AUTO_INIT #define LDLM_NBUFS (64 * cfs_num_online_cpus()) #define LDLM_BUFSIZE (8 * 1024) #define LDLM_MAXREQSIZE (5 * 1024) #define LDLM_MAXREPSIZE (1024) -/** Absolute limits */ + /* + * MDS threads constants: + * + * Please see the examples in "Thread Constants"; the MDS thread count will + * be comparable to old versions, unless the server has many cores. + */ #ifndef MDT_MAX_THREADS -#define MDT_MIN_THREADS PTLRPC_NTHRS_MIN -#define MDT_MAX_THREADS 512UL +#define MDT_MAX_THREADS 1024 +#define MDT_MAX_OTHR_THREADS 256 + +#else /* MDT_MAX_THREADS */ +#if MDT_MAX_THREADS < PTLRPC_NTHRS_INIT +#undef MDT_MAX_THREADS +#define MDT_MAX_THREADS PTLRPC_NTHRS_INIT +#endif +#define MDT_MAX_OTHR_THREADS max(PTLRPC_NTHRS_INIT, MDT_MAX_THREADS / 2) #endif -#define MDS_NBUFS (64 * cfs_num_online_cpus()) + +/* default service */ +#define MDT_THR_FACTOR 8 +#define MDT_NTHRS_INIT PTLRPC_NTHRS_INIT +#define MDT_NTHRS_MAX MDT_MAX_THREADS +#define MDT_NTHRS_BASE min(64, MDT_NTHRS_MAX) + +/* read-page service */ +#define MDT_RDPG_THR_FACTOR 4 +#define MDT_RDPG_NTHRS_INIT PTLRPC_NTHRS_INIT +#define MDT_RDPG_NTHRS_MAX MDT_MAX_OTHR_THREADS +#define MDT_RDPG_NTHRS_BASE min(48, MDT_RDPG_NTHRS_MAX) + +/* these should be removed when the setattr service is removed in the future */ +#define MDT_SETA_THR_FACTOR 4 +#define MDT_SETA_NTHRS_INIT PTLRPC_NTHRS_INIT +#define MDT_SETA_NTHRS_MAX MDT_MAX_OTHR_THREADS +#define MDT_SETA_NTHRS_BASE min(48, MDT_SETA_NTHRS_MAX) + +/* non-affinity threads */ +#define MDT_OTHR_NTHRS_INIT PTLRPC_NTHRS_INIT +#define MDT_OTHR_NTHRS_MAX MDT_MAX_OTHR_THREADS + +#define MDS_NBUFS (64 * cfs_num_online_cpus())
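Condensed, the sizing rule illustrated by the "Thread Constants" and LDLM examples above is: per-partition limit = BASE + cores_per_cpt * FACTOR, cut back to MAX / ncpts when the total would exceed the soft limit MAX, but never below BASE. A standalone sketch, not part of the patch, that reproduces the four MDT examples (the real logic, including hyper-thread and user-override handling, lives in ptlrpc_server_nthreads_check()):

#include <stdio.h>

#define MDT_NTHRS_BASE    64
#define MDT_THR_FACTOR     8
#define MDT_NTHRS_MAX   1024

/* hypothetical helper, not a Lustre function */
static int mdt_nthrs_per_cpt(int cores, int ncpts)
{
        int nthrs = MDT_NTHRS_BASE + (cores / ncpts) * MDT_THR_FACTOR;

        if (nthrs * ncpts > MDT_NTHRS_MAX)      /* soft limit ... */
                nthrs = MDT_NTHRS_MAX / ncpts;
        if (nthrs < MDT_NTHRS_BASE)             /* ... but keep each partition healthy */
                nthrs = MDT_NTHRS_BASE;
        return nthrs;
}

int main(void)
{
        printf("%d\n", mdt_nthrs_per_cpt(16, 4));    /* Example 1: 96  */
        printf("%d\n", mdt_nthrs_per_cpt(32, 4));    /* Example 2: 128 */
        printf("%d\n", mdt_nthrs_per_cpt(96, 8));    /* Example 3: 128 */
        printf("%d\n", mdt_nthrs_per_cpt(1024, 32)); /* Example 4: 64, i.e. 2048 total */
        return 0;
}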
/** * Assume file name length = FNAME_MAX = 256 (true for ext3). * path name length = PATH_MAX = 4096 @@ -176,16 +326,51 @@ #define SEQ_MAXREPSIZE (152) /** MGS threads must be >= 3, see bug 22458 comment #28 */ -#define MGS_THREADS_AUTO_MIN 3 -#define MGS_THREADS_AUTO_MAX 32 +#define MGS_NTHRS_INIT (PTLRPC_NTHRS_INIT + 1) +#define MGS_NTHRS_MAX 32 + #define MGS_NBUFS (64 * cfs_num_online_cpus()) #define MGS_BUFSIZE (8 * 1024) #define MGS_MAXREQSIZE (7 * 1024) #define MGS_MAXREPSIZE (9 * 1024) -/** Absolute OSS limits */ -#define OSS_THREADS_MIN 3 /* difficult replies, HPQ, others */ -#define OSS_THREADS_MAX 512 + /* + * OSS threads constants: + * + * Given a factor of 8 and a base thread count of 64 + * + * example 1): + * On an 8-core server configured into 2 partitions, we will have + * 64 + 8 * 4 = 96 threads for each partition, 192 threads in total. + * + * example 2): + * On a 32-core machine configured into 4 partitions, we will have + * 64 + 8 * 8 = 128 threads for each partition, so the total thread count + * will be 128 * 4 = 512. + * + * example 3): + * On a 64-core machine configured into 4 partitions, we will have + * 64 + 16 * 8 = 192 threads for each partition, so the total thread count + * would be 192 * 4 = 768, which is above the limit OSS_NTHRS_MAX(512), so we + * cut the value back to OSS_NTHRS_MAX(512) / 4, which is 128 threads + * for each partition. + * + * So we can see that with these constants, the thread count will be at a + * similar level to old versions, unless the server has many cores. + */ + /* depress the thread factor for VMs with small memory size */ +#define OSS_THR_FACTOR min_t(int, 8, \ + CFS_NUM_CACHEPAGES >> (28 - CFS_PAGE_SHIFT)) +#define OSS_NTHRS_INIT (PTLRPC_NTHRS_INIT + 1) +#define OSS_NTHRS_BASE 64 +#define OSS_NTHRS_MAX 512 + +/* threads for handling "create" request */ +#define OSS_CR_THR_FACTOR 1 +#define OSS_CR_NTHRS_INIT PTLRPC_NTHRS_INIT +#define OSS_CR_NTHRS_BASE 8 +#define OSS_CR_NTHRS_MAX 64 + #define OST_NBUFS (64 * cfs_num_online_cpus()) #define OST_BUFSIZE (8 * 1024) @@ -1148,10 +1333,10 @@ struct ptlrpc_service { char *srv_thread_name; /** service thread list */ cfs_list_t srv_threads; - /** threads to start at beginning of service */ - int srv_threads_min; - /** thread upper limit */ - int srv_threads_max; + /** # of threads to create for each partition on initialization */ + int srv_nthrs_cpt_init; + /** upper limit of threads for each partition */ + int srv_nthrs_cpt_limit; /** Root of /proc dir tree for this service */ cfs_proc_dir_entry_t *srv_procroot; /** Pointer to statistic data for this service */ @@ -1177,21 +1362,23 @@ struct ptlrpc_service { __u32 srv_ctx_tags; /** soft watchdog timeout multiplier */ int srv_watchdog_factor; - /** bind threads to CPUs */ - unsigned srv_cpu_affinity:1; /** under unregister_service */ unsigned srv_is_stopping:1; + /** max # request buffers in history per partition */ + int srv_hist_nrqbds_cpt_max; + /** number of CPTs this service is bound on */ + int srv_ncpts; + /** array of CPTs this service is bound on */ + __u32 *srv_cpts; + /** 2^srv_cpt_bits >= cfs_cpt_number(srv_cptable) */ + int srv_cpt_bits; + /** CPT table this service is running over */ + struct cfs_cpt_table *srv_cptable; /** - * max # request buffers in history, it needs to be convert into - * per-partition value when we have multiple partitions - */ - int srv_max_history_rqbds; - /** - * partition data for ptlrpc service, only one instance so far, - * instance per CPT will come soon + * partition data for ptlrpc service */ - struct ptlrpc_service_part *srv_part; + struct ptlrpc_service_part *srv_parts[0]; }; /** @@ 
-1320,6 +1507,12 @@ struct ptlrpc_service_part { cfs_atomic_t scp_nreps_difficult; }; +#define ptlrpc_service_for_each_part(part, i, svc) \ + for (i = 0; \ + i < (svc)->srv_ncpts && \ + (svc)->srv_parts != NULL && \ + ((part) = (svc)->srv_parts[i]) != NULL; i++) + /** * Declaration of ptlrpcd control structure */ @@ -1611,9 +1804,18 @@ struct ptlrpc_service_buf_conf { struct ptlrpc_service_thr_conf { /* threadname should be 8 characters or less - 6 will be added on */ char *tc_thr_name; - /* min number of service threads to start */ - unsigned int tc_nthrs_min; - /* max number of service threads to start */ + /* threads increasing factor for each CPU */ + unsigned int tc_thr_factor; + /* service threads # to start on each partition while initializing */ + unsigned int tc_nthrs_init; + /* + * low water of threads # upper-limit on each partition while running, + * service availability may be impacted if threads number is lower + * than this value. It can be ZERO if the service doesn't require + * CPU affinity or there is only one partition. + */ + unsigned int tc_nthrs_base; + /* "soft" limit for total threads number */ unsigned int tc_nthrs_max; /* user specified threads number, it will be validated due to * other members of this structure. */ @@ -1624,6 +1826,12 @@ struct ptlrpc_service_thr_conf { __u32 tc_ctx_tags; }; +struct ptlrpc_service_cpt_conf { + struct cfs_cpt_table *cc_cptable; + /* string pattern to describe CPTs for a service */ + char *cc_pattern; +}; + struct ptlrpc_service_conf { /* service name */ char *psc_name; @@ -1633,6 +1841,8 @@ struct ptlrpc_service_conf { struct ptlrpc_service_buf_conf psc_buf; /* thread information */ struct ptlrpc_service_thr_conf psc_thr; + /* CPU partition information */ + struct ptlrpc_service_cpt_conf psc_cpt; /* function table */ struct ptlrpc_service_ops psc_ops; }; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 2780fcf..4272774 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -361,20 +361,18 @@ struct filter_obd { cfs_atomic_t fo_r_in_flight; cfs_atomic_t fo_w_in_flight; - /* - * per-filter pool of kiobuf's allocated by filter_common_setup() and - * torn down by filter_cleanup(). Contains OST_NUM_THREADS elements of - * which ->fo_iobuf_count were allocated. - * - * This pool contains kiobuf used by - * filter_{prep,commit}rw_{read,write}() and is shared by all OST - * threads. - * - * Locking: none, each OST thread uses only one element, determined by - * its "ordinal number", ->t_id. - */ - struct filter_iobuf **fo_iobuf_pool; - int fo_iobuf_count; + /* + * per-filter pool of kiobuf's allocated by filter_common_setup() and + * torn down by filter_cleanup(). + * + * This pool contains kiobuf used by + * filter_{prep,commit}rw_{read,write}() and is shared by all OST + * threads. + * + * Locking: protected by internal lock of cfs_hash, pool can be + * found from this hash table by t_id of ptlrpc_thread. 
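+ * (More precisely, the key used by filter_iobuf_get() packs the CPT index + * together with the thread ID: ((__u64)scp_cpt << 32) | t_id.)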
+ */ + struct cfs_hash *fo_iobuf_hash; cfs_list_t fo_llog_list; cfs_spinlock_t fo_llog_list_lock; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index b4198b4..7a601e8 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -56,6 +56,10 @@ static int ldlm_num_threads; CFS_MODULE_PARM(ldlm_num_threads, "i", int, 0444, "number of DLM service threads to start"); +static char *ldlm_cpts; +CFS_MODULE_PARM(ldlm_cpts, "s", charp, 0444, + "CPU partitions ldlm threads should run on"); + extern cfs_mem_cache_t *ldlm_resource_slab; extern cfs_mem_cache_t *ldlm_lock_slab; static cfs_mutex_t ldlm_ref_mutex; @@ -2588,11 +2592,16 @@ static int ldlm_setup(void) }, .psc_thr = { .tc_thr_name = "ldlm_cb", - .tc_nthrs_min = LDLM_THREADS_AUTO_MIN, - .tc_nthrs_max = LDLM_THREADS_AUTO_MAX, + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, .tc_nthrs_user = ldlm_num_threads, - .tc_ctx_tags = LCT_MD_THREAD | \ - LCT_DT_THREAD, + .tc_cpu_affinity = 1, + .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD, + }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, }, .psc_ops = { .so_req_handler = ldlm_callback_handler, @@ -2623,13 +2632,19 @@ static int ldlm_setup(void) }, .psc_thr = { .tc_thr_name = "ldlm_cn", - .tc_nthrs_min = LDLM_THREADS_AUTO_MIN, - .tc_nthrs_max = LDLM_THREADS_AUTO_MAX, + .tc_thr_factor = LDLM_THR_FACTOR, + .tc_nthrs_init = LDLM_NTHRS_INIT, + .tc_nthrs_base = LDLM_NTHRS_BASE, + .tc_nthrs_max = LDLM_NTHRS_MAX, .tc_nthrs_user = ldlm_num_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_MD_THREAD | \ LCT_DT_THREAD | \ LCT_CL_THREAD, }, + .psc_cpt = { + .cc_pattern = ldlm_cpts, + }, .psc_ops = { .so_req_handler = ldlm_cancel_handler, .so_hpreq_handler = ldlm_hpreq_handler, @@ -2659,20 +2674,19 @@ static int ldlm_setup(void) #ifdef __KERNEL__ if (ldlm_num_threads == 0) { - blp->blp_min_threads = LDLM_THREADS_AUTO_MIN; - blp->blp_max_threads = LDLM_THREADS_AUTO_MAX; + blp->blp_min_threads = LDLM_NTHRS_INIT; + blp->blp_max_threads = LDLM_NTHRS_MAX; } else { blp->blp_min_threads = blp->blp_max_threads = \ - min_t(int, LDLM_THREADS_AUTO_MAX, - max_t(int, LDLM_THREADS_AUTO_MIN, - ldlm_num_threads)); + min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT, + ldlm_num_threads)); } - for (i = 0; i < blp->blp_min_threads; i++) { - rc = ldlm_bl_thread_start(blp); - if (rc < 0) + for (i = 0; i < blp->blp_min_threads; i++) { + rc = ldlm_bl_thread_start(blp); + if (rc < 0) GOTO(out, rc); - } + } # ifdef HAVE_SERVER_SUPPORT CFS_INIT_LIST_HEAD(&expired_lock_thread.elt_expired_locks); @@ -2684,24 +2698,26 @@ static int ldlm_setup(void) cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0); rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS); - if (rc < 0) { - CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); + if (rc < 0) { + CERROR("Cannot start ldlm expired-lock thread: %d\n", rc); GOTO(out, rc); - } + } cfs_wait_event(expired_lock_thread.elt_waitq, expired_lock_thread.elt_state == ELT_READY); # endif /* HAVE_SERVER_SUPPORT */ - rc = ldlm_pools_init(); - if (rc) + rc = ldlm_pools_init(); + if (rc) { + CERROR("Failed to initialize LDLM pools: %d\n", rc); GOTO(out, rc); + } #endif - RETURN(0); + RETURN(0); out: ldlm_cleanup(); - return rc; + RETURN(rc); } static int ldlm_cleanup(void) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 97d3bed..f25a5e1 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -97,6 +97,29 @@ ldlm_mode_t 
mdt_dlm_lock_modes[] = { * Initialized in mdt_mod_init(). */ static unsigned long mdt_num_threads; +CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444, + "number of mdt service threads to start"); + +static char *mdt_cpts; +CFS_MODULE_PARM(mdt_cpts, "c", charp, 0444, + "CPU partitions MDT threads should run on"); + +static unsigned long mdt_rdpg_num_threads; +CFS_MODULE_PARM(mdt_rdpg_num_threads, "ul", ulong, 0444, + "number of mdt readpage service threads to start"); + +static char *mdt_rdpg_cpts; +CFS_MODULE_PARM(mdt_rdpg_cpts, "c", charp, 0444, + "CPU partitions MDT readpage threads should run on"); + +/* NB: these two should be removed along with setattr service in the future */ +static unsigned long mdt_attr_num_threads; +CFS_MODULE_PARM(mdt_attr_num_threads, "ul", ulong, 0444, + "number of mdt setattr service threads to start"); + +static char *mdt_attr_cpts; +CFS_MODULE_PARM(mdt_attr_cpts, "c", charp, 0444, + "CPU partitions MDT setattr threads should run on"); /* ptlrpc request handler for MDT. All handlers are * grouped into several slices - struct mdt_opc_slice, @@ -3954,11 +3977,17 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) */ .psc_thr = { .tc_thr_name = LUSTRE_MDT_NAME, - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, + .tc_thr_factor = MDT_THR_FACTOR, + .tc_nthrs_init = MDT_NTHRS_INIT, + .tc_nthrs_base = MDT_NTHRS_BASE, + .tc_nthrs_max = MDT_NTHRS_MAX, .tc_nthrs_user = mdt_num_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_MD_THREAD, }, + .psc_cpt = { + .cc_pattern = mdt_cpts, + }, .psc_ops = { .so_req_handler = mdt_regular_handle, .so_req_printer = target_print_req, @@ -3991,11 +4020,17 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_rdpg", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_thr_factor = MDT_RDPG_THR_FACTOR, + .tc_nthrs_init = MDT_RDPG_NTHRS_INIT, + .tc_nthrs_base = MDT_RDPG_NTHRS_BASE, + .tc_nthrs_max = MDT_RDPG_NTHRS_MAX, + .tc_nthrs_user = mdt_rdpg_num_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_MD_THREAD, }, + .psc_cpt = { + .cc_pattern = mdt_rdpg_cpts, + }, .psc_ops = { .so_req_handler = mdt_readpage_handle, .so_req_printer = target_print_req, @@ -4031,11 +4066,17 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_attr", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_thr_factor = MDT_SETA_THR_FACTOR, + .tc_nthrs_init = MDT_SETA_NTHRS_INIT, + .tc_nthrs_base = MDT_SETA_NTHRS_BASE, + .tc_nthrs_max = MDT_SETA_NTHRS_MAX, + .tc_nthrs_user = mdt_attr_num_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_MD_THREAD, }, + .psc_cpt = { + .cc_pattern = mdt_attr_cpts, + }, .psc_ops = { .so_req_handler = mdt_regular_handle, .so_req_printer = target_print_req, @@ -4067,9 +4108,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_mdsc", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, + .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, .tc_ctx_tags = LCT_MD_THREAD, }, .psc_ops = { @@ -4103,9 +4143,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_mdss", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, + .tc_nthrs_max = 
MDT_OTHR_NTHRS_MAX, .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD }, .psc_ops = { @@ -4141,9 +4180,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_dtss", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, + .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, .tc_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD }, .psc_ops = { @@ -4175,9 +4213,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_fld", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, + .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD }, .psc_ops = { @@ -4212,9 +4249,8 @@ static int mdt_start_ptlrpc_service(struct mdt_device *m) }, .psc_thr = { .tc_thr_name = "mdt_mds", - .tc_nthrs_min = MDT_MIN_THREADS, - .tc_nthrs_max = MDT_MAX_THREADS, - .tc_nthrs_user = mdt_num_threads, + .tc_nthrs_init = MDT_OTHR_NTHRS_INIT, + .tc_nthrs_max = MDT_OTHR_NTHRS_MAX, .tc_ctx_tags = LCT_MD_THREAD, }, .psc_ops = { @@ -5380,7 +5416,7 @@ static int mdt_init_export(struct obd_export *exp) err_free: lut_client_free(exp); err: - CERROR("%s: Error %d while initializing export\n", + CERROR("%s: Failed to initialize export: rc = %d\n", exp->exp_obd->obd_name, rc); return rc; } @@ -6180,7 +6216,4 @@ MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")"); MODULE_LICENSE("GPL"); -CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444, - "number of mdt service threads to start"); - cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit); diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index 4e7068b..22a0b6f 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -251,8 +251,8 @@ static int mgs_setup(struct obd_device *obd, struct lustre_cfg *lcfg) }, .psc_thr = { .tc_thr_name = "ll_mgs", - .tc_nthrs_min = MGS_THREADS_AUTO_MIN, - .tc_nthrs_max = MGS_THREADS_AUTO_MAX, + .tc_nthrs_init = MGS_NTHRS_INIT, + .tc_nthrs_max = MGS_NTHRS_MAX, .tc_ctx_tags = LCT_MD_THREAD, }, .psc_ops = { diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 70e8d52..f0bba1e 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1848,15 +1848,16 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * at the OST layer there are only (potentially) multiple obd_device of type * unknown at the time of OST thread creation. * - * Instead array of iobuf's is attached to struct filter_obd (->fo_iobuf_pool - * field). This array has size OST_MAX_THREADS, so that each OST thread uses - * it's very own iobuf. + * We create a cfs_hash for struct filter_obd (->fo_iobuf_hash field) at + * initialization; each OST thread will create its own iobuf on the first + * access and insert it into ->fo_iobuf_hash with thread ID as key, + * so the iobuf can be found again by thread ID.
* * Functions below * - * filter_kiobuf_pool_init() + * filter_iobuf_pool_init() * - * filter_kiobuf_pool_done() + * filter_iobuf_pool_done() * * filter_iobuf_get() * @@ -1869,21 +1870,13 @@ static int filter_intent_policy(struct ldlm_namespace *ns, */ static void filter_iobuf_pool_done(struct filter_obd *filter) { - struct filter_iobuf **pool; - int i; - - ENTRY; + ENTRY; - pool = filter->fo_iobuf_pool; - if (pool != NULL) { - for (i = 0; i < filter->fo_iobuf_count; ++ i) { - if (pool[i] != NULL) - filter_free_iobuf(pool[i]); - } - OBD_FREE(pool, filter->fo_iobuf_count * sizeof pool[0]); - filter->fo_iobuf_pool = NULL; - } - EXIT; + if (filter->fo_iobuf_hash != NULL) { + cfs_hash_putref(filter->fo_iobuf_hash); + filter->fo_iobuf_hash = NULL; + } + EXIT; } static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial) @@ -1910,50 +1903,126 @@ static int filter_adapt_sptlrpc_conf(struct obd_device *obd, int initial) return 0; } -/* - * pre-allocate pool of iobuf's to be used by filter_{prep,commit}rw_write(). - */ -static int filter_iobuf_pool_init(struct filter_obd *filter) +static unsigned +filter_iobuf_hop_hash(cfs_hash_t *hs, const void *key, unsigned mask) { - void **pool; + __u64 val = *((__u64 *)key); - ENTRY; + return cfs_hash_long(val, hs->hs_cur_bits); +} +static void * +filter_iobuf_hop_key(cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; - OBD_ALLOC_GFP(filter->fo_iobuf_pool, OSS_THREADS_MAX * sizeof(*pool), - CFS_ALLOC_KERNEL); - if (filter->fo_iobuf_pool == NULL) - RETURN(-ENOMEM); + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + return &pool->dr_hkey; +} - filter->fo_iobuf_count = OSS_THREADS_MAX; +static int +filter_iobuf_hop_keycmp(const void *key, cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; - RETURN(0); + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + return pool->dr_hkey == *((__u64 *)key); } -/* Return iobuf allocated for @thread_id. We don't know in advance how - * many threads there will be so we allocate a large empty array and only - * fill in those slots that are actually in use. - * If we haven't allocated a pool entry for this thread before, do so now. */ -void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti) +static void * +filter_iobuf_hop_object(cfs_hlist_node_t *hnode) { - int thread_id = (oti && oti->oti_thread) ? 
- oti->oti_thread->t_id : -1; - struct filter_iobuf *pool = NULL; - struct filter_iobuf **pool_place = NULL; + return cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); +} - if (thread_id >= 0) { - LASSERT(thread_id < filter->fo_iobuf_count); - pool = *(pool_place = &filter->fo_iobuf_pool[thread_id]); - } +static void +filter_iobuf_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + /* dummy, required by cfs_hash */ +} - if (unlikely(pool == NULL)) { - pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, - PTLRPC_MAX_BRW_PAGES); - if (pool_place != NULL) - *pool_place = pool; - } +static void +filter_iobuf_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + /* dummy, required by cfs_hash */ +} + +static void +filter_iobuf_hop_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode) +{ + struct filter_iobuf *pool; + + pool = cfs_hlist_entry(hnode, struct filter_iobuf, dr_hlist); + filter_free_iobuf(pool); +} + +static struct cfs_hash_ops filter_iobuf_hops = { + .hs_hash = filter_iobuf_hop_hash, + .hs_key = filter_iobuf_hop_key, + .hs_keycmp = filter_iobuf_hop_keycmp, + .hs_object = filter_iobuf_hop_object, + .hs_get = filter_iobuf_hop_get, + .hs_put_locked = filter_iobuf_hop_put_locked, + .hs_exit = filter_iobuf_hop_exit }; - return pool; +#define FILTER_IOBUF_HASH_BITS 9 +#define FILTER_IOBUF_HBKT_BITS 4 + +/* + * Create the hash of iobufs used by filter_{prep,commit}rw_write(); iobufs + * are allocated lazily on first access and inserted into this hash. + */ +static int filter_iobuf_pool_init(struct filter_obd *filter) +{ + filter->fo_iobuf_hash = cfs_hash_create("filter_iobuf", + FILTER_IOBUF_HASH_BITS, + FILTER_IOBUF_HASH_BITS, + FILTER_IOBUF_HBKT_BITS, 0, + CFS_HASH_MIN_THETA, + CFS_HASH_MAX_THETA, + &filter_iobuf_hops, + CFS_HASH_RW_BKTLOCK | + CFS_HASH_NO_ITEMREF); + + return filter->fo_iobuf_hash != NULL ? 0 : -ENOMEM; +} + +/* Return iobuf allocated for @thread_id. + * If we haven't allocated a pool entry for this thread before, do so now and + * insert it into fo_iobuf_hash; otherwise it can be found in fo_iobuf_hash */ +void *filter_iobuf_get(struct filter_obd *filter, struct obd_trans_info *oti) +{ + struct filter_iobuf *pool = NULL; + __u64 key = 0; + int thread_id; + int rc; + + thread_id = (oti && oti->oti_thread) ? oti->oti_thread->t_id : -1; + if (thread_id >= 0) { + struct ptlrpc_service_part *svcpt; + + svcpt = oti->oti_thread->t_svcpt; + LASSERT(svcpt != NULL); + + key = (__u64)(svcpt->scp_cpt) << 32 | thread_id; + pool = cfs_hash_lookup(filter->fo_iobuf_hash, &key); + if (pool != NULL) + return pool; + } + + pool = filter_alloc_iobuf(filter, OBD_BRW_WRITE, PTLRPC_MAX_BRW_PAGES); + if (pool == NULL) + return NULL; + + if (thread_id >= 0) { + pool->dr_hkey = key; + rc = cfs_hash_add_unique(filter->fo_iobuf_hash, + &key, &pool->dr_hlist); + /* the ptlrpc service should guarantee thread ID is unique */ + LASSERT(rc != -EALREADY); + } + + return pool; } /* mount the file system (secretly). 
lustre_cfg parameters are: diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 67f40a7..5d1e119 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -194,7 +194,21 @@ void filter_release_cache(struct obd_device *, struct obd_ioobj *, struct niobuf_remote *, struct inode *); /* filter_io_*.c */ -struct filter_iobuf; +struct filter_iobuf { + cfs_hlist_node_t dr_hlist; + __u64 dr_hkey; + /* number of reqs being processed */ + cfs_atomic_t dr_numreqs; + cfs_waitq_t dr_wait; + int dr_max_pages; + int dr_npages; + int dr_error; + unsigned int dr_ignore_quota:1; + struct page **dr_pages; + unsigned long *dr_blocks; + struct filter_obd *dr_filter; +}; + int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, struct niobuf_remote *, int, struct niobuf_local *res, struct obd_trans_info *oti, diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index 6feac45..68a9a7f 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -57,17 +57,6 @@ /* 512byte block min */ #define MAX_BLOCKS_PER_PAGE (CFS_PAGE_SIZE / 512) -struct filter_iobuf { - cfs_atomic_t dr_numreqs; /* number of reqs being processed */ - cfs_waitq_t dr_wait; - int dr_max_pages; - int dr_npages; - int dr_error; - struct page **dr_pages; - unsigned long *dr_blocks; - unsigned int dr_ignore_quota:1; - struct filter_obd *dr_filter; -}; static void record_start_io(struct filter_iobuf *iobuf, int rw, int size, struct obd_export *exp) @@ -233,6 +222,7 @@ struct filter_iobuf *filter_alloc_iobuf(struct filter_obd *filter, if (iobuf->dr_blocks == NULL) goto failed_2; + CFS_INIT_HLIST_NODE(&iobuf->dr_hlist); iobuf->dr_filter = filter; cfs_waitq_init(&iobuf->dr_wait); cfs_atomic_set(&iobuf->dr_numreqs, 0); @@ -260,10 +250,11 @@ static void filter_clear_iobuf(struct filter_iobuf *iobuf) void filter_free_iobuf(struct filter_iobuf *iobuf) { - int num_pages = iobuf->dr_max_pages; + int num_pages = iobuf->dr_max_pages; - filter_clear_iobuf(iobuf); + filter_clear_iobuf(iobuf); + LASSERT(cfs_hlist_unhashed(&iobuf->dr_hlist)); OBD_FREE(iobuf->dr_blocks, MAX_BLOCKS_PER_PAGE * num_pages * sizeof(*iobuf->dr_blocks)); OBD_FREE(iobuf->dr_pages, @@ -282,9 +273,6 @@ void filter_iobuf_put(struct filter_obd *filter, struct filter_iobuf *iobuf, return; } - LASSERTF(filter->fo_iobuf_pool[thread_id] == iobuf, - "iobuf mismatch for thread %d: pool %p iobuf %p\n", - thread_id, filter->fo_iobuf_pool[thread_id], iobuf); filter_clear_iobuf(iobuf); } diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 179b127..ec56759 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -66,6 +66,14 @@ static int oss_num_create_threads; CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444, "number of OSS create threads to start"); +static char *oss_cpts; +CFS_MODULE_PARM(oss_cpts, "s", charp, 0444, + "CPU partitions OSS threads should run on"); + +static char *oss_io_cpts; +CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444, + "CPU partitions OSS IO threads should run on"); + /** * Do not return server-side uid/gid to remote client */ @@ -2414,7 +2422,6 @@ static int ost_thread_init(struct ptlrpc_thread *thread) LASSERT(thread != NULL); LASSERT(thread->t_data == NULL); - LASSERTF(thread->t_id <= OSS_THREADS_MAX, "%u\n", thread->t_id); OBD_ALLOC_PTR(tls); if (tls == NULL) @@ -2425,14 +2432,15 @@ static int ost_thread_init(struct ptlrpc_thread *thread) #define OST_WATCHDOG_TIMEOUT 
(obd_timeout * 1000) +static struct cfs_cpt_table *ost_io_cptable; + /* Sigh - really, this is an OSS, the _server_, not the _target_ */ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { static struct ptlrpc_service_conf svc_conf; struct ost_obd *ost = &obd->u.ost; struct lprocfs_static_vars lvars; - int oss_min_threads = OSS_THREADS_MIN; - int oss_max_threads = OSS_THREADS_MAX; + nodemask_t *mask; int rc; ENTRY; @@ -2445,19 +2453,6 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) cfs_mutex_init(&ost->ost_health_mutex); - if (oss_num_threads == 0) { - /* Base min threads on memory and cpus */ - oss_min_threads = - cfs_num_online_cpus() * CFS_NUM_CACHEPAGES >> - (27 - CFS_PAGE_SHIFT); - if (oss_min_threads < OSS_THREADS_MIN) - oss_min_threads = OSS_THREADS_MIN; - /* Insure a 4x range for dynamic threads */ - if (oss_min_threads > OSS_THREADS_MAX / 4) - oss_min_threads = OSS_THREADS_MAX / 4; - oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1); - } - svc_conf = (typeof(svc_conf)) { .psc_name = LUSTRE_OSS_NAME, .psc_watchdog_factor = OSS_SERVICE_WATCHDOG_FACTOR, @@ -2471,11 +2466,17 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) }, .psc_thr = { .tc_thr_name = "ll_ost", - .tc_nthrs_min = oss_min_threads, - .tc_nthrs_max = oss_max_threads, + .tc_thr_factor = OSS_THR_FACTOR, + .tc_nthrs_init = OSS_NTHRS_INIT, + .tc_nthrs_base = OSS_NTHRS_BASE, + .tc_nthrs_max = OSS_NTHRS_MAX, .tc_nthrs_user = oss_num_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_DT_THREAD, }, + .psc_cpt = { + .cc_pattern = oss_cpts, + }, .psc_ops = { .so_req_handler = ost_handle, .so_req_printer = target_print_req, @@ -2503,11 +2504,17 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) }, .psc_thr = { .tc_thr_name = "ll_ost_create", - .tc_nthrs_min = OSS_CR_THREADS_MIN, - .tc_nthrs_max = OSS_CR_THREADS_MAX, + .tc_thr_factor = OSS_CR_THR_FACTOR, + .tc_nthrs_init = OSS_CR_NTHRS_INIT, + .tc_nthrs_base = OSS_CR_NTHRS_BASE, + .tc_nthrs_max = OSS_CR_NTHRS_MAX, .tc_nthrs_user = oss_num_create_threads, + .tc_cpu_affinity = 1, .tc_ctx_tags = LCT_DT_THREAD, }, + .psc_cpt = { + .cc_pattern = oss_cpts, + }, .psc_ops = { .so_req_handler = ost_handle, .so_req_printer = target_print_req, @@ -2521,6 +2528,31 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) GOTO(out_service, rc); } + mask = cfs_cpt_table->ctb_nodemask; + /* even if the CPT feature is disabled at the libcfs level by setting + * the partition number to 1, we still want to set node affinity for + * the IO service */ + if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) { + int cpt = 0; + int i; + + ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask)); + for_each_node_mask(i, *mask) { + if (ost_io_cptable == NULL) { + CWARN("OSS failed to create CPT table\n"); + break; + } + + rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i); + if (!rc) { + CWARN("OSS failed to set node %d for " + "IO CPT table\n", i); + cfs_cpt_table_free(ost_io_cptable); + ost_io_cptable = NULL; + break; + } + } + } + memset(&svc_conf, 0, sizeof(svc_conf)); svc_conf = (typeof(svc_conf)) { .psc_name = "ost_io", @@ -2535,12 +2567,19 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg) }, .psc_thr = { .tc_thr_name = "ll_ost_io", - .tc_nthrs_min = oss_min_threads, - .tc_nthrs_max = oss_max_threads, + .tc_thr_factor = OSS_THR_FACTOR, + .tc_nthrs_init = OSS_NTHRS_INIT, + .tc_nthrs_base = OSS_NTHRS_BASE, + .tc_nthrs_max = OSS_NTHRS_MAX, .tc_nthrs_user = oss_num_threads, 
.tc_cpu_affinity = 1, .tc_ctx_tags = LCT_DT_THREAD, }, + .psc_cpt = { + .cc_cptable = ost_io_cptable, + .cc_pattern = ost_io_cptable == NULL ? + oss_io_cpts : NULL, + }, .psc_ops = { .so_thr_init = ost_thread_init, .so_thr_done = ost_thread_done, @@ -2592,11 +2631,16 @@ static int ost_cleanup(struct obd_device *obd) ost->ost_create_service = NULL; ost->ost_io_service = NULL; - cfs_mutex_unlock(&ost->ost_health_mutex); + cfs_mutex_unlock(&ost->ost_health_mutex); - lprocfs_obd_cleanup(obd); + lprocfs_obd_cleanup(obd); + + if (ost_io_cptable != NULL) { + cfs_cpt_table_free(ost_io_cptable); + ost_io_cptable = NULL; + } - RETURN(err); + RETURN(err); } static int ost_health_check(const struct lu_env *env, struct obd_device *obd) diff --git a/lustre/ost/ost_internal.h b/lustre/ost/ost_internal.h index a751b43..ff77a52 100644 --- a/lustre/ost/ost_internal.h +++ b/lustre/ost/ost_internal.h @@ -62,10 +62,6 @@ struct ost_thread_local_cache { struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r); -/* threads for handling "create" request */ -#define OSS_CR_THREADS_MIN 2UL -#define OSS_CR_THREADS_MAX 16UL - /* Quota stuff */ extern quota_interface_t *quota_interface; diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 5565049..a35e3c5 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -235,17 +235,12 @@ void client_bulk_callback (lnet_event_t *ev) * * it might not be precise but should be good enough. */ -#define REQS_ALL_BITS(svcpt) ((int)(sizeof((svcpt)->scp_hist_seq) * 8)) -#define REQS_SEC_BITS 32 -#define REQS_USEC_BITS 16 -/* will be replaced by bits for total service partition number soon */ -#define REQS_CPT_BITS(svcpt) 0 -#define REQS_SEQ_BITS(svcpt) (REQS_ALL_BITS(svcpt) - REQS_CPT_BITS(svcpt) -\ - REQS_SEC_BITS - REQS_USEC_BITS) - -#define REQS_SEQ_SHIFT(svcpt) (REQS_CPT_BITS(svcpt)) -#define REQS_USEC_SHIFT(svcpt) (REQS_SEQ_SHIFT(svcpt) + REQS_SEQ_BITS(svcpt)) -#define REQS_SEC_SHIFT(svcpt) (REQS_USEC_SHIFT(svcpt) + REQS_USEC_BITS) + +#define REQS_CPT_BITS(svcpt) ((svcpt)->scp_service->srv_cpt_bits) + +#define REQS_SEC_SHIFT 32 +#define REQS_USEC_SHIFT 16 +#define REQS_SEQ_SHIFT(svcpt) REQS_CPT_BITS(svcpt) static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt, struct ptlrpc_request *req) @@ -257,21 +252,20 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt, /* set sequence ID for request and add it to history list, * it must be called with hold svcpt::scp_lock */ - LASSERT(REQS_SEQ_BITS(svcpt) > 0); - - new_seq = (sec << REQS_SEC_SHIFT(svcpt)) | - (usec << REQS_USEC_SHIFT(svcpt)) | svcpt->scp_cpt; + new_seq = (sec << REQS_SEC_SHIFT) | + (usec << REQS_USEC_SHIFT) | svcpt->scp_cpt; if (new_seq > svcpt->scp_hist_seq) { /* This handles the initial case of scp_hist_seq == 0 or * we just jumped into a new time window */ svcpt->scp_hist_seq = new_seq; } else { + LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT); /* NB: increase sequence number in current usec bucket, * however, it's possible that we used up all bits for * sequence and jumped into the next usec bucket (future time), * then we hope there will be less RPCs per bucket at some * point, and sequence will catch up again */ - svcpt->scp_hist_seq += (1U << REQS_CPT_BITS(svcpt)); + svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt)); new_seq = svcpt->scp_hist_seq; } diff --git a/lustre/ptlrpc/lproc_ptlrpc.c b/lustre/ptlrpc/lproc_ptlrpc.c index 0d6ceec..cec0417 100644 --- a/lustre/ptlrpc/lproc_ptlrpc.c +++ b/lustre/ptlrpc/lproc_ptlrpc.c @@ -251,30 +251,44 @@ 
ptlrpc_lprocfs_read_req_history_len(char *page, char **start, off_t off, int count, int *eof, void *data) { struct ptlrpc_service *svc = data; + struct ptlrpc_service_part *svcpt; + int total = 0; + int i; *eof = 1; - return snprintf(page, count, "%d\n", svc->srv_part->scp_hist_nrqbds); + + ptlrpc_service_for_each_part(svcpt, i, svc) + total += svcpt->scp_hist_nrqbds; + + return snprintf(page, count, "%d\n", total); } static int ptlrpc_lprocfs_read_req_history_max(char *page, char **start, off_t off, int count, int *eof, void *data) { - struct ptlrpc_service *svc = data; + struct ptlrpc_service *svc = data; + struct ptlrpc_service_part *svcpt; + int total = 0; + int i; - *eof = 1; - return snprintf(page, count, "%d\n", svc->srv_max_history_rqbds); + *eof = 1; + ptlrpc_service_for_each_part(svcpt, i, svc) + total += svc->srv_hist_nrqbds_cpt_max; + + return snprintf(page, count, "%d\n", total); } static int ptlrpc_lprocfs_write_req_history_max(struct file *file, const char *buffer, unsigned long count, void *data) { - struct ptlrpc_service *svc = data; - int bufpages; - int val; - int rc = lprocfs_write_helper(buffer, count, &val); + struct ptlrpc_service *svc = data; + int bufpages; + int val; + int rc; + rc = lprocfs_write_helper(buffer, count, &val); if (rc < 0) return rc; @@ -289,7 +303,12 @@ ptlrpc_lprocfs_write_req_history_max(struct file *file, const char *buffer, return -ERANGE; cfs_spin_lock(&svc->srv_lock); - svc->srv_max_history_rqbds = val; + + if (val == 0) + svc->srv_hist_nrqbds_cpt_max = 0; + else + svc->srv_hist_nrqbds_cpt_max = max(1, (val / svc->srv_ncpts)); + cfs_spin_unlock(&svc->srv_lock); return count; @@ -297,34 +316,36 @@ ptlrpc_lprocfs_write_req_history_max(struct file *file, const char *buffer, static int ptlrpc_lprocfs_rd_threads_min(char *page, char **start, off_t off, - int count, int *eof, void *data) + int count, int *eof, void *data) { - struct ptlrpc_service *svc = data; + struct ptlrpc_service *svc = data; - return snprintf(page, count, "%d\n", svc->srv_threads_min); + return snprintf(page, count, "%d\n", + svc->srv_nthrs_cpt_init * svc->srv_ncpts); } static int ptlrpc_lprocfs_wr_threads_min(struct file *file, const char *buffer, unsigned long count, void *data) { - struct ptlrpc_service *svc = data; - int val; - int rc = lprocfs_write_helper(buffer, count, &val); + struct ptlrpc_service *svc = data; + int val; + int rc = lprocfs_write_helper(buffer, count, &val); - if (rc < 0) - return rc; + if (rc < 0) + return rc; - if (val < 2) - return -ERANGE; + if (val / svc->srv_ncpts < PTLRPC_NTHRS_INIT) + return -ERANGE; cfs_spin_lock(&svc->srv_lock); - if (val > svc->srv_threads_max) { + if (val > svc->srv_nthrs_cpt_limit * svc->srv_ncpts) { cfs_spin_unlock(&svc->srv_lock); return -ERANGE; } - svc->srv_threads_min = val; + svc->srv_nthrs_cpt_init = val / svc->srv_ncpts; + cfs_spin_unlock(&svc->srv_lock); return count; @@ -335,50 +356,58 @@ ptlrpc_lprocfs_rd_threads_started(char *page, char **start, off_t off, int count, int *eof, void *data) { struct ptlrpc_service *svc = data; + struct ptlrpc_service_part *svcpt; + int total = 0; + int i; - LASSERT(svc->srv_part != NULL); - return snprintf(page, count, "%d\n", - svc->srv_part->scp_nthrs_running); + LASSERT(svc->srv_parts != NULL); + ptlrpc_service_for_each_part(svcpt, i, svc) + total += svcpt->scp_nthrs_running; + + return snprintf(page, count, "%d\n", total); } static int ptlrpc_lprocfs_rd_threads_max(char *page, char **start, off_t off, - int count, int *eof, void *data) + int count, int *eof, void *data) { - 
struct ptlrpc_service *svc = data; + struct ptlrpc_service *svc = data; - return snprintf(page, count, "%d\n", svc->srv_threads_max); + return snprintf(page, count, "%d\n", + svc->srv_nthrs_cpt_limit * svc->srv_ncpts); } static int ptlrpc_lprocfs_wr_threads_max(struct file *file, const char *buffer, - unsigned long count, void *data) + unsigned long count, void *data) { - struct ptlrpc_service *svc = data; - int val; - int rc = lprocfs_write_helper(buffer, count, &val); + struct ptlrpc_service *svc = data; + int val; + int rc = lprocfs_write_helper(buffer, count, &val); - if (rc < 0) - return rc; + if (rc < 0) + return rc; - if (val < 2) - return -ERANGE; + if (val / svc->srv_ncpts < PTLRPC_NTHRS_INIT) + return -ERANGE; cfs_spin_lock(&svc->srv_lock); - if (val < svc->srv_threads_min) { + if (val < svc->srv_nthrs_cpt_init * svc->srv_ncpts) { cfs_spin_unlock(&svc->srv_lock); return -ERANGE; } - svc->srv_threads_max = val; + svc->srv_nthrs_cpt_limit = val / svc->srv_ncpts; + cfs_spin_unlock(&svc->srv_lock); return count; } struct ptlrpc_srh_iterator { - __u64 srhi_seq; - struct ptlrpc_request *srhi_req; + int srhi_idx; + __u64 srhi_seq; + struct ptlrpc_request *srhi_req; }; int @@ -423,28 +452,33 @@ ptlrpc_lprocfs_svc_req_history_seek(struct ptlrpc_service_part *svcpt, static void * ptlrpc_lprocfs_svc_req_history_start(struct seq_file *s, loff_t *pos) { - struct ptlrpc_service *svc = s->private; - struct ptlrpc_srh_iterator *srhi; - int rc; + struct ptlrpc_service *svc = s->private; + struct ptlrpc_service_part *svcpt; + struct ptlrpc_srh_iterator *srhi; + int rc; + int i; - OBD_ALLOC(srhi, sizeof(*srhi)); - if (srhi == NULL) - return NULL; + OBD_ALLOC(srhi, sizeof(*srhi)); + if (srhi == NULL) + return NULL; - srhi->srhi_seq = 0; - srhi->srhi_req = NULL; + srhi->srhi_seq = 0; + srhi->srhi_req = NULL; - cfs_spin_lock(&svc->srv_part->scp_lock); - rc = ptlrpc_lprocfs_svc_req_history_seek(svc->srv_part, srhi, *pos); - cfs_spin_unlock(&svc->srv_part->scp_lock); + ptlrpc_service_for_each_part(svcpt, i, svc) { + srhi->srhi_idx = i; - if (rc == 0) { - *pos = srhi->srhi_seq; - return srhi; - } + cfs_spin_lock(&svcpt->scp_lock); + rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos); + cfs_spin_unlock(&svcpt->scp_lock); + if (rc == 0) { + *pos = srhi->srhi_seq; + return srhi; + } + } - OBD_FREE(srhi, sizeof(*srhi)); - return NULL; + OBD_FREE(srhi, sizeof(*srhi)); + return NULL; } static void @@ -461,13 +495,22 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s, void *iter, loff_t *pos) { struct ptlrpc_service *svc = s->private; - struct ptlrpc_service_part *svcpt = svc->srv_part; struct ptlrpc_srh_iterator *srhi = iter; - int rc; + struct ptlrpc_service_part *svcpt; + int rc = 0; + int i; - cfs_spin_lock(&svcpt->scp_lock); - rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos + 1); - cfs_spin_unlock(&svcpt->scp_lock); + for (i = srhi->srhi_idx; i < svc->srv_ncpts; i++) { + svcpt = svc->srv_parts[i]; + + srhi->srhi_idx = i; + + cfs_spin_lock(&svcpt->scp_lock); + rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, *pos + 1); + cfs_spin_unlock(&svcpt->scp_lock); + if (rc == 0) + break; + } if (rc != 0) { OBD_FREE(srhi, sizeof(*srhi)); @@ -511,11 +554,15 @@ EXPORT_SYMBOL(target_print_req); static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter) { struct ptlrpc_service *svc = s->private; - struct ptlrpc_service_part *svcpt = svc->srv_part; struct ptlrpc_srh_iterator *srhi = iter; + struct ptlrpc_service_part *svcpt; struct ptlrpc_request *req; int rc; + 
LASSERT(srhi->srhi_idx < svc->srv_ncpts); + + svcpt = svc->srv_parts[srhi->srhi_idx]; + cfs_spin_lock(&svcpt->scp_lock); rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, srhi->srhi_seq); @@ -571,6 +618,8 @@ ptlrpc_lprocfs_svc_req_history_open(struct inode *inode, struct file *file) return 0; } +#define PTLRPC_AT_LINE_SIZE 128 + /* See also lprocfs_rd_timeouts */ static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off, int count, int *eof, void *data) @@ -581,25 +630,53 @@ static int ptlrpc_lprocfs_rd_timeouts(char *page, char **start, off_t off, time_t worstt; unsigned int cur; unsigned int worst; + int nob = 0; int rc = 0; + int cpt; + int i; - svcpt = svc->srv_part; - LASSERT(svcpt != NULL); + LASSERT(svc->srv_parts != NULL); + + if (AT_OFF) { + rc += snprintf(page + rc, count - rc, + "adaptive timeouts off, using obd_timeout %u\n", + obd_timeout); + *eof = 1; + return rc; + } + + cpt = ((unsigned)off) / PTLRPC_AT_LINE_SIZE; + + ptlrpc_service_for_each_part(svcpt, i, svc) { + if (i < cpt) + continue; + + cur = at_get(&svcpt->scp_at_estimate); + worst = svcpt->scp_at_estimate.at_worst_ever; + worstt = svcpt->scp_at_estimate.at_worst_time; + s2dhms(&ts, cfs_time_current_sec() - worstt); + + nob = snprintf(page + rc, count - rc, + "%10s : cur %3u worst %3u (at %ld, " + DHMS_FMT" ago) ", "service", + cur, worst, worstt, DHMS_VARS(&ts)); + + nob += lprocfs_at_hist_helper(page, count, rc + nob, + &svcpt->scp_at_estimate); + LASSERT(nob < PTLRPC_AT_LINE_SIZE); + /* fill the whole line with spaces, so we can locate + * partition by offset on the next call... */ + memset(page + rc + nob, ' ', PTLRPC_AT_LINE_SIZE - nob); + page[rc + PTLRPC_AT_LINE_SIZE - 1] = '\n'; + rc += PTLRPC_AT_LINE_SIZE; + + if (count - rc < PTLRPC_AT_LINE_SIZE) + break; + } + + if (i == svc->srv_ncpts - 1) + *eof = 1; - *eof = 1; - cur = at_get(&svcpt->scp_at_estimate); - worst = svcpt->scp_at_estimate.at_worst_ever; - worstt = svcpt->scp_at_estimate.at_worst_time; - s2dhms(&ts, cfs_time_current_sec() - worstt); - if (AT_OFF) - rc += snprintf(page + rc, count - rc, - "adaptive timeouts off, using obd_timeout %u\n", - obd_timeout); - rc += snprintf(page + rc, count - rc, - "%10s : cur %3u worst %3u (at %ld, "DHMS_FMT" ago) ", - "service", cur, worst, worstt, - DHMS_VARS(&ts)); - rc = lprocfs_at_hist_helper(page, count, rc, &svcpt->scp_at_estimate); return rc; } @@ -614,14 +691,16 @@ static int ptlrpc_lprocfs_rd_hp_ratio(char *page, char **start, off_t off, static int ptlrpc_lprocfs_wr_hp_ratio(struct file *file, const char *buffer, unsigned long count, void *data) { - struct ptlrpc_service *svc = data; - int rc, val; + struct ptlrpc_service *svc = data; + int rc; + int val; - rc = lprocfs_write_helper(buffer, count, &val); - if (rc < 0) - return rc; - if (val < 0) - return -ERANGE; + rc = lprocfs_write_helper(buffer, count, &val); + if (rc < 0) + return rc; + + if (val < 0) + return -ERANGE; cfs_spin_lock(&svc->srv_lock); svc->srv_hpreq_ratio = val; diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 69c616f..179a8a8 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -775,6 +775,8 @@ int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd) if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_RQBD)) return (-ENOMEM); + /* NB: We need to replace LNET_INS_AFTER with LNET_INS_LOCAL + * after LNet SMP patches landed */ rc = LNetMEAttach(service->srv_req_portal, match_id, 0, ~0, LNET_UNLINK, LNET_INS_AFTER, &me_h); if (rc != 0) { diff --git a/lustre/ptlrpc/service.c 
b/lustre/ptlrpc/service.c index 84f3474..d601924 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -75,21 +75,21 @@ ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt) struct ptlrpc_service *svc = svcpt->scp_service; struct ptlrpc_request_buffer_desc *rqbd; - OBD_ALLOC_PTR(rqbd); + OBD_CPT_ALLOC_PTR(rqbd, svc->srv_cptable, svcpt->scp_cpt); if (rqbd == NULL) return NULL; rqbd->rqbd_svcpt = svcpt; - rqbd->rqbd_refcount = 0; - rqbd->rqbd_cbid.cbid_fn = request_in_callback; - rqbd->rqbd_cbid.cbid_arg = rqbd; - CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs); - OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size); - - if (rqbd->rqbd_buffer == NULL) { - OBD_FREE_PTR(rqbd); - return (NULL); - } + rqbd->rqbd_refcount = 0; + rqbd->rqbd_cbid.cbid_fn = request_in_callback; + rqbd->rqbd_cbid.cbid_arg = rqbd; + CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs); + OBD_CPT_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_cptable, + svcpt->scp_cpt, svc->srv_buf_size); + if (rqbd->rqbd_buffer == NULL) { + OBD_FREE_PTR(rqbd); + return NULL; + } cfs_spin_lock(&svcpt->scp_lock); cfs_list_add(&rqbd->rqbd_list, &svcpt->scp_rqbd_idle); @@ -117,7 +117,7 @@ ptlrpc_free_rqbd(struct ptlrpc_request_buffer_desc *rqbd) } int -ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt) +ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post) { struct ptlrpc_service *svc = svcpt->scp_service; struct ptlrpc_request_buffer_desc *rqbd; @@ -138,17 +138,15 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt) rc = -ENOMEM; break; } - - if (ptlrpc_server_post_idle_rqbds(svcpt) < 0) { - rc = -EAGAIN; - break; - } } CDEBUG(D_RPCTRACE, "%s: allocate %d new %d-byte reqbufs (%d/%d left), rc = %d\n", - svc->srv_name, i, svc->srv_buf_size, - svcpt->scp_nrqbds_posted, svcpt->scp_nrqbds_total, rc); + svc->srv_name, i, svc->srv_buf_size, svcpt->scp_nrqbds_posted, + svcpt->scp_nrqbds_total, rc); + + if (post && rc == 0) + rc = ptlrpc_server_post_idle_rqbds(svcpt); return rc; } @@ -451,49 +449,106 @@ static void ptlrpc_at_timer(unsigned long castmeharder) } static void -ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf, - int *min_p, int *max_p) +ptlrpc_server_nthreads_check(struct ptlrpc_service *svc, + struct ptlrpc_service_conf *conf) { #ifdef __KERNEL__ struct ptlrpc_service_thr_conf *tc = &conf->psc_thr; - int nthrs_min; - int nthrs; + unsigned init; + unsigned total; + unsigned nthrs; + int weight; - nthrs_min = PTLRPC_NTHRS_MIN + (conf->psc_ops.so_hpreq_handler != NULL); - nthrs_min = max_t(int, nthrs_min, tc->tc_nthrs_min); + /* + * Common code for estimating & validating the thread count. + * A CPT-affinity service could have a per-CPT thread pool instead + * of a global thread pool, which means users might not always + * get the thread count they specify in conf::tc_nthrs_user, + * even if they did set it. This is because we need to validate the + * thread count for each CPT to guarantee each pool will have enough + * threads to keep the service healthy. 
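+ * + * For example (illustrative numbers, not from this patch): with + * tc_nthrs_user = 256 and tc_nthrs_max = 512 on a 4-CPT service, the + * code below computes total = min(512 * 8, 256) = 256 and gives each + * partition a thread limit of 256 / 4 = 64.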
+ */ + init = PTLRPC_NTHRS_INIT + (svc->srv_ops.so_hpreq_handler != NULL); + init = max_t(int, init, tc->tc_nthrs_init); + + /* NB: please see comments in lustre_net.h for definition + * details of these members */ + LASSERT(tc->tc_nthrs_max != 0); + + if (tc->tc_nthrs_user != 0) { + /* In case there is a reason to test a service with many + * threads, we give a less strict check here; it can + * be up to 8 * nthrs_max */ + total = min(tc->tc_nthrs_max * 8, tc->tc_nthrs_user); + nthrs = total / svc->srv_ncpts; + init = max(init, nthrs); + goto out; + } - nthrs = tc->tc_nthrs_user; - if (nthrs != 0) { /* validate it */ - nthrs = min_t(int, nthrs, tc->tc_nthrs_max); - nthrs = max_t(int, nthrs, nthrs_min); - *min_p = *max_p = nthrs; - return; + total = tc->tc_nthrs_max; + if (tc->tc_nthrs_base == 0) { + /* don't care about the base thread count per partition; + * this is mostly for non-affinity services */ + nthrs = total / svc->srv_ncpts; + goto out; } - /* - * NB: we will add some common code here for estimating, for example: - * add a new member ptlrpc_service_thr_conf::tc_factor, and estimate - * threads number based on: - * (online_cpus * conf::tc_factor) + conf::tc_nthrs_base. - * - * So we can remove code block like estimation in ost_setup, also, - * we might estimate MDS threads number as well instead of using - * absolute number, and have more threads on fat servers to improve - * availability of service. - * - * Also, we will need to validate threads number at here for - * CPT affinity service (CPU ParTion) in the future. - * A service can have percpt thread-pool instead of a global thread - * pool for each service, which means user might not always get the - * threads number they want even they set it in conf::tc_nthrs_user, - * because we need to adjust threads number for each CPT, instead of - * just use (conf::tc_nthrs_user / NCPTS), to make sure each pool - * will be healthy. - */ - *max_p = tc->tc_nthrs_max; - *min_p = nthrs_min; -#else /* __KERNEL__ */ - *max_p = *min_p = 1; /* whatever */ + nthrs = tc->tc_nthrs_base; + if (svc->srv_ncpts == 1) { + int i; + + /* NB: Increase the base number if it's a single partition + * and the total number of cores/HTs is larger than or equal + * to 4. The result will always be < 2 * nthrs_base */ + weight = cfs_cpt_weight(svc->srv_cptable, CFS_CPT_ANY); + for (i = 1; (weight >> (i + 1)) != 0 && /* >= 4 cores/HTs */ + (tc->tc_nthrs_base >> i) != 0; i++) + nthrs += tc->tc_nthrs_base >> i; + } + + if (tc->tc_thr_factor != 0) { + int factor = tc->tc_thr_factor; + const int fade = 4; + + /* + * The user wants to increase the number of threads for + * each CPU core/HT; most likely the factor is larger than + * one thread per core because service threads are expected + * to block on locks or wait for IO. + */ + /* + * Amdahl's law says that adding processors doesn't give + * a linear increase in parallelism, so it's pointless to + * have too many threads no matter how many cores/HTs + * there are. 
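+ * + * For example (illustrative): with factor = 8 and a 12-core partition + * (weight = 12, fade = 4), the loop below adds + * min(12, 4) * 8 + min(8, 4) * 7 + min(4, 4) * 6 = 84 threads + * on top of nthrs_base.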
+		 */
+		if (cfs_cpu_ht_nsiblings(0) > 1) { /* weight is # of HTs */
+			/* reduce the thread factor for hyper-threading */
+			factor = factor - (factor >> 1) + (factor >> 3);
+		}
+
+		weight = cfs_cpt_weight(svc->srv_cptable, 0);
+		LASSERT(weight > 0);
+
+		for (; factor > 0 && weight > 0; factor--, weight -= fade)
+			nthrs += min(weight, fade) * factor;
+	}
+
+	if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+		nthrs = max(tc->tc_nthrs_base,
+			    tc->tc_nthrs_max / svc->srv_ncpts);
+	}
+ out:
+	nthrs = max(nthrs, tc->tc_nthrs_init);
+	svc->srv_nthrs_cpt_limit = nthrs;
+	svc->srv_nthrs_cpt_init = init;
+
+	if (nthrs * svc->srv_ncpts > tc->tc_nthrs_max) {
+		LCONSOLE_WARN("%s: This service may have more threads (%d) "
+			      "than the given soft limit (%d)\n",
+			      svc->srv_name, nthrs * svc->srv_ncpts,
+			      tc->tc_nthrs_max);
+	}
 #endif
 }
 
@@ -502,13 +557,14 @@ ptlrpc_server_nthreads_check(struct ptlrpc_service_conf *conf,
  */
 static int
 ptlrpc_service_part_init(struct ptlrpc_service *svc,
-			 struct ptlrpc_service_part *svcpt)
+			 struct ptlrpc_service_part *svcpt, int cpt)
 {
 	struct ptlrpc_at_array *array;
 	int size;
 	int index;
 	int rc;
 
+	svcpt->scp_cpt = cpt;
 	CFS_INIT_LIST_HEAD(&svcpt->scp_threads);
 
 	/* rqbd and incoming request queue */
@@ -546,14 +602,16 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
 	array->paa_deadline = -1;
 
 	/* allocate memory for scp_at_array (ptlrpc_at_array) */
-	OBD_ALLOC(array->paa_reqs_array, sizeof(cfs_list_t) * size);
+	OBD_CPT_ALLOC(array->paa_reqs_array,
+		      svc->srv_cptable, cpt, sizeof(cfs_list_t) * size);
 	if (array->paa_reqs_array == NULL)
 		return -ENOMEM;
 
 	for (index = 0; index < size; index++)
 		CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
 
-	OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
+	OBD_CPT_ALLOC(array->paa_reqs_count,
+		      svc->srv_cptable, cpt, sizeof(__u32) * size);
 	if (array->paa_reqs_count == NULL)
 		goto failed;
 
@@ -565,7 +623,7 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
 	/* assign this before call ptlrpc_grow_req_bufs */
 	svcpt->scp_service = svc;
 	/* Now allocate the request buffers, but don't post them now */
-	rc = ptlrpc_grow_req_bufs(svcpt);
+	rc = ptlrpc_grow_req_bufs(svcpt, 0);
 	/* We shouldn't be under memory pressure at startup, so
 	 * fail if we can't allocate all our buffers at this time.
 	 */
 	if (rc != 0)
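All of the per-partition state above is allocated with the OBD_CPT_* variants
so the memory comes from the NUMA node backing the partition's CPT rather than
whichever CPU happens to run the setup code. A hedged sketch of the pattern
for any future per-partition member (struct foo_stats and foo_nr are
hypothetical):

    struct foo_stats *stats;

    /* allocate on the CPT's local node, mirroring the calls above */
    OBD_CPT_ALLOC(stats, svc->srv_cptable, svcpt->scp_cpt,
                  sizeof(*stats) * foo_nr);
    if (stats == NULL)
            return -ENOMEM;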
@@ -597,8 +655,15 @@ struct ptlrpc_service *
 ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 			cfs_proc_dir_entry_t *proc_entry)
 {
+	struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
 	struct ptlrpc_service *service;
+	struct ptlrpc_service_part *svcpt;
+	struct cfs_cpt_table *cptable;
+	__u32 *cpts = NULL;
+	int ncpts;
+	int cpt;
 	int rc;
+	int i;
 	ENTRY;
 
 	LASSERT(conf->psc_buf.bc_nbufs > 0);
@@ -606,9 +671,51 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 		conf->psc_buf.bc_req_max_size + SPTLRPC_MAX_PAYLOAD);
 	LASSERT(conf->psc_thr.tc_ctx_tags != 0);
 
-	OBD_ALLOC_PTR(service);
-	if (service == NULL)
+	cptable = cconf->cc_cptable;
+	if (cptable == NULL)
+		cptable = cfs_cpt_table;
+
+	if (!conf->psc_thr.tc_cpu_affinity) {
+		ncpts = 1;
+	} else {
+		ncpts = cfs_cpt_number(cptable);
+		if (cconf->cc_pattern != NULL) {
+			struct cfs_expr_list *el;
+
+			rc = cfs_expr_list_parse(cconf->cc_pattern,
+						 strlen(cconf->cc_pattern),
+						 0, ncpts - 1, &el);
+			if (rc != 0) {
+				CERROR("%s: invalid CPT pattern string: %s\n",
+				       conf->psc_name, cconf->cc_pattern);
+				RETURN(ERR_PTR(-EINVAL));
+			}
+
+			rc = cfs_expr_list_values(el, ncpts, &cpts);
+			cfs_expr_list_free(el);
+			if (rc <= 0) {
+				CERROR("%s: failed to parse CPT array %s: %d\n",
+				       conf->psc_name, cconf->cc_pattern, rc);
+				RETURN(ERR_PTR(rc < 0 ? rc : -EINVAL));
+			}
+			ncpts = rc;
+		}
+	}
+
+	OBD_ALLOC(service, offsetof(struct ptlrpc_service, srv_parts[ncpts]));
+	if (service == NULL) {
+		if (cpts != NULL)
+			OBD_FREE(cpts, sizeof(*cpts) * ncpts);
 		RETURN(ERR_PTR(-ENOMEM));
+	}
+
+	service->srv_cptable = cptable;
+	service->srv_cpts = cpts;
+	service->srv_ncpts = ncpts;
+
+	service->srv_cpt_bits = 0; /* it's zero already, easy to read... */
+	while ((1 << service->srv_cpt_bits) < cfs_cpt_number(cptable))
+		service->srv_cpt_bits++;
 
 	/* public members */
 	cfs_spin_lock_init(&service->srv_lock);
@@ -617,8 +724,9 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 	CFS_INIT_LIST_HEAD(&service->srv_list); /* for safty of cleanup */
 
 	/* buffer configuration */
-	service->srv_nbuf_per_group = test_req_buffer_pressure ?
-				      1 : conf->psc_buf.bc_nbufs;
+	service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 :
+				      max(conf->psc_buf.bc_nbufs /
					  service->srv_ncpts, 1U);
 	service->srv_max_req_size = conf->psc_buf.bc_req_max_size +
 				    SPTLRPC_MAX_PAYLOAD;
 	service->srv_buf_size = conf->psc_buf.bc_buf_size;
@@ -631,22 +739,28 @@ ptlrpc_register_service(struct ptlrpc_service_conf *conf,
 	       conf->psc_buf.bc_rep_max_size + SPTLRPC_MAX_PAYLOAD)
 		service->srv_max_reply_size <<= 1;
 
-	ptlrpc_server_nthreads_check(conf, &service->srv_threads_min,
-				     &service->srv_threads_max);
-
 	service->srv_thread_name = conf->psc_thr.tc_thr_name;
 	service->srv_ctx_tags = conf->psc_thr.tc_ctx_tags;
-	service->srv_cpu_affinity = !!conf->psc_thr.tc_cpu_affinity;
 	service->srv_hpreq_ratio = PTLRPC_SVC_HP_RATIO;
 	service->srv_ops = conf->psc_ops;
 
-	OBD_ALLOC_PTR(service->srv_part);
-	if (service->srv_part == NULL)
-		GOTO(failed, rc = -ENOMEM);
+	for (i = 0; i < ncpts; i++) {
+		if (!conf->psc_thr.tc_cpu_affinity)
+			cpt = CFS_CPT_ANY;
+		else
+			cpt = cpts != NULL ? cpts[i] : i;
 
-	rc = ptlrpc_service_part_init(service, service->srv_part);
-	if (rc != 0)
-		GOTO(failed, rc);
+		OBD_CPT_ALLOC(svcpt, cptable, cpt, sizeof(*svcpt));
+		if (svcpt == NULL)
+			GOTO(failed, rc = -ENOMEM);
+
+		service->srv_parts[i] = svcpt;
+		rc = ptlrpc_service_part_init(service, svcpt, cpt);
+		if (rc != 0)
+			GOTO(failed, rc);
+	}
+
+	ptlrpc_server_nthreads_check(service, conf);
 
 	rc = LNetSetLazyPortal(service->srv_req_portal);
 	LASSERT(rc == 0);
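For reference, cconf->cc_pattern is an expression string in the cfs_expr_list
syntax, so a value such as "0-3" (illustrative) restricts the service to CPTs
0 through 3. srv_cpt_bits is just the smallest width that can hold any CPT
index from the table; a standalone sketch of the same round-up:

    /* smallest 'bits' such that (1 << bits) covers every CPT in the
     * table, matching the srv_cpt_bits loop above */
    unsigned bits = 0;

    while ((1 << bits) < cfs_cpt_number(cptable))
            bits++;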
@@ -754,7 +868,7 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
 
 		/* cull some history?
 		 * I expect only about 1 or 2 rqbds need to be recycled here */
-		while (svcpt->scp_hist_nrqbds > svc->srv_max_history_rqbds) {
+		while (svcpt->scp_hist_nrqbds > svc->srv_hist_nrqbds_cpt_max) {
 			rqbd = cfs_list_entry(svcpt->scp_hist_rqbds.next,
 					      struct ptlrpc_request_buffer_desc,
 					      rqbd_list);
@@ -2008,7 +2122,8 @@ liblustre_check_services (void *arg)
 			cfs_list_entry (tmp, struct ptlrpc_service, srv_list);
 		struct ptlrpc_service_part *svcpt;
 
-		svcpt = svc->srv_part;
+		LASSERT(svc->srv_ncpts == 1);
+		svcpt = svc->srv_parts[0];
 
 		if (svcpt->scp_nthrs_running != 0)     /* I've recursed */
 			continue;
@@ -2053,7 +2168,7 @@ ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
 	 * space. */
 
 	if (avail <= low_water)
-		ptlrpc_grow_req_bufs(svcpt);
+		ptlrpc_grow_req_bufs(svcpt, 1);
 
 	if (svcpt->scp_service->srv_stats) {
 		lprocfs_counter_add(svcpt->scp_service->srv_stats,
@@ -2087,7 +2202,8 @@ static inline int
 ptlrpc_threads_increasable(struct ptlrpc_service_part *svcpt)
 {
 	return svcpt->scp_nthrs_running +
-		svcpt->scp_nthrs_starting < svcpt->scp_service->srv_threads_max;
+	       svcpt->scp_nthrs_starting <
+	       svcpt->scp_service->srv_nthrs_cpt_limit;
 }
 
 /**
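Since the limit is now per partition, the service-wide ceiling is effectively
the per-CPT limit times the partition count, which is what the LCONSOLE_WARN
in ptlrpc_server_nthreads_check() reports against. A sketch (this helper is
illustrative, not part of the patch):

    /* rough service-wide thread capacity under the per-CPT limit */
    static int ptlrpc_total_thread_limit(struct ptlrpc_service *svc)
    {
            return svc->srv_nthrs_cpt_limit * svc->srv_ncpts;
    }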
@@ -2180,24 +2296,14 @@ static int ptlrpc_main(void *arg)
 	thread->t_pid = cfs_curproc_pid();
 	cfs_daemonize_ctxt(thread->t_name);
 
-#if defined(HAVE_NODE_TO_CPUMASK) && defined(CONFIG_NUMA)
-	/* we need to do this before any per-thread allocation is done so that
-	 * we get the per-thread allocations on local node.  bug 7342 */
-	if (svc->srv_cpu_affinity) {
-		int cpu, num_cpu;
-
-		for (cpu = 0, num_cpu = 0; cpu < cfs_num_possible_cpus();
-		     cpu++) {
-			if (!cpu_online(cpu))
-				continue;
-			if (num_cpu == thread->t_id % cfs_num_online_cpus())
-				break;
-			num_cpu++;
-		}
-		cfs_set_cpus_allowed(cfs_current(),
-				     node_to_cpumask(cpu_to_node(cpu)));
-	}
-#endif
+	/* NB: we will call cfs_cpt_bind() for all threads, because we
+	 * might want to run the Lustre server only on a subset of system
+	 * CPUs; in that case ->scp_cpt is CFS_CPT_ANY */
+	rc = cfs_cpt_bind(svc->srv_cptable, svcpt->scp_cpt);
+	if (rc != 0) {
+		CWARN("%s: failed to bind %s on CPT %d\n",
+		      svc->srv_name, thread->t_name, svcpt->scp_cpt);
+	}
 
 #ifdef WITH_GROUP_INFO
 	ginfo = cfs_groups_alloc(0);
@@ -2231,6 +2337,16 @@ static int ptlrpc_main(void *arg)
 	env->le_ctx.lc_thread = thread;
 	env->le_ctx.lc_cookie = 0x6;
 
+	while (!cfs_list_empty(&svcpt->scp_rqbd_idle)) {
+		rc = ptlrpc_server_post_idle_rqbds(svcpt);
+		if (rc >= 0)
+			continue;
+
+		CERROR("Failed to post rqbd for %s on CPT %d: %d\n",
+		       svc->srv_name, svcpt->scp_cpt, rc);
+		goto out_srv_fini;
+	}
+
 	/* Alloc reply state structure for this one */
 	OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
 	if (!rs) {
@@ -2541,36 +2657,47 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
  */
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
 {
+	struct ptlrpc_service_part *svcpt;
+	int i;
 	ENTRY;
 
-	if (svc != NULL && svc->srv_part != NULL)
-		ptlrpc_svcpt_stop_threads(svc->srv_part);
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service != NULL)
+			ptlrpc_svcpt_stop_threads(svcpt);
+	}
+
 	EXIT;
 }
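ptlrpc_service_for_each_part() is the new idiom replacing the old
"do { ... } while (0)" single-partition blocks. Its definition is not shown in
this hunk; presumably it is a simple indexed walk over srv_parts, along the
lines of:

    /* sketch of the iterator's likely shape; the real macro is
     * defined with the service structures, not in this file */
    #define ptlrpc_service_for_each_part(part, i, svc)              \
            for (i = 0;                                             \
                 i < (svc)->srv_ncpts &&                            \
                 ((part) = (svc)->srv_parts[i]) != NULL; i++)

Loop bodies still test svcpt->scp_service because a partition can be allocated
but not fully initialized when cleanup runs.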
 
 int ptlrpc_start_threads(struct ptlrpc_service *svc)
 {
-	int i, rc = 0;
-	ENTRY;
+	int rc = 0;
+	int i;
+	int j;
+	ENTRY;
 
-	/* We require 2 threads min - see note in
-	   ptlrpc_server_handle_request */
-	LASSERT(svc->srv_threads_min >= 2);
-	for (i = 0; i < svc->srv_threads_min; i++) {
-		rc = ptlrpc_start_thread(svc->srv_part, 1);
-		/* We have enough threads, don't start more. b=15759 */
-		if (rc == -EMFILE) {
-			rc = 0;
-			break;
-		}
-		if (rc) {
-			CERROR("cannot start %s thread #%d: rc %d\n",
-			       svc->srv_thread_name, i, rc);
-			ptlrpc_stop_all_threads(svc);
-			break;
-		}
-	}
-	RETURN(rc);
+	/* We require 2 threads min, see note in ptlrpc_server_handle_request */
+	LASSERT(svc->srv_nthrs_cpt_init >= PTLRPC_NTHRS_INIT);
+
+	for (i = 0; i < svc->srv_ncpts; i++) {
+		for (j = 0; j < svc->srv_nthrs_cpt_init; j++) {
+			rc = ptlrpc_start_thread(svc->srv_parts[i], 1);
+			if (rc == 0)
+				continue;
+
+			if (rc != -EMFILE)
+				goto failed;
+			/* We have enough threads, don't start more. b=15759 */
+			break;
+		}
+	}
+
+	RETURN(0);
+ failed:
+	CERROR("cannot start %s thread #%d_%d: rc %d\n",
+	       svc->srv_thread_name, i, j, rc);
+	ptlrpc_stop_all_threads(svc);
+	RETURN(rc);
 }
 
 int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
@@ -2583,9 +2710,9 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 
 	LASSERT(svcpt != NULL);
 
-	CDEBUG(D_RPCTRACE, "%s started %d min %d max %d\n",
-	       svc->srv_name, svcpt->scp_nthrs_running,
-	       svc->srv_threads_min, svc->srv_threads_max);
+	CDEBUG(D_RPCTRACE, "%s[%d] started %d min %d max %d\n",
+	       svc->srv_name, svcpt->scp_cpt, svcpt->scp_nthrs_running,
+	       svc->srv_nthrs_cpt_init, svc->srv_nthrs_cpt_limit);
 
  again:
 	if (unlikely(svc->srv_is_stopping))
@@ -2593,10 +2720,10 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 
 	if (!ptlrpc_threads_increasable(svcpt) ||
 	    (OBD_FAIL_CHECK(OBD_FAIL_TGT_TOOMANY_THREADS) &&
-	     svcpt->scp_nthrs_running == svc->srv_threads_min - 1))
+	     svcpt->scp_nthrs_running == svc->srv_nthrs_cpt_init - 1))
 		RETURN(-EMFILE);
 
-	OBD_ALLOC_PTR(thread);
+	OBD_CPT_ALLOC_PTR(thread, svc->srv_cptable, svcpt->scp_cpt);
 	if (thread == NULL)
 		RETURN(-ENOMEM);
 	cfs_waitq_init(&thread->t_ctl_waitq);
@@ -2634,8 +2761,13 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 	cfs_list_add(&thread->t_link, &svcpt->scp_threads);
 	cfs_spin_unlock(&svcpt->scp_lock);
 
-	snprintf(thread->t_name, PTLRPC_THR_NAME_LEN,
-		 "%s_%02d", svc->srv_thread_name, thread->t_id);
+	if (svcpt->scp_cpt >= 0) {
+		snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s%02d_%03d",
+			 svc->srv_thread_name, svcpt->scp_cpt, thread->t_id);
+	} else {
+		snprintf(thread->t_name, PTLRPC_THR_NAME_LEN, "%s_%04d",
+			 svc->srv_thread_name, thread->t_id);
+	}
 
 	CDEBUG(D_RPCTRACE, "starting thread '%s'\n", thread->t_name);
 
 	/*
@@ -2734,16 +2866,14 @@ static void ptlrpc_wait_replies(struct ptlrpc_service_part *svcpt)
 static void
 ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
 {
-	struct ptlrpc_service_part *svcpt;
+	struct ptlrpc_service_part *svcpt;
+	int i;
 
 	/* early disarm AT timer... */
-	do { /* iterrate over multiple partitions in the future */
-		svcpt = svc->srv_part;
-		if (svcpt == NULL || svcpt->scp_service == NULL)
-			break;
-
-		cfs_timer_disarm(&svcpt->scp_at_timer);
-	} while (0);
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service != NULL)
+			cfs_timer_disarm(&svcpt->scp_at_timer);
+	}
 }
 
 static void
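The snprintf() formats in ptlrpc_start_thread() above give CPT-affinity
threads names that encode their partition. For example (names are
illustrative; the base string comes from tc_thr_name):

    char name[PTLRPC_THR_NAME_LEN];

    /* CPT-affinity service, CPT 0, thread 3 -> "ll_ost00_003" */
    snprintf(name, sizeof(name), "%s%02d_%03d", "ll_ost", 0, 3);

    /* non-affinity service, thread 4 -> "ll_ost_0004" */
    snprintf(name, sizeof(name), "%s_%04d", "ll_ost", 4);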
@@ -2753,17 +2883,17 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
 	struct ptlrpc_request_buffer_desc *rqbd;
 	struct l_wait_info lwi;
 	int rc;
+	int i;
 
-	/* All history will be culled when the next request buffer is
+	/* All history will be culled when the next request buffer is
 	 * freed in ptlrpc_service_purge_all() */
-	svc->srv_max_history_rqbds = 0;
+	svc->srv_hist_nrqbds_cpt_max = 0;
 
 	rc = LNetClearLazyPortal(svc->srv_req_portal);
 	LASSERT(rc == 0);
 
-	do { /* iterrate over multiple partitions in the future */
-		svcpt = svc->srv_part;
-		if (svcpt == NULL || svcpt->scp_service == NULL)
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service == NULL)
 			break;
 
 		/* Unlink all the request buffers.  This forces a 'final'
@@ -2773,11 +2903,10 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
 			rc = LNetMDUnlink(rqbd->rqbd_md_h);
 			LASSERT(rc == 0 || rc == -ENOENT);
 		}
-	} while (0);
+	}
 
-	do { /* iterrate over multiple partitions in the future */
-		svcpt = svc->srv_part;
-		if (svcpt == NULL || svcpt->scp_service == NULL)
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service == NULL)
 			break;
 
 		/* Wait for the network to release any buffers
@@ -2801,7 +2930,7 @@ ptlrpc_service_unlink_rqbd(struct ptlrpc_service *svc)
 			cfs_spin_lock(&svcpt->scp_lock);
 		}
 		cfs_spin_unlock(&svcpt->scp_lock);
-	} while (0);
+	}
 }
 
 static void
@@ -2811,11 +2940,10 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
 {
 	struct ptlrpc_service_part *svcpt;
 	struct ptlrpc_request_buffer_desc *rqbd;
 	struct ptlrpc_request *req;
 	struct ptlrpc_reply_state *rs;
+	int i;
 
-	do { /* iterrate over multiple partitions in the future */
-		/* schedule all outstanding replies to terminate them */
-		svcpt = svc->srv_part;
-		if (svcpt == NULL || svcpt->scp_service == NULL)
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service == NULL)
 			break;
 
 		cfs_spin_lock(&svcpt->scp_rep_lock);
@@ -2874,7 +3002,7 @@ ptlrpc_service_purge_all(struct ptlrpc_service *svc)
 			cfs_list_del(&rs->rs_list);
 			OBD_FREE_LARGE(rs, svc->srv_max_reply_size);
 		}
-	} while (0);
+	}
 }
 
 static void
@@ -2882,10 +3010,10 @@ ptlrpc_service_free(struct ptlrpc_service *svc)
 {
 	struct ptlrpc_service_part *svcpt;
 	struct ptlrpc_at_array *array;
+	int i;
 
-	do { /* iterrate over multiple partitions in the future */
-		svcpt = svc->srv_part;
-		if (svcpt == NULL || svcpt->scp_service == NULL)
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		if (svcpt->scp_service == NULL)
 			break;
 
 		/* In case somebody rearmed this in the meantime */
@@ -2903,16 +3031,16 @@ ptlrpc_service_free(struct ptlrpc_service *svc)
 				 sizeof(__u32) * array->paa_size);
 			array->paa_reqs_count = NULL;
 		}
-		svcpt->scp_service = NULL;
-	} while (0);
+	}
+
+	ptlrpc_service_for_each_part(svcpt, i, svc)
+		OBD_FREE_PTR(svcpt);
 
-	do { /* iterrate over multiple partitions in the future */
-		svcpt = svc->srv_part;
-		if (svcpt != NULL)
-			OBD_FREE_PTR(svcpt);
-	} while (0);
+	if (svc->srv_cpts != NULL)
+		cfs_expr_list_values_free(svc->srv_cpts, svc->srv_ncpts);
 
-	OBD_FREE_PTR(svc);
+	OBD_FREE(svc, offsetof(struct ptlrpc_service,
+			       srv_parts[svc->srv_ncpts]));
 }
 
 int ptlrpc_unregister_service(struct ptlrpc_service *service)
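Note that allocation and free now both size the service with
offsetof(struct ptlrpc_service, srv_parts[ncpts]), the usual kernel pattern
for a struct carrying a variable-length tail. A self-contained sketch
(struct demo is hypothetical):

    #include <stddef.h>

    struct demo {
            int   d_count;
            void *d_items[0];       /* variable-length tail */
    };

    /* one allocation covers the header plus 'n' tail slots */
    size_t size = offsetof(struct demo, d_items[n]);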
@@ -2945,19 +3073,14 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
  * Right now, it just checks to make sure that requests aren't languishing
  * in the queue.  We'll use this health check to govern whether a node needs
  * to be shot, so it's intentionally non-aggressive.
  */
-int ptlrpc_service_health_check(struct ptlrpc_service *svc)
+int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
 {
-	struct ptlrpc_service_part *svcpt;
 	struct ptlrpc_request *request;
 	struct timeval right_now;
 	long timediff;
 
-	if (svc == NULL || svc->srv_part == NULL)
-		return 0;
-
 	cfs_gettimeofday(&right_now);
 
-	svcpt = svc->srv_part;
 	cfs_spin_lock(&svcpt->scp_req_lock);
 	if (!ptlrpc_server_request_pending(svcpt, 1)) {
 		cfs_spin_unlock(&svcpt->scp_req_lock);
@@ -2985,3 +3108,21 @@ int ptlrpc_service_health_check(struct ptlrpc_service *svc)
 
 	return 0;
 }
+
+int
+ptlrpc_service_health_check(struct ptlrpc_service *svc)
+{
+	struct ptlrpc_service_part *svcpt;
+	int i;
+
+	if (svc == NULL || svc->srv_parts == NULL)
+		return 0;
+
+	ptlrpc_service_for_each_part(svcpt, i, svc) {
+		int rc = ptlrpc_svcpt_health_check(svcpt);
+
+		if (rc != 0)
+			return rc;
+	}
+	return 0;
+}
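With the aggregate wrapper, existing callers keep a single entry point and get
the first failing partition's status. A hypothetical caller:

    /* unhealthy if any partition reports a stuck request */
    if (ptlrpc_service_health_check(svc) != 0)
            CWARN("%s: service unhealthy\n", svc->srv_name);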