From 8bbd62a7c0d2fc48d8f11e78d92bb42809968bba Mon Sep 17 00:00:00 2001
From: Liang Zhen
Date: Tue, 19 Jun 2012 15:56:45 +0800
Subject: [PATCH] LU-56 ptlrpc: CPT affinity ptlrpc RS handlers

This patch covers a couple of things:
- reimplement the RS handlers using the CPU partition APIs
- instead of always choosing an RS handler thread round-robin, choose
  the RS handler thread directly on the partition of rs::rs_svcpt

Signed-off-by: Liang Zhen
Change-Id: I5fdebb116630d073d41b39fc4271c4cebb429965
Reviewed-on: http://review.whamcloud.com/3135
Reviewed-by: wangdi
Tested-by: Hudson
Reviewed-by: Andreas Dilger
Tested-by: Maloo
Reviewed-by: Oleg Drokin
---
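Note: the sketch below is not part of the patch; it sits under the "---"
fold, which git am ignores. It is a minimal userspace model of the
two-level selection policy that the new ptlrpc_hr_select() implements:
when the caller's service partition lives on the same CPT table, pick
the matching reply-handling partition directly; otherwise advance a
global rotor across partitions; then advance the chosen partition's own
rotor to pick a thread. All names here (model_thread, model_partition,
model_select, model_rotor) are invented for illustration only. As in
the patch, the rotors are bumped without locking; an occasional race
merely skews the round-robin, which is harmless.

#include <stdio.h>

struct model_thread {
        int t_id;                               /* thread index in table */
};

struct model_partition {
        unsigned int            p_rotor;        /* per-partition rotor */
        int                     p_nthrs;        /* # of threads in partition */
        struct model_thread     *p_thrs;        /* thread table */
};

static unsigned int model_rotor;                /* for non-affinity callers */

/* cpt < 0 means the caller has no CPT affinity (no matching partition) */
static struct model_thread *
model_select(struct model_partition *parts, int nparts, int cpt)
{
        struct model_partition *p;

        if (cpt >= 0 && cpt < nparts)           /* directly match partition */
                p = &parts[cpt];
        else                                    /* round-robin on partitions */
                p = &parts[model_rotor++ % nparts];

        /* second-level round-robin among the partition's threads */
        return &p->p_thrs[p->p_rotor++ % p->p_nthrs];
}

int main(void)
{
        struct model_thread     thrs[2] = { { 0 }, { 1 } };
        struct model_partition  parts[1] = { { 0, 2, thrs } };
        int                     i;

        for (i = 0; i < 4; i++)                 /* prints 0, 1, 0, 1 */
                printf("picked thread %d\n",
                       model_select(parts, 1, 0)->t_id);
        return 0;
}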
 lustre/ptlrpc/service.c | 386 ++++++++++++++++++++++++++----------------------
 1 file changed, 208 insertions(+), 178 deletions(-)

diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index d601924..b07dbe8 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -178,22 +178,44 @@ ptlrpc_save_lock(struct ptlrpc_request *req,
 
 #ifdef __KERNEL__
 
-#define HRT_RUNNING 0
-#define HRT_STOPPING 1
+struct ptlrpc_hr_partition;
 
 struct ptlrpc_hr_thread {
-        cfs_spinlock_t hrt_lock;
-        unsigned long hrt_flags;
-        cfs_waitq_t hrt_wait;
-        cfs_list_t hrt_queue;
-        cfs_completion_t hrt_completion;
+        int                             hrt_id;         /* thread ID */
+        cfs_spinlock_t                  hrt_lock;
+        cfs_waitq_t                     hrt_waitq;
+        cfs_list_t                      hrt_queue;      /* RS queue */
+        struct ptlrpc_hr_partition      *hrt_partition;
+};
+
+struct ptlrpc_hr_partition {
+        /* # of started threads */
+        cfs_atomic_t                    hrp_nstarted;
+        /* # of stopped threads */
+        cfs_atomic_t                    hrp_nstopped;
+        /* cpu partition id */
+        int                             hrp_cpt;
+        /* round-robin rotor for choosing thread */
+        int                             hrp_rotor;
+        /* total number of threads on this partition */
+        int                             hrp_nthrs;
+        /* threads table */
+        struct ptlrpc_hr_thread         *hrp_thrs;
 };
 
+#define HRT_RUNNING 0
+#define HRT_STOPPING 1
+
 struct ptlrpc_hr_service {
-        int hr_index;
-        int hr_n_threads;
-        int hr_size;
-        struct ptlrpc_hr_thread hr_threads[0];
+        /* CPU partition table, it's just cfs_cpt_table for now */
+        struct cfs_cpt_table            *hr_cpt_table;
+        /** controller sleep waitq */
+        cfs_waitq_t                     hr_waitq;
+        unsigned int                    hr_stopping;
+        /** roundrobin rotor for non-affinity service */
+        unsigned int                    hr_rotor;
+        /* partition data */
+        struct ptlrpc_hr_partition      **hr_partitions;
 };
 
 struct rs_batch {
@@ -202,10 +224,8 @@ struct rs_batch {
         struct ptlrpc_service_part      *rsb_svcpt;
 };
 
-/**
- * A pointer to per-node reply handling service.
- */
-static struct ptlrpc_hr_service *ptlrpc_hr = NULL;
+/** reply handling service. */
+static struct ptlrpc_hr_service         ptlrpc_hr;
 
 /**
  * maximum mumber of replies scheduled in one batch
@@ -226,17 +246,26 @@ static void rs_batch_init(struct rs_batch *b)
 /**
  * Choose an hr thread to dispatch requests to.
  */
-static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
+static struct ptlrpc_hr_thread *
+ptlrpc_hr_select(struct ptlrpc_service_part *svcpt)
 {
-        unsigned int idx;
+        struct ptlrpc_hr_partition      *hrp;
+        unsigned int                    rotor;
+
+        if (svcpt->scp_cpt >= 0 &&
+            svcpt->scp_service->srv_cptable == ptlrpc_hr.hr_cpt_table) {
+                /* directly match partition */
+                hrp = ptlrpc_hr.hr_partitions[svcpt->scp_cpt];
 
-        /* Concurrent modification of hr_index w/o any spinlock
-           protection is harmless as long as the result fits
-           [0..(hr_n_threads-1)] range and each thread gets near equal
-           load. */
-        idx = hr->hr_index;
-        hr->hr_index = (idx >= hr->hr_n_threads - 1) ? 0 : idx + 1;
-        return idx;
+        } else {
+                rotor = ptlrpc_hr.hr_rotor++;
+                rotor %= cfs_cpt_number(ptlrpc_hr.hr_cpt_table);
+
+                hrp = ptlrpc_hr.hr_partitions[rotor];
+        }
+
+        rotor = hrp->hrp_rotor++;
+        return &hrp->hrp_thrs[rotor % hrp->hrp_nthrs];
 }
 
 /**
@@ -247,19 +276,18 @@ static unsigned int get_hr_thread_index(struct ptlrpc_hr_service *hr)
  */
 static void rs_batch_dispatch(struct rs_batch *b)
 {
-        if (b->rsb_n_replies != 0) {
-                struct ptlrpc_hr_service *hr = ptlrpc_hr;
-                int idx;
+        if (b->rsb_n_replies != 0) {
+                struct ptlrpc_hr_thread *hrt;
 
-                idx = get_hr_thread_index(hr);
+                hrt = ptlrpc_hr_select(b->rsb_svcpt);
 
-                cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
-                cfs_list_splice_init(&b->rsb_replies,
-                                     &hr->hr_threads[idx].hrt_queue);
-                cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
-                cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
-                b->rsb_n_replies = 0;
-        }
+                cfs_spin_lock(&hrt->hrt_lock);
+                cfs_list_splice_init(&b->rsb_replies, &hrt->hrt_queue);
+                cfs_spin_unlock(&hrt->hrt_lock);
+
+                cfs_waitq_signal(&hrt->hrt_waitq);
+                b->rsb_n_replies = 0;
+        }
 }
 
 /**
@@ -325,18 +353,19 @@ static void rs_batch_fini(struct rs_batch *b)
 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs)
 {
 #ifdef __KERNEL__
-        struct ptlrpc_hr_service *hr = ptlrpc_hr;
-        int idx;
-        ENTRY;
+        struct ptlrpc_hr_thread *hrt;
+        ENTRY;
 
-        LASSERT(cfs_list_empty(&rs->rs_list));
+        LASSERT(cfs_list_empty(&rs->rs_list));
 
-        idx = get_hr_thread_index(hr);
-        cfs_spin_lock(&hr->hr_threads[idx].hrt_lock);
-        cfs_list_add_tail(&rs->rs_list, &hr->hr_threads[idx].hrt_queue);
-        cfs_spin_unlock(&hr->hr_threads[idx].hrt_lock);
-        cfs_waitq_signal(&hr->hr_threads[idx].hrt_wait);
-        EXIT;
+        hrt = ptlrpc_hr_select(rs->rs_svcpt);
+
+        cfs_spin_lock(&hrt->hrt_lock);
+        cfs_list_add_tail(&rs->rs_list, &hrt->hrt_queue);
+        cfs_spin_unlock(&hrt->hrt_lock);
+
+        cfs_waitq_signal(&hrt->hrt_waitq);
+        EXIT;
 #else
         cfs_list_add_tail(&rs->rs_list, &rs->rs_svcpt->scp_rep_queue);
 #endif
@@ -2461,23 +2490,18 @@ out:
         return rc;
 }
 
-struct ptlrpc_hr_args {
-        int thread_index;
-        int cpu_index;
-        struct ptlrpc_hr_service *hrs;
-};
-
-static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
-                          cfs_list_t *replies)
+static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt,
+                          cfs_list_t *replies)
 {
-        int result;
+        int     result;
 
-        cfs_spin_lock(&t->hrt_lock);
-        cfs_list_splice_init(&t->hrt_queue, replies);
-        result = cfs_test_bit(HRT_STOPPING, &t->hrt_flags) ||
-                 !cfs_list_empty(replies);
-        cfs_spin_unlock(&t->hrt_lock);
-        return result;
+        cfs_spin_lock(&hrt->hrt_lock);
+
+        cfs_list_splice_init(&hrt->hrt_queue, replies);
+        result = ptlrpc_hr.hr_stopping || !cfs_list_empty(replies);
+
+        cfs_spin_unlock(&hrt->hrt_lock);
+        return result;
 }
 
 /**
@@ -2486,26 +2510,28 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *t,
  */
 static int ptlrpc_hr_main(void *arg)
 {
-        struct ptlrpc_hr_args * hr_args = arg;
-        struct ptlrpc_hr_service *hr = hr_args->hrs;
-        struct ptlrpc_hr_thread *t = &hr->hr_threads[hr_args->thread_index];
-        char threadname[20];
-        CFS_LIST_HEAD(replies);
+        struct ptlrpc_hr_thread         *hrt = (struct ptlrpc_hr_thread *)arg;
+        struct ptlrpc_hr_partition      *hrp = hrt->hrt_partition;
+        CFS_LIST_HEAD                   (replies);
+        char                            threadname[20];
+        int                             rc;
 
-        snprintf(threadname, sizeof(threadname),
-                 "ptlrpc_hr_%d", hr_args->thread_index);
+        snprintf(threadname, sizeof(threadname), "ptlrpc_hr%02d_%03d",
+                 hrp->hrp_cpt, hrt->hrt_id);
+        cfs_daemonize_ctxt(threadname);
 
-        cfs_daemonize_ctxt(threadname);
-#if defined(CONFIG_NUMA) && defined(HAVE_NODE_TO_CPUMASK)
-        cfs_set_cpus_allowed(cfs_current(),
-                             node_to_cpumask(cpu_to_node(hr_args->cpu_index)));
-#endif
-        cfs_set_bit(HRT_RUNNING, &t->hrt_flags);
-        cfs_waitq_signal(&t->hrt_wait);
+        rc = cfs_cpt_bind(ptlrpc_hr.hr_cpt_table, hrp->hrp_cpt);
+        if (rc != 0) {
+                CWARN("Failed to bind %s on CPT %d of CPT table %p: rc = %d\n",
+                      threadname, hrp->hrp_cpt, ptlrpc_hr.hr_cpt_table, rc);
+        }
+
+        cfs_atomic_inc(&hrp->hrp_nstarted);
+        cfs_waitq_signal(&ptlrpc_hr.hr_waitq);
 
-        while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
+        while (!ptlrpc_hr.hr_stopping) {
+                l_wait_condition(hrt->hrt_waitq, hrt_dont_sleep(hrt, &replies));
 
-                l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
                 while (!cfs_list_empty(&replies)) {
                         struct ptlrpc_reply_state *rs;
@@ -2517,89 +2543,64 @@ static int ptlrpc_hr_main(void *arg)
                 }
         }
 
-        cfs_clear_bit(HRT_RUNNING, &t->hrt_flags);
-        cfs_complete(&t->hrt_completion);
+        cfs_atomic_inc(&hrp->hrp_nstopped);
+        cfs_waitq_signal(&ptlrpc_hr.hr_waitq);
 
-        return 0;
+        return 0;
 }
 
-static int ptlrpc_start_hr_thread(struct ptlrpc_hr_service *hr, int n, int cpu)
+static void ptlrpc_stop_hr_threads(void)
 {
-        struct ptlrpc_hr_thread *t = &hr->hr_threads[n];
-        struct ptlrpc_hr_args args;
-        int rc;
-        ENTRY;
-
-        args.thread_index = n;
-        args.cpu_index = cpu;
-        args.hrs = hr;
-
-        rc = cfs_create_thread(ptlrpc_hr_main, (void*)&args, CFS_DAEMON_FLAGS);
-        if (rc < 0) {
-                cfs_complete(&t->hrt_completion);
-                GOTO(out, rc);
-        }
-        l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
-        RETURN(0);
- out:
-        return rc;
-}
+        struct ptlrpc_hr_partition      *hrp;
+        int                             i;
+        int                             j;
 
-static void ptlrpc_stop_hr_thread(struct ptlrpc_hr_thread *t)
-{
-        ENTRY;
+        ptlrpc_hr.hr_stopping = 1;
 
-        cfs_set_bit(HRT_STOPPING, &t->hrt_flags);
-        cfs_waitq_signal(&t->hrt_wait);
-        cfs_wait_for_completion(&t->hrt_completion);
+        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+                if (hrp->hrp_thrs == NULL)
+                        continue; /* uninitialized */
+                for (j = 0; j < hrp->hrp_nthrs; j++)
+                        cfs_waitq_broadcast(&hrp->hrp_thrs[j].hrt_waitq);
+        }
 
-        EXIT;
+        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+                if (hrp->hrp_thrs == NULL)
+                        continue; /* uninitialized */
+                cfs_wait_event(ptlrpc_hr.hr_waitq,
+                               cfs_atomic_read(&hrp->hrp_nstopped) ==
+                               cfs_atomic_read(&hrp->hrp_nstarted));
+        }
 }
 
-static void ptlrpc_stop_hr_threads(struct ptlrpc_hr_service *hrs)
+static int ptlrpc_start_hr_threads(void)
 {
-        int n;
-        ENTRY;
-
-        for (n = 0; n < hrs->hr_n_threads; n++)
-                ptlrpc_stop_hr_thread(&hrs->hr_threads[n]);
-
-        EXIT;
-}
+        struct ptlrpc_hr_partition      *hrp;
+        int                             i;
+        int                             j;
+        ENTRY;
 
-static int ptlrpc_start_hr_threads(struct ptlrpc_hr_service *hr)
-{
-        int rc = -ENOMEM;
-        int n, cpu, threads_started = 0;
-        ENTRY;
+        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+                int     rc = 0;
 
-        LASSERT(hr != NULL);
-        LASSERT(hr->hr_n_threads > 0);
+                for (j = 0; j < hrp->hrp_nthrs; j++) {
+                        rc = cfs_create_thread(ptlrpc_hr_main,
+                                               &hrp->hrp_thrs[j],
+                                               CLONE_VM | CLONE_FILES);
+                        if (rc < 0)
+                                break;
+                }
+                cfs_wait_event(ptlrpc_hr.hr_waitq,
+                               cfs_atomic_read(&hrp->hrp_nstarted) == j);
+                if (rc >= 0)
+                        continue;
 
-        for (n = 0, cpu = 0; n < hr->hr_n_threads; n++) {
-#if defined(CONFIG_SMP) && defined(HAVE_NODE_TO_CPUMASK)
-                while (!cpu_online(cpu)) {
-                        cpu++;
-                        if (cpu >= cfs_num_possible_cpus())
-                                cpu = 0;
-                }
-#endif
-                rc = ptlrpc_start_hr_thread(hr, n, cpu);
-                if (rc != 0)
-                        break;
-                threads_started++;
-                cpu++;
-        }
-        if (threads_started == 0) {
-                CERROR("No reply handling threads started\n");
-                RETURN(-ESRCH);
-        }
-        if (threads_started < hr->hr_n_threads) {
-                CWARN("Started only %d reply handling threads from %d\n",
-                      threads_started, hr->hr_n_threads);
-                hr->hr_n_threads = threads_started;
-        }
-        RETURN(0);
+                CERROR("Reply handling thread %d:%d Failed on starting: "
+                       "rc = %d\n", i, j, rc);
+                ptlrpc_stop_hr_threads();
+                RETURN(rc);
+        }
+        RETURN(0);
 }
 
 static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
@@ -2800,46 +2801,75 @@ int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait)
 
 int ptlrpc_hr_init(void)
 {
-        int i;
-        int n_cpus = cfs_num_online_cpus();
-        struct ptlrpc_hr_service *hr;
-        int size;
-        int rc;
-        ENTRY;
+        struct ptlrpc_hr_partition      *hrp;
+        struct ptlrpc_hr_thread         *hrt;
+        int                             rc;
+        int                             i;
+        int                             j;
+        ENTRY;
 
-        LASSERT(ptlrpc_hr == NULL);
+        memset(&ptlrpc_hr, 0, sizeof(ptlrpc_hr));
+        ptlrpc_hr.hr_cpt_table = cfs_cpt_table;
 
-        size = offsetof(struct ptlrpc_hr_service, hr_threads[n_cpus]);
-        OBD_ALLOC(hr, size);
-        if (hr == NULL)
-                RETURN(-ENOMEM);
-        for (i = 0; i < n_cpus; i++) {
-                struct ptlrpc_hr_thread *t = &hr->hr_threads[i];
+        ptlrpc_hr.hr_partitions = cfs_percpt_alloc(ptlrpc_hr.hr_cpt_table,
+                                                   sizeof(*hrp));
+        if (ptlrpc_hr.hr_partitions == NULL)
+                RETURN(-ENOMEM);
 
-                cfs_spin_lock_init(&t->hrt_lock);
-                cfs_waitq_init(&t->hrt_wait);
-                CFS_INIT_LIST_HEAD(&t->hrt_queue);
-                cfs_init_completion(&t->hrt_completion);
-        }
-        hr->hr_n_threads = n_cpus;
-        hr->hr_size = size;
-        ptlrpc_hr = hr;
+        cfs_waitq_init(&ptlrpc_hr.hr_waitq);
 
-        rc = ptlrpc_start_hr_threads(hr);
-        if (rc) {
-                OBD_FREE(hr, hr->hr_size);
-                ptlrpc_hr = NULL;
-        }
-        RETURN(rc);
+        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+                hrp->hrp_cpt = i;
+
+                cfs_atomic_set(&hrp->hrp_nstarted, 0);
+                cfs_atomic_set(&hrp->hrp_nstopped, 0);
+
+                hrp->hrp_nthrs = cfs_cpt_weight(ptlrpc_hr.hr_cpt_table, i);
+                hrp->hrp_nthrs /= cfs_cpu_ht_nsiblings(0);
+
+                LASSERT(hrp->hrp_nthrs > 0);
+                OBD_CPT_ALLOC(hrp->hrp_thrs, ptlrpc_hr.hr_cpt_table, i,
+                              hrp->hrp_nthrs * sizeof(*hrt));
+                if (hrp->hrp_thrs == NULL)
+                        GOTO(out, rc = -ENOMEM);
+
+                for (j = 0; j < hrp->hrp_nthrs; j++) {
+                        hrt = &hrp->hrp_thrs[j];
+
+                        hrt->hrt_id = j;
+                        hrt->hrt_partition = hrp;
+                        cfs_waitq_init(&hrt->hrt_waitq);
+                        cfs_spin_lock_init(&hrt->hrt_lock);
+                        CFS_INIT_LIST_HEAD(&hrt->hrt_queue);
+                }
+        }
+
+        rc = ptlrpc_start_hr_threads();
+out:
+        if (rc != 0)
+                ptlrpc_hr_fini();
+        RETURN(rc);
 }
 
 void ptlrpc_hr_fini(void)
 {
-        if (ptlrpc_hr != NULL) {
-                ptlrpc_stop_hr_threads(ptlrpc_hr);
-                OBD_FREE(ptlrpc_hr, ptlrpc_hr->hr_size);
-                ptlrpc_hr = NULL;
-        }
+        struct ptlrpc_hr_partition      *hrp;
+        int                             i;
+
+        if (ptlrpc_hr.hr_partitions == NULL)
+                return;
+
+        ptlrpc_stop_hr_threads();
+
+        cfs_percpt_for_each(hrp, i, ptlrpc_hr.hr_partitions) {
+                if (hrp->hrp_thrs != NULL) {
+                        OBD_FREE(hrp->hrp_thrs,
+                                 hrp->hrp_nthrs * sizeof(hrp->hrp_thrs[0]));
+                }
+        }
+
+        cfs_percpt_free(ptlrpc_hr.hr_partitions);
+        ptlrpc_hr.hr_partitions = NULL;
 }
 
 #endif /* __KERNEL__ */
-- 
1.8.3.1