From: Liang Zhen
Date: Mon, 14 May 2012 07:27:41 +0000 (+0800)
Subject: LU-56 libcfs: CPT affinity workitem scheduler
X-Git-Tag: 2.2.59~29
X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=c48a869557fe7663f4f3370b130d4c248958180e

LU-56 libcfs: CPT affinity workitem scheduler

This patch covers multiple changes:
- flexible APIs for creating WI schedulers
  a) theoretically, a user can create any number of WI schedulers, and
     each scheduler can have its own thread pool
  b) a user can create a CPT-affinity WI scheduler for each CPT; this
     is reserved for LNet selftest
- rehashing and LNet selftest no longer share WI schedulers
- libcfs only starts one WI scheduler with a small number of threads,
  for cfs_hash rehashing
- LNet selftest creates its own schedulers when the module starts, and
  destroys them when the module shuts down

Signed-off-by: Liang Zhen
Change-Id: Idf66a83817fe847ed29e052e0ddc2a4fed498f1a
Reviewed-on: http://review.whamcloud.com/2729
Tested-by: Hudson
Tested-by: Maloo
Reviewed-by: Bobi Jam
Reviewed-by: Doug Oucharek
Reviewed-by: Oleg Drokin
---

diff --git a/libcfs/include/libcfs/libcfs_workitem.h b/libcfs/include/libcfs/libcfs_workitem.h
index 29bbd6d..42c5289 100644
--- a/libcfs/include/libcfs/libcfs_workitem.h
+++ b/libcfs/include/libcfs/libcfs_workitem.h
@@ -62,6 +62,12 @@
 #ifndef __LIBCFS_WORKITEM_H__
 #define __LIBCFS_WORKITEM_H__
 
+struct cfs_wi_sched;
+
+void cfs_wi_sched_destroy(struct cfs_wi_sched *);
+int cfs_wi_sched_create(char *name, struct cfs_cpt_table *cptab, int cpt,
+                        int nthrs, struct cfs_wi_sched **);
+
 struct cfs_workitem;
 
 typedef int (*cfs_wi_action_t) (struct cfs_workitem *);
@@ -72,37 +78,27 @@ typedef struct cfs_workitem {
         cfs_wi_action_t  wi_action;
         /** arg for working function */
         void            *wi_data;
-        /** scheduler id, can be negative */
-        short            wi_sched_id;
         /** in running */
         unsigned short   wi_running:1;
         /** scheduled */
         unsigned short   wi_scheduled:1;
 } cfs_workitem_t;
 
-/**
- * positive values are reserved as CPU id of future implementation of
- * per-cpu scheduler, so user can "bind" workitem on specific CPU.
- */
-#define CFS_WI_SCHED_ANY        (-1)
-#define CFS_WI_SCHED_SERIAL     (-2)
-
 static inline void
-cfs_wi_init(cfs_workitem_t *wi, void *data,
-            cfs_wi_action_t action, short sched_id)
+cfs_wi_init(cfs_workitem_t *wi, void *data, cfs_wi_action_t action)
 {
         CFS_INIT_LIST_HEAD(&wi->wi_list);
 
-        wi->wi_sched_id  = sched_id;
         wi->wi_running   = 0;
         wi->wi_scheduled = 0;
         wi->wi_data      = data;
         wi->wi_action    = action;
 }
 
-void cfs_wi_exit(cfs_workitem_t *wi);
-int  cfs_wi_cancel(cfs_workitem_t *wi);
-void cfs_wi_schedule(cfs_workitem_t *wi);
+void cfs_wi_schedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi);
+int  cfs_wi_deschedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi);
+void cfs_wi_exit(struct cfs_wi_sched *sched, cfs_workitem_t *wi);
+
 int cfs_wi_startup(void);
 void cfs_wi_shutdown(void);
 
diff --git a/libcfs/libcfs/hash.c b/libcfs/libcfs/hash.c
index d3df598..cb260a4 100644
--- a/libcfs/libcfs/hash.c
+++ b/libcfs/libcfs/hash.c
@@ -115,6 +115,8 @@ CFS_MODULE_PARM(warn_on_depth, "i", uint, 0644,
                 "warning when hash depth is high.");
 #endif
 
+struct cfs_wi_sched *cfs_sched_rehash;
+
 static inline void
 cfs_hash_nl_lock(cfs_hash_lock_t *lock, int exclusive) {}
 
@@ -511,7 +513,7 @@ cfs_hash_bd_dep_record(cfs_hash_t *hs, cfs_hash_bd_t *bd, int dep_cur)
         hs->hs_dep_bits = hs->hs_cur_bits;
         cfs_spin_unlock(&hs->hs_dep_lock);
 
-        cfs_wi_schedule(&hs->hs_dep_wi);
+        cfs_wi_schedule(cfs_sched_rehash, &hs->hs_dep_wi);
 # endif
 }
 
@@ -981,14 +983,13 @@ static int cfs_hash_dep_print(cfs_workitem_t *wi)
 static void cfs_hash_depth_wi_init(cfs_hash_t *hs)
 {
-        cfs_spin_lock_init(&hs->hs_dep_lock);
-        cfs_wi_init(&hs->hs_dep_wi, hs,
-                    cfs_hash_dep_print, CFS_WI_SCHED_ANY);
+        cfs_spin_lock_init(&hs->hs_dep_lock);
+        cfs_wi_init(&hs->hs_dep_wi, hs, cfs_hash_dep_print);
 }
 
 static void cfs_hash_depth_wi_cancel(cfs_hash_t *hs)
 {
-        if (cfs_wi_cancel(&hs->hs_dep_wi))
+        if (cfs_wi_deschedule(cfs_sched_rehash, &hs->hs_dep_wi))
                 return;
 
         cfs_spin_lock(&hs->hs_dep_lock);
@@ -1065,8 +1066,7 @@ cfs_hash_create(char *name, unsigned cur_bits, unsigned max_bits,
         hs->hs_ops         = ops;
         hs->hs_extra_bytes = extra_bytes;
         hs->hs_rehash_bits = 0;
-        cfs_wi_init(&hs->hs_rehash_wi, hs,
-                    cfs_hash_rehash_worker, CFS_WI_SCHED_ANY);
+        cfs_wi_init(&hs->hs_rehash_wi, hs, cfs_hash_rehash_worker);
         cfs_hash_depth_wi_init(hs);
 
         if (cfs_hash_with_rehash(hs))
@@ -1784,7 +1784,7 @@ cfs_hash_rehash_cancel_locked(cfs_hash_t *hs)
         if (!cfs_hash_is_rehashing(hs))
                 return;
 
-        if (cfs_wi_cancel(&hs->hs_rehash_wi)) {
+        if (cfs_wi_deschedule(cfs_sched_rehash, &hs->hs_rehash_wi)) {
                 hs->hs_rehash_bits = 0;
                 return;
         }
@@ -1828,7 +1828,7 @@ cfs_hash_rehash(cfs_hash_t *hs, int do_rehash)
         hs->hs_rehash_bits = rc;
         if (!do_rehash) {
                 /* launch and return */
-                cfs_wi_schedule(&hs->hs_rehash_wi);
+                cfs_wi_schedule(cfs_sched_rehash, &hs->hs_rehash_wi);
                 cfs_hash_unlock(hs, 1);
                 return 0;
         }
@@ -1958,8 +1958,8 @@ cfs_hash_rehash_worker(cfs_workitem_t *wi)
                 hs->hs_cur_bits = hs->hs_rehash_bits;
 out:
         hs->hs_rehash_bits = 0;
-        if (rc == -ESRCH)
-                cfs_wi_exit(wi); /* never be scheduled again */
+        if (rc == -ESRCH) /* never be scheduled again */
+                cfs_wi_exit(cfs_sched_rehash, wi);
         bsize = cfs_hash_bkt_size(hs);
         cfs_hash_unlock(hs, 1);
         /* can't refer to @hs anymore because it could be destroyed */
@@ -1967,8 +1967,8 @@ cfs_hash_rehash_worker(cfs_workitem_t *wi)
                 cfs_hash_buckets_free(bkts, bsize, new_size, old_size);
         if (rc != 0)
                 CDEBUG(D_INFO, "early quit of of rehashing: %d\n", rc);
-        /* cfs_workitem require us to always return 0 */
-        return 0;
+        /* return 1 only if cfs_wi_exit is called */
+        return rc == -ESRCH;
 }
 
 /**
diff --git a/libcfs/libcfs/module.c b/libcfs/libcfs/module.c
index c9763c7..6640ab1 100644
--- a/libcfs/libcfs/module.c
+++ b/libcfs/libcfs/module.c
@@ -366,6 +366,7 @@ MODULE_LICENSE("GPL");
 extern cfs_psdev_t libcfs_dev;
 extern cfs_rw_semaphore_t cfs_tracefile_sem;
 extern cfs_mutex_t cfs_trace_thread_mutex;
+extern struct cfs_wi_sched *cfs_sched_rehash;
 
 extern void libcfs_init_nidstrings(void);
 extern int libcfs_arch_init(void);
@@ -406,11 +407,20 @@ static int init_libcfs_module(void)
                 goto cleanup_lwt;
         }
 
-        rc = cfs_wi_startup();
-        if (rc) {
-                CERROR("startup workitem: error %d\n", rc);
-                goto cleanup_deregister;
-        }
+        rc = cfs_wi_startup();
+        if (rc) {
+                CERROR("initialize workitem: error %d\n", rc);
+                goto cleanup_deregister;
+        }
+
+        /* at most 4 threads, should be enough for rehash */
+        rc = min(cfs_cpt_weight(cfs_cpt_table, CFS_CPT_ANY), 4);
+        rc = cfs_wi_sched_create("cfs_rh", cfs_cpt_table, CFS_CPT_ANY,
+                                 rc, &cfs_sched_rehash);
+        if (rc != 0) {
+                CERROR("Startup workitem scheduler: error: %d\n", rc);
+                goto cleanup_deregister;
+        }
 
         rc = insert_proc();
         if (rc) {
@@ -443,7 +453,13 @@ static void exit_libcfs_module(void)
         CDEBUG(D_MALLOC, "before Portals cleanup: kmem %d\n",
                cfs_atomic_read(&libcfs_kmemory));
 
-        cfs_wi_shutdown();
+        if (cfs_sched_rehash != NULL) {
+                cfs_wi_sched_destroy(cfs_sched_rehash);
+                cfs_sched_rehash = NULL;
+        }
+
+        cfs_wi_shutdown();
+
         rc = cfs_psdev_deregister(&libcfs_dev);
         if (rc)
                 CERROR("misc_deregister error %d\n", rc);
diff --git a/libcfs/libcfs/workitem.c b/libcfs/libcfs/workitem.c
index b2ef0d6..5bcab3f 100644
--- a/libcfs/libcfs/workitem.c
+++ b/libcfs/libcfs/workitem.c
@@ -43,61 +43,51 @@
 
 #include <libcfs/libcfs.h>
 
+#define CFS_WS_NAME_LEN         16
+
 typedef struct cfs_wi_sched {
+        cfs_list_t              ws_list;        /* chain on global list */
 #ifdef __KERNEL__
-        /** serialised workitems */
-        cfs_spinlock_t  ws_lock;
-        /** where schedulers sleep */
-        cfs_waitq_t     ws_waitq;
+        /** serialised workitems */
+        cfs_spinlock_t          ws_lock;
+        /** where schedulers sleep */
+        cfs_waitq_t             ws_waitq;
 #endif
-        /** concurrent workitems */
-        cfs_list_t      ws_runq;
-        /** rescheduled running-workitems */
-        cfs_list_t      ws_rerunq;
-        /** shutting down */
-        int             ws_shuttingdown;
+        /** concurrent workitems */
+        cfs_list_t              ws_runq;
+        /** rescheduled running-workitems: a workitem can be rescheduled
+         * while running in wi_action(), but we don't want to execute it
+         * again until it returns from wi_action(), so we put it on
+         * ws_rerunq while rescheduling, and move it back to the runq
+         * after it returns from wi_action() */
+        cfs_list_t              ws_rerunq;
+        /** CPT-table for this scheduler */
+        struct cfs_cpt_table    *ws_cptab;
+        /** CPT id for affinity */
+        int                     ws_cpt;
+        /** number of scheduled workitems */
+        int                     ws_nscheduled;
+        /** number of started scheduler threads, protected by
+         * cfs_wi_data::wi_glock */
+        unsigned int            ws_nthreads:30;
+        /** shutting down, protected by cfs_wi_data::wi_glock */
+        unsigned int            ws_stopping:1;
+        /** serialize starting thread, protected by cfs_wi_data::wi_glock */
+        unsigned int            ws_starting:1;
+        /** scheduler name */
+        char                    ws_name[CFS_WS_NAME_LEN];
 } cfs_wi_sched_t;
 
-#ifdef __KERNEL__
-/**
- * we have 2 cfs_wi_sched_t so far:
- * one for CFS_WI_SCHED_ANY, another for CFS_WI_SCHED_SERIAL
- * per-cpu implementation will be added for SMP scalability
- */
-#define CFS_WI_NSCHED   2
-#else
-/** always 2 for userspace */
-#define CFS_WI_NSCHED   2
-#endif /* __KERNEL__ */
-
 struct cfs_workitem_data {
-        /** serialize */
-        cfs_spinlock_t  wi_glock;
-        /** number of cfs_wi_sched_t */
-        int             wi_nsched;
-        /** number of threads (all schedulers) */
-        int             wi_nthreads;
-        /** default scheduler */
-        cfs_wi_sched_t *wi_scheds;
+        /** serialize */
+        cfs_spinlock_t          wi_glock;
+        /** list of all schedulers */
+        cfs_list_t              wi_scheds;
+        /** WI module is initialized */
+        int                     wi_init;
+        /** shutting down the whole WI module */
+        int                     wi_stopping;
 } cfs_wi_data;
 
-static inline cfs_wi_sched_t *
-cfs_wi_to_sched(cfs_workitem_t *wi)
-{
-        LASSERT(wi->wi_sched_id == CFS_WI_SCHED_ANY ||
-                wi->wi_sched_id == CFS_WI_SCHED_SERIAL ||
-                (wi->wi_sched_id >= 0 &&
-                 wi->wi_sched_id < cfs_wi_data.wi_nsched));
-
-        if (wi->wi_sched_id == CFS_WI_SCHED_ANY)
-                return &cfs_wi_data.wi_scheds[0];
-        if (wi->wi_sched_id == CFS_WI_SCHED_SERIAL)
-                return &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
-
-        return &cfs_wi_data.wi_scheds[wi->wi_sched_id];
-}
-
 #ifdef __KERNEL__
 static inline void
 cfs_wi_sched_lock(cfs_wi_sched_t *sched)
@@ -114,8 +104,8 @@ cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
 static inline int
 cfs_wi_sched_cansleep(cfs_wi_sched_t *sched)
 {
-        cfs_wi_sched_lock(sched);
-        if (sched->ws_shuttingdown) {
+        cfs_wi_sched_lock(sched);
+        if (sched->ws_stopping) {
                 cfs_wi_sched_unlock(sched);
                 return 0;
         }
@@ -128,7 +118,7 @@ cfs_wi_sched_cansleep(cfs_wi_sched_t *sched)
         return 1;
 }
 
-#else
+#else /* !__KERNEL__ */
 
 static inline void
 cfs_wi_sched_lock(cfs_wi_sched_t *sched)
@@ -142,61 +132,67 @@ cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
         cfs_spin_unlock(&cfs_wi_data.wi_glock);
 }
 
-#endif
+#endif /* __KERNEL__ */
 
 /* XXX:
  * 0. it only works when called from wi->wi_action.
  * 1. when it returns no one shall try to schedule the workitem.
  */
 void
-cfs_wi_exit(cfs_workitem_t *wi)
+cfs_wi_exit(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
+        LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT(!sched->ws_stopping);
 
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
-
-        cfs_wi_sched_lock(sched);
+        cfs_wi_sched_lock(sched);
 #ifdef __KERNEL__
-        LASSERT (wi->wi_running);
+        LASSERT(wi->wi_running);
 #endif
-        if (wi->wi_scheduled) { /* cancel pending schedules */
-                LASSERT (!cfs_list_empty(&wi->wi_list));
-                cfs_list_del_init(&wi->wi_list);
-        }
+        if (wi->wi_scheduled) { /* cancel pending schedules */
+                LASSERT(!cfs_list_empty(&wi->wi_list));
+                cfs_list_del_init(&wi->wi_list);
 
-        LASSERT (cfs_list_empty(&wi->wi_list));
-        wi->wi_scheduled = 1; /* LBUG future schedule attempts */
+                LASSERT(sched->ws_nscheduled > 0);
+                sched->ws_nscheduled--;
+        }
 
-        cfs_wi_sched_unlock(sched);
-        return;
+        LASSERT(cfs_list_empty(&wi->wi_list));
+
+        wi->wi_scheduled = 1; /* LBUG future schedule attempts */
+        cfs_wi_sched_unlock(sched);
+
+        return;
 }
-CFS_EXPORT_SYMBOL(cfs_wi_exit);
+EXPORT_SYMBOL(cfs_wi_exit);
 
 /**
- * cancel a workitem:
+ * cancel schedule request of workitem \a wi
  */
 int
-cfs_wi_cancel (cfs_workitem_t *wi)
+cfs_wi_deschedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
-        int             rc;
+        int     rc;
 
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
+        LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT(!sched->ws_stopping);
 
-        cfs_wi_sched_lock(sched);
         /*
          * return 0 if it's running already, otherwise return 1, which
          * means the workitem will not be scheduled and will not have
         * any race with wi_action.
         */
-        rc = !(wi->wi_running);
+        cfs_wi_sched_lock(sched);
+
+        rc = !(wi->wi_running);
+
+        if (wi->wi_scheduled) { /* cancel pending schedules */
+                LASSERT(!cfs_list_empty(&wi->wi_list));
+                cfs_list_del_init(&wi->wi_list);
+
+                LASSERT(sched->ws_nscheduled > 0);
+                sched->ws_nscheduled--;
 
-        if (wi->wi_scheduled) { /* cancel pending schedules */
-                LASSERT (!cfs_list_empty(&wi->wi_list));
-                cfs_list_del_init(&wi->wi_list);
                 wi->wi_scheduled = 0;
         }
 
@@ -205,8 +201,7 @@ cfs_wi_cancel (cfs_workitem_t *wi)
         cfs_wi_sched_unlock(sched);
         return rc;
 }
-
-CFS_EXPORT_SYMBOL(cfs_wi_cancel);
+EXPORT_SYMBOL(cfs_wi_deschedule);
 
 /*
  * Workitem scheduled with (serial == 1) is strictly serialised not only with
@@ -216,12 +211,10 @@ CFS_EXPORT_SYMBOL(cfs_wi_cancel);
  * be added, and even dynamic creation of serialised queues might be supported.
  */
 void
-cfs_wi_schedule(cfs_workitem_t *wi)
+cfs_wi_schedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
-
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
+        LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT(!sched->ws_stopping);
 
         cfs_wi_sched_lock(sched);
 
@@ -229,6 +222,7 @@ cfs_wi_schedule(cfs_workitem_t *wi)
                 LASSERT (cfs_list_empty(&wi->wi_list));
 
                 wi->wi_scheduled = 1;
+                sched->ws_nscheduled++;
                 if (!wi->wi_running) {
                         cfs_list_add_tail(&wi->wi_list, &sched->ws_runq);
 #ifdef __KERNEL__
@@ -243,34 +237,42 @@ cfs_wi_schedule(cfs_workitem_t *wi)
         cfs_wi_sched_unlock(sched);
         return;
 }
-
-CFS_EXPORT_SYMBOL(cfs_wi_schedule);
+EXPORT_SYMBOL(cfs_wi_schedule);
 
 #ifdef __KERNEL__
 
 static int
 cfs_wi_scheduler (void *arg)
 {
-        int             id     = (int)(long_ptr_t) arg;
-        int             serial = (id == -1);
-        char            name[24];
-        cfs_wi_sched_t *sched;
-
-        if (serial) {
-                sched = &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
-                cfs_daemonize("wi_serial_sd");
-        } else {
-                /* will be sched = &cfs_wi_data.wi_scheds[id] in the future */
-                sched = &cfs_wi_data.wi_scheds[0];
-                snprintf(name, sizeof(name), "cfs_wi_sd%03d", id);
-                cfs_daemonize(name);
-        }
+        struct cfs_wi_sched *sched = (cfs_wi_sched_t *)arg;
+        char                 name[16];
 
-        cfs_block_allsigs();
+        if (sched->ws_cptab != NULL && sched->ws_cpt >= 0) {
+                snprintf(name, sizeof(name), "%s_%02d_%02d",
+                         sched->ws_name, sched->ws_cpt, sched->ws_nthreads);
+        } else {
+                snprintf(name, sizeof(name), "%s_%02d",
+                         sched->ws_name, sched->ws_nthreads);
+        }
 
-        cfs_wi_sched_lock(sched);
+        cfs_daemonize(name);
+        cfs_block_allsigs();
+
+        /* CPT affinity scheduler? */
+        if (sched->ws_cptab != NULL)
+                cfs_cpt_bind(sched->ws_cptab, sched->ws_cpt);
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+
+        LASSERT(sched->ws_starting == 1);
+        sched->ws_starting--;
+        sched->ws_nthreads++;
+
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
 
-        while (!sched->ws_shuttingdown) {
+        cfs_wi_sched_lock(sched);
+
+        while (!sched->ws_stopping) {
                 int             nloops = 0;
                 int             rc;
                 cfs_workitem_t *wi;
 
@@ -279,12 +281,17 @@ cfs_wi_scheduler (void *arg)
                        nloops < CFS_WI_RESCHED) {
                         wi = cfs_list_entry(sched->ws_runq.next,
                                             cfs_workitem_t, wi_list);
-                        LASSERT (wi->wi_scheduled && !wi->wi_running);
+                        LASSERT(wi->wi_scheduled && !wi->wi_running);
+
+                        cfs_list_del_init(&wi->wi_list);
 
-                        cfs_list_del_init(&wi->wi_list);
+                        LASSERT(sched->ws_nscheduled > 0);
+                        sched->ws_nscheduled--;
 
                         wi->wi_running   = 1;
                         wi->wi_scheduled = 0;
+
+                        cfs_wi_sched_unlock(sched);
 
                         nloops++;
@@ -298,7 +305,7 @@ cfs_wi_scheduler (void *arg)
                         if (cfs_list_empty(&wi->wi_list))
                                 continue;
 
-                        LASSERT (wi->wi_scheduled);
+                        LASSERT(wi->wi_scheduled);
                         /* wi is rescheduled, should be on rerunq now, we
                          * move it to runq so it can run action now */
                         cfs_list_move_tail(&wi->wi_list, &sched->ws_runq);
@@ -321,24 +328,10 @@ cfs_wi_scheduler (void *arg)
 
         cfs_wi_sched_unlock(sched);
 
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        cfs_wi_data.wi_nthreads--;
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
-        return 0;
-}
-
-static int
-cfs_wi_start_thread (int (*func) (void*), void *arg)
-{
-        long pid;
-
-        pid = cfs_create_thread(func, arg, 0);
-        if (pid < 0)
-                return (int)pid;
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        sched->ws_nthreads--;
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
 
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        cfs_wi_data.wi_nthreads++;
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
         return 0;
 }
 
@@ -349,21 +342,31 @@ cfs_wi_check_events (void)
 {
         int               n = 0;
         cfs_workitem_t   *wi;
-        cfs_list_t       *q;
 
         cfs_spin_lock(&cfs_wi_data.wi_glock);
 
         for (;;) {
+                struct cfs_wi_sched *sched = NULL;
+                struct cfs_wi_sched *tmp;
+
                 /** rerunq is always empty for userspace */
-                if (!cfs_list_empty(&cfs_wi_data.wi_scheds[1].ws_runq))
-                        q = &cfs_wi_data.wi_scheds[1].ws_runq;
-                else if (!cfs_list_empty(&cfs_wi_data.wi_scheds[0].ws_runq))
-                        q = &cfs_wi_data.wi_scheds[0].ws_runq;
-                else
-                        break;
+                cfs_list_for_each_entry(tmp,
+                                        &cfs_wi_data.wi_scheds, ws_list) {
+                        if (!cfs_list_empty(&tmp->ws_runq)) {
+                                sched = tmp;
+                                break;
+                        }
+                }
+
+                if (sched == NULL)
+                        break;
+
+                wi = cfs_list_entry(sched->ws_runq.next,
+                                    cfs_workitem_t, wi_list);
+                cfs_list_del_init(&wi->wi_list);
 
-                wi = cfs_list_entry(q->next, cfs_workitem_t, wi_list);
-                cfs_list_del_init(&wi->wi_list);
+                LASSERT(sched->ws_nscheduled > 0);
+                sched->ws_nscheduled--;
 
                 LASSERT (wi->wi_scheduled);
                 wi->wi_scheduled = 0;
@@ -381,98 +384,169 @@ cfs_wi_check_events (void)
 
 #endif
 
-static void
-cfs_wi_sched_init(cfs_wi_sched_t *sched)
+void
+cfs_wi_sched_destroy(struct cfs_wi_sched *sched)
 {
-        sched->ws_shuttingdown = 0;
+        int     i;
+
+        LASSERT(cfs_wi_data.wi_init);
+        LASSERT(!cfs_wi_data.wi_stopping);
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        if (sched->ws_stopping) {
+                CDEBUG(D_INFO, "%s is in progress of stopping\n",
+                       sched->ws_name);
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+                return;
+        }
+
+        LASSERT(!cfs_list_empty(&sched->ws_list));
+        sched->ws_stopping = 1;
+
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+
+        i = 2;
 #ifdef __KERNEL__
-        cfs_spin_lock_init(&sched->ws_lock);
-        cfs_waitq_init(&sched->ws_waitq);
+        cfs_waitq_broadcast(&sched->ws_waitq);
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        while (sched->ws_nthreads > 0) {
+                CDEBUG(IS_PO2(++i) ? D_WARNING : D_NET,
+                       "waiting for %d threads of WI sched[%s] to terminate\n",
+                       sched->ws_nthreads, sched->ws_name);
+
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+                cfs_pause(cfs_time_seconds(1) / 20);
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+        }
+
+        cfs_list_del(&sched->ws_list);
+
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
 #endif
-        CFS_INIT_LIST_HEAD(&sched->ws_runq);
-        CFS_INIT_LIST_HEAD(&sched->ws_rerunq);
+        LASSERT(sched->ws_nscheduled == 0);
+
+        LIBCFS_FREE(sched, sizeof(*sched));
 }
+EXPORT_SYMBOL(cfs_wi_sched_destroy);
 
-static void
-cfs_wi_sched_shutdown(cfs_wi_sched_t *sched)
+int
+cfs_wi_sched_create(char *name, struct cfs_cpt_table *cptab,
+                    int cpt, int nthrs, struct cfs_wi_sched **sched_pp)
 {
-        cfs_wi_sched_lock(sched);
+        struct cfs_wi_sched     *sched;
+        int                     rc;
+
+        LASSERT(cfs_wi_data.wi_init);
+        LASSERT(!cfs_wi_data.wi_stopping);
+        LASSERT(cptab == NULL || cpt == CFS_CPT_ANY ||
+                (cpt >= 0 && cpt < cfs_cpt_number(cptab)));
 
-        LASSERT(cfs_list_empty(&sched->ws_runq));
-        LASSERT(cfs_list_empty(&sched->ws_rerunq));
+        LIBCFS_ALLOC(sched, sizeof(*sched));
+        if (sched == NULL)
+                return -ENOMEM;
 
-        sched->ws_shuttingdown = 1;
+        strncpy(sched->ws_name, name, CFS_WS_NAME_LEN);
+        sched->ws_cptab = cptab;
+        sched->ws_cpt = cpt;
 
 #ifdef __KERNEL__
-        cfs_waitq_broadcast(&sched->ws_waitq);
+        cfs_spin_lock_init(&sched->ws_lock);
+        cfs_waitq_init(&sched->ws_waitq);
 #endif
-        cfs_wi_sched_unlock(sched);
-}
+        CFS_INIT_LIST_HEAD(&sched->ws_runq);
+        CFS_INIT_LIST_HEAD(&sched->ws_rerunq);
+        CFS_INIT_LIST_HEAD(&sched->ws_list);
 
+        rc = 0;
+#ifdef __KERNEL__
+        while (nthrs > 0) {
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+                while (sched->ws_starting > 0) {
+                        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+                        cfs_schedule();
+                        cfs_spin_lock(&cfs_wi_data.wi_glock);
+                }
 
-int
-cfs_wi_startup (void)
-{
-        int i;
-        int n, rc;
+                sched->ws_starting++;
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
 
-        cfs_wi_data.wi_nthreads = 0;
-        cfs_wi_data.wi_nsched   = CFS_WI_NSCHED;
-        LIBCFS_ALLOC(cfs_wi_data.wi_scheds,
-                     cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
-        if (cfs_wi_data.wi_scheds == NULL)
-                return -ENOMEM;
+                rc = cfs_create_thread(cfs_wi_scheduler, sched, 0);
+                if (rc >= 0) {
+                        nthrs--;
+                        continue;
+                }
 
-        cfs_spin_lock_init(&cfs_wi_data.wi_glock);
-        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
-                cfs_wi_sched_init(&cfs_wi_data.wi_scheds[i]);
+                CERROR("Failed to create thread for WI scheduler %s: %d\n",
+                       name, rc);
 
-#ifdef __KERNEL__
-        n = cfs_num_online_cpus();
-        for (i = 0; i <= n; i++) {
-                rc = cfs_wi_start_thread(cfs_wi_scheduler,
-                                         (void *)(long_ptr_t)(i == n ? -1 : i));
-                if (rc != 0) {
-                        CERROR ("Can't spawn workitem scheduler: %d\n", rc);
-                        cfs_wi_shutdown();
-                        return rc;
-                }
-        }
-#else
-        SET_BUT_UNUSED(rc);
-        SET_BUT_UNUSED(n);
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+
+                /* make up for cfs_wi_sched_destroy */
+                cfs_list_add(&sched->ws_list, &cfs_wi_data.wi_scheds);
+                sched->ws_starting--;
+
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+
+                cfs_wi_sched_destroy(sched);
+                return rc;
+        }
 #endif
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        cfs_list_add(&sched->ws_list, &cfs_wi_data.wi_scheds);
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
 
-        return 0;
+        *sched_pp = sched;
+        return 0;
 }
+EXPORT_SYMBOL(cfs_wi_sched_create);
 
-void
-cfs_wi_shutdown (void)
+int
+cfs_wi_startup(void)
 {
-        int i;
+        memset(&cfs_wi_data, 0, sizeof(cfs_wi_data));
 
-        if (cfs_wi_data.wi_scheds == NULL)
-                return;
+        cfs_spin_lock_init(&cfs_wi_data.wi_glock);
+        CFS_INIT_LIST_HEAD(&cfs_wi_data.wi_scheds);
+        cfs_wi_data.wi_init = 1;
 
-        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
-                cfs_wi_sched_shutdown(&cfs_wi_data.wi_scheds[i]);
+        return 0;
+}
 
-#ifdef __KERNEL__
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        i = 2;
-        while (cfs_wi_data.wi_nthreads != 0) {
-                CDEBUG(IS_PO2(++i) ? D_WARNING : D_NET,
-                       "waiting for %d threads to terminate\n",
-                       cfs_wi_data.wi_nthreads);
-                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+void
+cfs_wi_shutdown (void)
+{
+        struct cfs_wi_sched     *sched;
 
-                cfs_pause(cfs_time_seconds(1));
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        cfs_wi_data.wi_stopping = 1;
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
 
-                cfs_spin_lock(&cfs_wi_data.wi_glock);
-        }
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+#ifdef __KERNEL__
+        /* nobody should contend on this list */
+        cfs_list_for_each_entry(sched, &cfs_wi_data.wi_scheds, ws_list) {
+                sched->ws_stopping = 1;
+                cfs_waitq_broadcast(&sched->ws_waitq);
+        }
+
+        cfs_list_for_each_entry(sched, &cfs_wi_data.wi_scheds, ws_list) {
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+
+                while (sched->ws_nthreads != 0) {
+                        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+                        cfs_pause(cfs_time_seconds(1) / 20);
+                        cfs_spin_lock(&cfs_wi_data.wi_glock);
+                }
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+        }
 #endif
-        LIBCFS_FREE(cfs_wi_data.wi_scheds,
-                    cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
-        return;
+        while (!cfs_list_empty(&cfs_wi_data.wi_scheds)) {
+                sched = cfs_list_entry(cfs_wi_data.wi_scheds.next,
+                                       struct cfs_wi_sched, ws_list);
+                cfs_list_del(&sched->ws_list);
+                LIBCFS_FREE(sched, sizeof(*sched));
+        }
+
+        cfs_wi_data.wi_stopping = 0;
+        cfs_wi_data.wi_init = 0;
 }
diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c
index 4ef5679..c034665 100644
--- a/lnet/selftest/framework.c
+++ b/lnet/selftest/framework.c
@@ -999,8 +999,8 @@ sfw_run_batch (sfw_batch_t *tsb)
                 cfs_atomic_inc(&tsi->tsi_nactive);
                 tsu->tsu_loop = tsi->tsi_loop;
                 wi = &tsu->tsu_worker;
-                swi_init_workitem(wi, tsu, sfw_run_test,
-                                  CFS_WI_SCHED_ANY);
+                swi_init_workitem(wi, tsu, sfw_run_test,
+                                  lst_sched_test);
                 swi_schedule_workitem(wi);
         }
 }
diff --git a/lnet/selftest/module.c b/lnet/selftest/module.c
index eb54f01..1377b10 100644
--- a/lnet/selftest/module.c
+++ b/lnet/selftest/module.c
@@ -37,16 +37,22 @@
 
 #include "selftest.h"
 
-#define LST_INIT_NONE           0
-#define LST_INIT_RPC            1
-#define LST_INIT_FW             2
-#define LST_INIT_CONSOLE        3
+enum {
+        LST_INIT_NONE           = 0,
+        LST_INIT_WI,
+        LST_INIT_RPC,
+        LST_INIT_FW,
+        LST_INIT_CONSOLE
+};
 
 extern int lstcon_console_init(void);
 extern int lstcon_console_fini(void);
 
 static int lst_init_step = LST_INIT_NONE;
 
+struct cfs_wi_sched *lst_sched_serial;
+struct cfs_wi_sched *lst_sched_test;
+
 void
 lnet_selftest_fini (void)
 {
@@ -59,6 +65,11 @@ lnet_selftest_fini (void)
                         sfw_shutdown();
                 case LST_INIT_RPC:
                         srpc_shutdown();
+                case LST_INIT_WI:
+                        cfs_wi_sched_destroy(lst_sched_serial);
+                        cfs_wi_sched_destroy(lst_sched_test);
+                        lst_sched_serial = NULL;
+                        lst_sched_test = NULL;
                 case LST_INIT_NONE:
                         break;
                 default:
@@ -82,8 +93,24 @@ lnet_selftest_structure_assertion(void)
 int
 lnet_selftest_init (void)
 {
+        int     nthrs;
         int     rc;
 
+        rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
+                                 1, &lst_sched_serial);
+        if (rc != 0)
+                return rc;
+
+        nthrs = cfs_cpt_weight(lnet_cpt_table(), CFS_CPT_ANY);
+        rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), CFS_CPT_ANY,
+                                 nthrs, &lst_sched_test);
+        if (rc != 0) {
+                cfs_wi_sched_destroy(lst_sched_serial);
+                lst_sched_serial = NULL;
+                return rc;
+        }
+        lst_init_step = LST_INIT_WI;
+
         rc = srpc_startup();
         if (rc != 0) {
                 CERROR("LST can't startup rpc\n");
diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index 8413f48..5329b94 100644
--- a/lnet/selftest/rpc.c
+++ b/lnet/selftest/rpc.c
@@ -186,10 +186,10 @@ void
 srpc_init_server_rpc (srpc_server_rpc_t *rpc, srpc_service_t *sv,
                       srpc_buffer_t *buffer)
 {
-        memset(rpc, 0, sizeof(*rpc));
-        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
-                          sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ?
-                          CFS_WI_SCHED_SERIAL : CFS_WI_SCHED_ANY);
+        memset(rpc, 0, sizeof(*rpc));
+        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
+                          sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ?
+                          lst_sched_serial : lst_sched_test);
 
         rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 
@@ -1461,7 +1461,7 @@ srpc_startup (void)
 #endif
         if (rc < 0) {
                 CERROR ("LNetNIInit() has failed: %d\n", rc);
-                return rc;
+                return rc;
         }
 
         srpc_data.rpc_state = SRPC_STATE_NI_INIT;
@@ -1529,7 +1529,6 @@ srpc_shutdown (void)
 
         case SRPC_STATE_NI_INIT:
                 LNetNIFini();
-                break;
         }
 
         return;
diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h
index a1f9ab4..9943fa5 100644
--- a/lnet/selftest/selftest.h
+++ b/lnet/selftest/selftest.h
@@ -185,6 +185,7 @@ struct swi_workitem;
 typedef int (*swi_action_t) (struct swi_workitem *);
 
 typedef struct swi_workitem {
+        struct cfs_wi_sched *swi_sched;
         cfs_workitem_t       swi_workitem;
         swi_action_t         swi_action;
         int                  swi_state;
@@ -412,6 +413,9 @@ void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer);
 void srpc_get_counters(srpc_counters_t *cnt);
 void srpc_set_counters(const srpc_counters_t *cnt);
 
+extern struct cfs_wi_sched *lst_sched_serial;
+extern struct cfs_wi_sched *lst_sched_test;
+
 static inline int
 swi_wi_action(cfs_workitem_t *wi)
 {
@@ -421,24 +425,25 @@ swi_wi_action(cfs_workitem_t *wi)
 }
 
 static inline void
-swi_init_workitem (swi_workitem_t *swi, void *data,
-                   swi_action_t action, short sched_id)
+swi_init_workitem(swi_workitem_t *swi, void *data,
+                  swi_action_t action, struct cfs_wi_sched *sched)
 {
-        swi->swi_action = action;
-        swi->swi_state  = SWI_STATE_NEWBORN;
-        cfs_wi_init(&swi->swi_workitem, data, swi_wi_action, sched_id);
+        swi->swi_sched  = sched;
+        swi->swi_action = action;
+        swi->swi_state  = SWI_STATE_NEWBORN;
+        cfs_wi_init(&swi->swi_workitem, data, swi_wi_action);
 }
 
 static inline void
 swi_schedule_workitem(swi_workitem_t *wi)
 {
-        cfs_wi_schedule(&wi->swi_workitem);
+        cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem);
 }
 
 static inline void
 swi_kill_workitem(swi_workitem_t *swi)
 {
-        cfs_wi_exit(&swi->swi_workitem);
+        cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
 }
 
 #ifndef __KERNEL__
@@ -485,8 +490,7 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer,
                                         crpc_bulk.bk_iovs[nbulkiov]));
         CFS_INIT_LIST_HEAD(&rpc->crpc_list);
 
-        swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc,
-                          CFS_WI_SCHED_ANY);
+        swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, lst_sched_test);
         cfs_spin_lock_init(&rpc->crpc_lock);
         cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
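
Appendix: usage sketch (not part of the original commit). The snippet below
illustrates how a kernel-side caller might drive the scheduler API introduced
by this patch; it is a minimal sketch based only on the signatures shown
above. The names my_sched, my_wi, my_wi_action, my_module_init and
my_module_fini are hypothetical; cfs_cpt_table and CFS_CPT_ANY are the same
globals the patch itself uses in libcfs/libcfs/module.c.

/*
 * Hypothetical usage sketch of the workitem scheduler API
 * (cfs_wi_sched_create / cfs_wi_schedule / cfs_wi_sched_destroy).
 */
#include <libcfs/libcfs.h>

static struct cfs_wi_sched *my_sched;  /* hypothetical scheduler */
static cfs_workitem_t       my_wi;     /* hypothetical workitem */

static int
my_wi_action(cfs_workitem_t *wi)
{
        /* do one unit of work; returning 0 leaves the workitem usable,
         * return non-zero only after calling cfs_wi_exit() on it */
        return 0;
}

static int
my_module_init(void)
{
        int     rc;

        /* two threads, no CPT affinity (CFS_CPT_ANY); passing a specific
         * CPT id instead yields a CPT-affinity scheduler, as LNet
         * selftest is expected to do */
        rc = cfs_wi_sched_create("my_sd", cfs_cpt_table, CFS_CPT_ANY,
                                 2, &my_sched);
        if (rc != 0)
                return rc;

        cfs_wi_init(&my_wi, NULL, my_wi_action);
        cfs_wi_schedule(my_sched, &my_wi);
        return 0;
}

static void
my_module_fini(void)
{
        /* cancel any pending schedule request; every workitem must be
         * descheduled (or have called cfs_wi_exit from its action)
         * before destroy, since cfs_wi_sched_destroy asserts that
         * ws_nscheduled == 0 */
        cfs_wi_deschedule(my_sched, &my_wi);
        cfs_wi_sched_destroy(my_sched);
        my_sched = NULL;
}

Note that creating a scheduler with nthrs == 1 gives the strictly serialised
behaviour that the old CFS_WI_SCHED_SERIAL id provided, which is how LNet
selftest sets up lst_sched_serial in the patch above.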