From 4c3c97ffdc68525d3096d0638ce28dbd7e91ab5f Mon Sep 17 00:00:00 2001 From: Jian Yu Date: Thu, 30 Sep 2010 21:07:07 +0800 Subject: [PATCH] b=21954 workitem to libcfs bug 21954 attachment 31455 and attachment 31665 o=zhen.liang i=he.h.huang i=maxim.patlasov --- libcfs/include/libcfs/Makefile.am | 2 +- libcfs/include/libcfs/libcfs.h | 1 + libcfs/include/libcfs/libcfs_workitem.h | 118 ++++++++ libcfs/libcfs/Makefile.in | 2 +- libcfs/libcfs/autoMakefile.am | 5 +- libcfs/libcfs/module.c | 11 +- libcfs/libcfs/workitem.c | 478 ++++++++++++++++++++++++++++++++ lnet/selftest/Makefile.in | 2 +- lnet/selftest/autoMakefile.am | 2 +- lnet/selftest/framework.c | 9 +- lnet/selftest/rpc.c | 61 ++-- lnet/selftest/selftest.h | 104 ++++--- lnet/selftest/workitem.c | 376 ------------------------- lnet/utils/lstclient.c | 12 +- 14 files changed, 701 insertions(+), 482 deletions(-) create mode 100644 libcfs/include/libcfs/libcfs_workitem.h create mode 100644 libcfs/libcfs/workitem.c delete mode 100644 lnet/selftest/workitem.c diff --git a/libcfs/include/libcfs/Makefile.am b/libcfs/include/libcfs/Makefile.am index faf9c72..d074151 100644 --- a/libcfs/include/libcfs/Makefile.am +++ b/libcfs/include/libcfs/Makefile.am @@ -10,4 +10,4 @@ EXTRA_DIST := curproc.h libcfs_private.h libcfs.h list.h lltrace.h \ libcfs_prim.h libcfs_time.h libcfs_hash.h \ libcfs_debug.h libcfsutil.h libcfs_ioctl.h \ libcfs_pack.h libcfs_unpack.h libcfs_string.h \ - libcfs_kernelcomm.h + libcfs_kernelcomm.h libcfs_workitem.h diff --git a/libcfs/include/libcfs/libcfs.h b/libcfs/include/libcfs/libcfs.h index 5ec895a..06a4a37 100644 --- a/libcfs/include/libcfs/libcfs.h +++ b/libcfs/include/libcfs/libcfs.h @@ -301,6 +301,7 @@ void cfs_get_random_bytes(void *buf, int size); #include #include #include +#include #include /* container_of depends on "likely" which is defined in libcfs_private.h */ diff --git a/libcfs/include/libcfs/libcfs_workitem.h b/libcfs/include/libcfs/libcfs_workitem.h new file mode 100644 index 
0000000..7448836 --- /dev/null +++ b/libcfs/include/libcfs/libcfs_workitem.h @@ -0,0 +1,118 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * libcfs/include/libcfs/libcfs_workitem.h + * + * Author: Isaac Huang + * Liang Zhen + * + * A workitem is deferred work with these semantics: + * - a workitem always runs in thread context. + * - a workitem can be concurrent with other workitems but is strictly + * serialized with respect to itself. + * - no CPU affinity, a workitem does not necessarily run on the same CPU + * that schedules it. However, this might change in the future. 
+ * - if a workitem is scheduled again before it has a chance to run, it + * runs only once. + * - if a workitem is scheduled while it runs, it runs again after it + * completes; this ensures that events occurring while other events are + * being processed receive due attention. This behavior also allows a + * workitem to reschedule itself. + * + * Usage notes: + * - a workitem can sleep but it should be aware of how that sleep might + * affect others. + * - a workitem runs inside a kernel thread so there's no user space to access. + * - do not use a workitem if the scheduling latency can't be tolerated. + * + * When wi_action returns non-zero, it means the workitem has either been + * freed or reused and workitem scheduler won't touch it any more. + */ + +#ifndef __LIBCFS_WORKITEM_H__ +#define __LIBCFS_WORKITEM_H__ + +struct cfs_workitem; + +typedef int (*cfs_wi_action_t) (struct cfs_workitem *); +typedef struct cfs_workitem { + /** chain on runq or rerunq */ + cfs_list_t wi_list; + /** working function */ + cfs_wi_action_t wi_action; + /** arg for working function */ + void *wi_data; + /** scheduler id, can be negative */ + short wi_sched_id; + /** in running */ + unsigned short wi_running:1; + /** scheduled */ + unsigned short wi_scheduled:1; +} cfs_workitem_t; + +/** + * positive values are reserved as CPU id of future implementation of + * per-cpu scheduler, so user can "bind" workitem on specific CPU. 
+ */ +#define CFS_WI_SCHED_ANY (-1) +#define CFS_WI_SCHED_SERIAL (-2) + +static inline void +cfs_wi_init(cfs_workitem_t *wi, void *data, + cfs_wi_action_t action, short sched_id) +{ + CFS_INIT_LIST_HEAD(&wi->wi_list); + + wi->wi_sched_id = sched_id; + wi->wi_running = 0; + wi->wi_scheduled = 0; + wi->wi_data = data; + wi->wi_action = action; +} + +void cfs_wi_exit(cfs_workitem_t *wi); +int cfs_wi_cancel(cfs_workitem_t *wi); +void cfs_wi_schedule(cfs_workitem_t *wi); +int cfs_wi_startup(void); +void cfs_wi_shutdown(void); + +#ifdef __KERNEL__ +/** # workitem scheduler loops before reschedule */ +#define CFS_WI_RESCHED 128 +#else +int cfs_wi_check_events(void); +#endif + +#endif /* __LIBCFS_WORKITEM_H__ */ diff --git a/libcfs/libcfs/Makefile.in b/libcfs/libcfs/Makefile.in index fbeefdd..b794923 100644 --- a/libcfs/libcfs/Makefile.in +++ b/libcfs/libcfs/Makefile.in @@ -25,7 +25,7 @@ sources: endif libcfs-all-objs := debug.o nidstrings.o lwt.o module.o tracefile.o watchdog.o \ - libcfs_string.o hash.o kernel_user_comm.o prng.o + libcfs_string.o hash.o kernel_user_comm.o prng.o workitem.o libcfs-objs := $(libcfs-linux-objs) $(libcfs-all-objs) diff --git a/libcfs/libcfs/autoMakefile.am b/libcfs/libcfs/autoMakefile.am index e15394a..7712a64 100644 --- a/libcfs/libcfs/autoMakefile.am +++ b/libcfs/libcfs/autoMakefile.am @@ -43,7 +43,8 @@ DIST_SUBDIRS := linux util posix darwin if LIBLUSTRE noinst_LIBRARIES= libcfs.a libcfs_a_SOURCES= posix/posix-debug.c user-prim.c user-lock.c user-tcpip.c \ - prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c + prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c \ + workitem.c libcfs_a_CPPFLAGS = $(LLCPPFLAGS) libcfs_a_CFLAGS = $(LLCFLAGS) endif @@ -89,5 +90,5 @@ EXTRA_DIST := Info.plist MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ linux-*.c linux/*.o darwin/*.o libcfs DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c prng.c \ - user-lock.c user-tcpip.c user-bitops.c\ + user-lock.c user-tcpip.c user-bitops.c 
workitem.c \ user-mem.c kernel_user_comm.c linux/linux-tracefile.h diff --git a/libcfs/libcfs/module.c b/libcfs/libcfs/module.c index 8e773d3..a25df4a 100644 --- a/libcfs/libcfs/module.c +++ b/libcfs/libcfs/module.c @@ -406,15 +406,23 @@ static int init_libcfs_module(void) goto cleanup_lwt; } + rc = cfs_wi_startup(); + if (rc) { + CERROR("startup workitem: error %d\n", rc); + goto cleanup_deregister; + } + rc = insert_proc(); if (rc) { CERROR("insert_proc: error %d\n", rc); - goto cleanup_deregister; + goto cleanup_wi; } CDEBUG (D_OTHER, "portals setup OK\n"); return (0); + cleanup_wi: + cfs_wi_shutdown(); cleanup_deregister: cfs_psdev_deregister(&libcfs_dev); cleanup_lwt: @@ -435,6 +443,7 @@ static void exit_libcfs_module(void) CDEBUG(D_MALLOC, "before Portals cleanup: kmem %d\n", cfs_atomic_read(&libcfs_kmemory)); + cfs_wi_shutdown(); rc = cfs_psdev_deregister(&libcfs_dev); if (rc) CERROR("misc_deregister error %d\n", rc); diff --git a/libcfs/libcfs/workitem.c b/libcfs/libcfs/workitem.c new file mode 100644 index 0000000..6533867 --- /dev/null +++ b/libcfs/libcfs/workitem.c @@ -0,0 +1,478 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * libcfs/libcfs/workitem.c + * + * Author: Isaac Huang + * Liang Zhen + */ + +#define DEBUG_SUBSYSTEM S_LNET + +#include + +typedef struct cfs_wi_sched { +#ifdef __KERNEL__ + /** serialised workitems */ + cfs_spinlock_t ws_lock; + /** where schedulers sleep */ + cfs_waitq_t ws_waitq; +#endif + /** concurrent workitems */ + cfs_list_t ws_runq; + /** rescheduled running-workitems */ + cfs_list_t ws_rerunq; + /** shutting down */ + int ws_shuttingdown; +} cfs_wi_sched_t; + +#ifdef __KERNEL__ +/** + * we have 2 cfs_wi_sched_t so far: + * one for CFS_WI_SCHED_ANY, another for CFS_WI_SCHED_SERIAL + * per-cpu implementation will be added for SMP scalability + */ + +#define CFS_WI_NSCHED 2 +#else +/** always 2 for userspace */ +#define CFS_WI_NSCHED 2 +#endif /* __KERNEL__ */ + +struct cfs_workitem_data { + /** serialize */ + cfs_spinlock_t wi_glock; + /** number of cfs_wi_sched_t */ + int wi_nsched; + /** number of threads (all schedulers) */ + int wi_nthreads; + /** default scheduler */ + cfs_wi_sched_t *wi_scheds; +} cfs_wi_data; + +static inline cfs_wi_sched_t * +cfs_wi_to_sched(cfs_workitem_t *wi) +{ + LASSERT(wi->wi_sched_id == CFS_WI_SCHED_ANY || + wi->wi_sched_id == CFS_WI_SCHED_SERIAL || + (wi->wi_sched_id >= 0 && + wi->wi_sched_id < cfs_wi_data.wi_nsched)); + + if (wi->wi_sched_id == CFS_WI_SCHED_ANY) + return 
&cfs_wi_data.wi_scheds[0]; + if (wi->wi_sched_id == CFS_WI_SCHED_SERIAL) + return &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1]; + + return &cfs_wi_data.wi_scheds[wi->wi_sched_id]; +} + +#ifdef __KERNEL__ +static inline void +cfs_wi_sched_lock(cfs_wi_sched_t *sched) +{ + cfs_spin_lock(&sched->ws_lock); +} + +static inline void +cfs_wi_sched_unlock(cfs_wi_sched_t *sched) +{ + cfs_spin_unlock(&sched->ws_lock); +} + +static inline int +cfs_wi_sched_cansleep(cfs_wi_sched_t *sched) +{ + cfs_wi_sched_lock(sched); + if (sched->ws_shuttingdown) { + cfs_wi_sched_unlock(sched); + return 0; + } + + if (!cfs_list_empty(&sched->ws_runq)) { + cfs_wi_sched_unlock(sched); + return 0; + } + cfs_wi_sched_unlock(sched); + return 1; +} + +#else + +static inline void +cfs_wi_sched_lock(cfs_wi_sched_t *sched) +{ + cfs_spin_lock(&cfs_wi_data.wi_glock); +} + +static inline void +cfs_wi_sched_unlock(cfs_wi_sched_t *sched) +{ + cfs_spin_unlock(&cfs_wi_data.wi_glock); +} + +#endif + +/* XXX: + * 0. it only works when called from wi->wi_action. + * 1. when it returns no one shall try to schedule the workitem. 
+ */ +void +cfs_wi_exit(cfs_workitem_t *wi) +{ + cfs_wi_sched_t *sched = cfs_wi_to_sched(wi); + + LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ + LASSERT (!sched->ws_shuttingdown); + + cfs_wi_sched_lock(sched); + +#ifdef __KERNEL__ + LASSERT (wi->wi_running); +#endif + if (wi->wi_scheduled) { /* cancel pending schedules */ + LASSERT (!cfs_list_empty(&wi->wi_list)); + cfs_list_del_init(&wi->wi_list); + } + + LASSERT (cfs_list_empty(&wi->wi_list)); + wi->wi_scheduled = 1; /* LBUG future schedule attempts */ + + cfs_wi_sched_unlock(sched); + return; +} +CFS_EXPORT_SYMBOL(cfs_wi_exit); + +/** + * cancel a workitem: + */ +int +cfs_wi_cancel (cfs_workitem_t *wi) +{ + cfs_wi_sched_t *sched = cfs_wi_to_sched(wi); + int rc; + + LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ + LASSERT (!sched->ws_shuttingdown); + + cfs_wi_sched_lock(sched); + /* + * return 0 if it's running already, otherwise return 1, which + * means the workitem will not be scheduled and will not have + * any race with wi_action. + */ + rc = !(wi->wi_running); + + if (wi->wi_scheduled) { /* cancel pending schedules */ + LASSERT (!cfs_list_empty(&wi->wi_list)); + cfs_list_del_init(&wi->wi_list); + wi->wi_scheduled = 0; + } + + LASSERT (cfs_list_empty(&wi->wi_list)); + + cfs_wi_sched_unlock(sched); + return rc; +} + +CFS_EXPORT_SYMBOL(cfs_wi_cancel); + +/* + * Workitem scheduled with (serial == 1) is strictly serialised not only with + * itself, but also with others scheduled this way. + * + * Now there's only one static serialised queue, but in the future more might + * be added, and even dynamic creation of serialised queues might be supported. 
+ */ +void +cfs_wi_schedule(cfs_workitem_t *wi) +{ + cfs_wi_sched_t *sched = cfs_wi_to_sched(wi); + + LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ + LASSERT (!sched->ws_shuttingdown); + + cfs_wi_sched_lock(sched); + + if (!wi->wi_scheduled) { + LASSERT (cfs_list_empty(&wi->wi_list)); + + wi->wi_scheduled = 1; + if (!wi->wi_running) { + cfs_list_add_tail(&wi->wi_list, &sched->ws_runq); +#ifdef __KERNEL__ + cfs_waitq_signal(&sched->ws_waitq); +#endif + } else { + cfs_list_add(&wi->wi_list, &sched->ws_rerunq); + } + } + + LASSERT (!cfs_list_empty(&wi->wi_list)); + cfs_wi_sched_unlock(sched); + return; +} + +CFS_EXPORT_SYMBOL(cfs_wi_schedule); + +#ifdef __KERNEL__ + +static int +cfs_wi_scheduler (void *arg) +{ + int id = (int)(long_ptr_t) arg; + int serial = (id == -1); + char name[24]; + cfs_wi_sched_t *sched; + + if (serial) { + sched = &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1]; + cfs_daemonize("wi_serial_sd"); + } else { + /* will be sched = &cfs_wi_data.wi_scheds[id] in the future */ + sched = &cfs_wi_data.wi_scheds[0]; + snprintf(name, sizeof(name), "cfs_wi_sd%03d", id); + cfs_daemonize(name); + } + + cfs_block_allsigs(); + + cfs_wi_sched_lock(sched); + + while (!sched->ws_shuttingdown) { + int nloops = 0; + int rc; + cfs_workitem_t *wi; + + while (!cfs_list_empty(&sched->ws_runq) && + nloops < CFS_WI_RESCHED) { + wi = cfs_list_entry(sched->ws_runq.next, + cfs_workitem_t, wi_list); + LASSERT (wi->wi_scheduled && !wi->wi_running); + + cfs_list_del_init(&wi->wi_list); + + wi->wi_running = 1; + wi->wi_scheduled = 0; + cfs_wi_sched_unlock(sched); + nloops++; + + rc = (*wi->wi_action) (wi); + + cfs_wi_sched_lock(sched); + if (rc != 0) /* WI should be dead, even be freed! 
*/ + continue; + + wi->wi_running = 0; + if (cfs_list_empty(&wi->wi_list)) + continue; + + LASSERT (wi->wi_scheduled); + /* wi is rescheduled, should be on rerunq now, we + * move it to runq so it can run action now */ + cfs_list_move_tail(&wi->wi_list, &sched->ws_runq); + } + + if (!cfs_list_empty(&sched->ws_runq)) { + cfs_wi_sched_unlock(sched); + /* don't sleep because some workitems still + * expect me to come back soon */ + cfs_cond_resched(); + cfs_wi_sched_lock(sched); + continue; + } + + cfs_wi_sched_unlock(sched); + cfs_wait_event_interruptible_exclusive(sched->ws_waitq, + !cfs_wi_sched_cansleep(sched), rc); + cfs_wi_sched_lock(sched); + } + + cfs_wi_sched_unlock(sched); + + cfs_spin_lock(&cfs_wi_data.wi_glock); + cfs_wi_data.wi_nthreads--; + cfs_spin_unlock(&cfs_wi_data.wi_glock); + return 0; +} + +static int +cfs_wi_start_thread (int (*func) (void*), void *arg) +{ + long pid; + + pid = cfs_kernel_thread(func, arg, 0); + if (pid < 0) + return (int)pid; + + cfs_spin_lock(&cfs_wi_data.wi_glock); + cfs_wi_data.wi_nthreads++; + cfs_spin_unlock(&cfs_wi_data.wi_glock); + return 0; +} + +#else /* __KERNEL__ */ + +int +cfs_wi_check_events (void) +{ + int n = 0; + cfs_workitem_t *wi; + cfs_list_t *q; + + cfs_spin_lock(&cfs_wi_data.wi_glock); + + for (;;) { + /** rerunq is always empty for userspace */ + if (!cfs_list_empty(&cfs_wi_data.wi_scheds[1].ws_runq)) + q = &cfs_wi_data.wi_scheds[1].ws_runq; + else if (!cfs_list_empty(&cfs_wi_data.wi_scheds[0].ws_runq)) + q = &cfs_wi_data.wi_scheds[0].ws_runq; + else + break; + + wi = cfs_list_entry(q->next, cfs_workitem_t, wi_list); + cfs_list_del_init(&wi->wi_list); + + LASSERT (wi->wi_scheduled); + wi->wi_scheduled = 0; + cfs_spin_unlock(&cfs_wi_data.wi_glock); + + n++; + (*wi->wi_action) (wi); + + cfs_spin_lock(&cfs_wi_data.wi_glock); + } + + cfs_spin_unlock(&cfs_wi_data.wi_glock); + return n; +} + +#endif + +static void +cfs_wi_sched_init(cfs_wi_sched_t *sched) +{ + sched->ws_shuttingdown = 0; +#ifdef __KERNEL__ + 
cfs_spin_lock_init(&sched->ws_lock); + cfs_waitq_init(&sched->ws_waitq); +#endif + CFS_INIT_LIST_HEAD(&sched->ws_runq); + CFS_INIT_LIST_HEAD(&sched->ws_rerunq); +} + +static void +cfs_wi_sched_shutdown(cfs_wi_sched_t *sched) +{ + cfs_wi_sched_lock(sched); + + LASSERT(cfs_list_empty(&sched->ws_runq)); + LASSERT(cfs_list_empty(&sched->ws_rerunq)); + + sched->ws_shuttingdown = 1; + +#ifdef __KERNEL__ + cfs_waitq_broadcast(&sched->ws_waitq); +#endif + cfs_wi_sched_unlock(sched); +} + + +int +cfs_wi_startup (void) +{ + int i; + int n; + int rc; + + cfs_wi_data.wi_nthreads = 0; + cfs_wi_data.wi_nsched = CFS_WI_NSCHED; + LIBCFS_ALLOC(cfs_wi_data.wi_scheds, + cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t)); + if (cfs_wi_data.wi_scheds == NULL) + return -ENOMEM; + + cfs_spin_lock_init(&cfs_wi_data.wi_glock); + for (i = 0; i < cfs_wi_data.wi_nsched; i++) + cfs_wi_sched_init(&cfs_wi_data.wi_scheds[i]); + +#ifdef __KERNEL__ + n = cfs_num_online_cpus(); + for (i = 0; i <= n; i++) { + rc = cfs_wi_start_thread(cfs_wi_scheduler, + (void *)(long_ptr_t)(i == n ? -1 : i)); + if (rc != 0) { + CERROR ("Can't spawn workitem scheduler: %d\n", rc); + cfs_wi_shutdown(); + return rc; + } + } +#else + n = rc = 0; +#endif + + return 0; +} + +void +cfs_wi_shutdown (void) +{ + int i; + + if (cfs_wi_data.wi_scheds == NULL) + return; + + for (i = 0; i < cfs_wi_data.wi_nsched; i++) + cfs_wi_sched_shutdown(&cfs_wi_data.wi_scheds[i]); + +#ifdef __KERNEL__ + cfs_spin_lock(&cfs_wi_data.wi_glock); + i = 2; + while (cfs_wi_data.wi_nthreads != 0) { + CDEBUG(IS_PO2(++i) ? 
D_WARNING : D_NET, + "waiting for %d threads to terminate\n", + cfs_wi_data.wi_nthreads); + cfs_spin_unlock(&cfs_wi_data.wi_glock); + + cfs_pause(cfs_time_seconds(1)); + + cfs_spin_lock(&cfs_wi_data.wi_glock); + } + cfs_spin_unlock(&cfs_wi_data.wi_glock); +#endif + LIBCFS_FREE(cfs_wi_data.wi_scheds, + cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t)); + return; +} diff --git a/lnet/selftest/Makefile.in b/lnet/selftest/Makefile.in index 8ebef75..7acc249 100644 --- a/lnet/selftest/Makefile.in +++ b/lnet/selftest/Makefile.in @@ -1,6 +1,6 @@ MODULES := lnet_selftest -lnet_selftest-objs := console.o conrpc.o conctl.o framework.o timer.o rpc.o workitem.o module.o ping_test.o brw_test.o +lnet_selftest-objs := console.o conrpc.o conctl.o framework.o timer.o rpc.o module.o ping_test.o brw_test.o default: all diff --git a/lnet/selftest/autoMakefile.am b/lnet/selftest/autoMakefile.am index 36af901..24d92bcd1 100644 --- a/lnet/selftest/autoMakefile.am +++ b/lnet/selftest/autoMakefile.am @@ -1,5 +1,5 @@ my_sources = console.c conrpc.c conctl.c console.h conrpc.h \ - framework.c timer.c rpc.c workitem.c module.c \ + framework.c timer.c rpc.c module.c \ ping_test.c brw_test.c if LIBLUSTRE diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c index f4de757..4bf02d9 100644 --- a/lnet/selftest/framework.c +++ b/lnet/selftest/framework.c @@ -319,7 +319,7 @@ sfw_server_rpc_done (srpc_server_rpc_t *rpc) "Incoming framework RPC done: " "service %s, peer %s, status %s:%d\n", sv->sv_name, libcfs_id2str(rpc->srpc_peer), - swi_state2str(rpc->srpc_wi.wi_state), + swi_state2str(rpc->srpc_wi.swi_state), status); if (rpc->srpc_bulk != NULL) @@ -341,7 +341,7 @@ sfw_client_rpc_fini (srpc_client_rpc_t *rpc) "Outgoing framework RPC done: " "service %d, peer %s, status %s:%d:%d\n", rpc->crpc_service, libcfs_id2str(rpc->crpc_dest), - swi_state2str(rpc->crpc_wi.wi_state), + swi_state2str(rpc->crpc_wi.swi_state), rpc->crpc_aborted, rpc->crpc_status); cfs_spin_lock(&sfw_data.fw_lock); @@ 
-925,7 +925,7 @@ sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer, int sfw_run_test (swi_workitem_t *wi) { - sfw_test_unit_t *tsu = wi->wi_data; + sfw_test_unit_t *tsu = wi->swi_workitem.wi_data; sfw_test_instance_t *tsi = tsu->tsu_instance; srpc_client_rpc_t *rpc = NULL; @@ -1000,7 +1000,8 @@ sfw_run_batch (sfw_batch_t *tsb) cfs_atomic_inc(&tsi->tsi_nactive); tsu->tsu_loop = tsi->tsi_loop; wi = &tsu->tsu_worker; - swi_init_workitem(wi, tsu, sfw_run_test); + swi_init_workitem(wi, tsu, sfw_run_test, + CFS_WI_SCHED_ANY); swi_schedule_workitem(wi); } } diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index 64fd3e0..24af911 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -47,7 +47,6 @@ typedef enum { SRPC_STATE_NONE, SRPC_STATE_NI_INIT, SRPC_STATE_EQ_INIT, - SRPC_STATE_WI_INIT, SRPC_STATE_RUNNING, SRPC_STATE_STOPPING, } srpc_state_t; @@ -190,7 +189,9 @@ srpc_init_server_rpc (srpc_server_rpc_t *rpc, srpc_service_t *sv, srpc_buffer_t *buffer) { memset(rpc, 0, sizeof(*rpc)); - swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc); + swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc, + sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ? 
+ CFS_WI_SCHED_SERIAL : CFS_WI_SCHED_ANY); rpc->srpc_ev.ev_fired = 1; /* no event expected now */ @@ -520,9 +521,9 @@ srpc_finish_service (srpc_service_t *sv) "wi %s scheduled %d running %d, " "ev fired %d type %d status %d lnet %d\n", rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), - swi_state2str(rpc->srpc_wi.wi_state), - rpc->srpc_wi.wi_scheduled, - rpc->srpc_wi.wi_running, + swi_state2str(rpc->srpc_wi.swi_state), + rpc->srpc_wi.swi_workitem.wi_scheduled, + rpc->srpc_wi.swi_workitem.wi_running, rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type, rpc->srpc_ev.ev_status, @@ -584,14 +585,7 @@ free: inline void srpc_schedule_server_rpc (srpc_server_rpc_t *rpc) { - srpc_service_t *sv = rpc->srpc_service; - - if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) - swi_schedule_workitem(&rpc->srpc_wi); - else /* framework RPCs are handled one by one */ - swi_schedule_serial_workitem(&rpc->srpc_wi); - - return; + swi_schedule_workitem(&rpc->srpc_wi); } void @@ -764,14 +758,14 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) srpc_service_t *sv = rpc->srpc_service; srpc_buffer_t *buffer; - LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE); + LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE); rpc->srpc_status = status; CDEBUG (status == 0 ? 
D_NET : D_NETERROR, "Server RPC %p done: service %s, peer %s, status %s:%d\n", rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), - swi_state2str(rpc->srpc_wi.wi_state), status); + swi_state2str(rpc->srpc_wi.swi_state), status); if (status != 0) { cfs_spin_lock(&srpc_data.rpc_glock); @@ -823,7 +817,7 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) int srpc_handle_rpc (swi_workitem_t *wi) { - srpc_server_rpc_t *rpc = wi->wi_data; + srpc_server_rpc_t *rpc = wi->swi_workitem.wi_data; srpc_service_t *sv = rpc->srpc_service; srpc_event_t *ev = &rpc->srpc_ev; int rc = 0; @@ -848,7 +842,7 @@ srpc_handle_rpc (swi_workitem_t *wi) cfs_spin_unlock(&sv->sv_lock); - switch (wi->wi_state) { + switch (wi->swi_state) { default: LBUG (); case SWI_STATE_NEWBORN: { @@ -878,7 +872,7 @@ srpc_handle_rpc (swi_workitem_t *wi) return 1; } - wi->wi_state = SWI_STATE_BULK_STARTED; + wi->swi_state = SWI_STATE_BULK_STARTED; if (rpc->srpc_bulk != NULL) { rc = srpc_do_bulk(rpc); @@ -904,7 +898,7 @@ srpc_handle_rpc (swi_workitem_t *wi) } } - wi->wi_state = SWI_STATE_REPLY_SUBMITTED; + wi->swi_state = SWI_STATE_REPLY_SUBMITTED; rc = srpc_send_reply(rpc); if (rc == 0) return 0; /* wait for reply */ @@ -920,7 +914,7 @@ srpc_handle_rpc (swi_workitem_t *wi) LASSERT (ev->ev_fired); } - wi->wi_state = SWI_STATE_DONE; + wi->swi_state = SWI_STATE_DONE; srpc_server_rpc_done(rpc, ev->ev_status); return 1; } @@ -1000,7 +994,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) { swi_workitem_t *wi = &rpc->crpc_wi; - LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE); + LASSERT (status != 0 || wi->swi_state == SWI_STATE_DONE); cfs_spin_lock(&rpc->crpc_lock); @@ -1013,7 +1007,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) CDEBUG ((status == 0) ? 
D_NET : D_NETERROR, "Client RPC done: service %d, peer %s, status %s:%d:%d\n", rpc->crpc_service, libcfs_id2str(rpc->crpc_dest), - swi_state2str(wi->wi_state), rpc->crpc_aborted, status); + swi_state2str(wi->swi_state), rpc->crpc_aborted, status); /* * No one can schedule me now since: @@ -1037,7 +1031,7 @@ int srpc_send_rpc (swi_workitem_t *wi) { int rc = 0; - srpc_client_rpc_t *rpc = wi->wi_data; + srpc_client_rpc_t *rpc = wi->swi_workitem.wi_data; srpc_msg_t *reply = &rpc->crpc_replymsg; int do_bulk = rpc->crpc_bulk.bk_niov > 0; @@ -1053,7 +1047,7 @@ srpc_send_rpc (swi_workitem_t *wi) cfs_spin_unlock(&rpc->crpc_lock); - switch (wi->wi_state) { + switch (wi->swi_state) { default: LBUG (); case SWI_STATE_NEWBORN: @@ -1068,7 +1062,7 @@ srpc_send_rpc (swi_workitem_t *wi) rc = srpc_prepare_bulk(rpc); if (rc != 0) break; - wi->wi_state = SWI_STATE_REQUEST_SUBMITTED; + wi->swi_state = SWI_STATE_REQUEST_SUBMITTED; rc = srpc_send_request(rpc); break; @@ -1081,7 +1075,7 @@ srpc_send_rpc (swi_workitem_t *wi) rc = rpc->crpc_reqstev.ev_status; if (rc != 0) break; - wi->wi_state = SWI_STATE_REQUEST_SENT; + wi->swi_state = SWI_STATE_REQUEST_SENT; /* perhaps more events, fall thru */ case SWI_STATE_REQUEST_SENT: { srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service); @@ -1112,7 +1106,7 @@ srpc_send_rpc (swi_workitem_t *wi) LNetMDUnlink(rpc->crpc_bulk.bk_mdh); } - wi->wi_state = SWI_STATE_REPLY_RECEIVED; + wi->swi_state = SWI_STATE_REPLY_RECEIVED; } case SWI_STATE_REPLY_RECEIVED: if (do_bulk && !rpc->crpc_bulkev.ev_fired) break; @@ -1127,7 +1121,7 @@ srpc_send_rpc (swi_workitem_t *wi) rpc->crpc_status == 0 && reply->msg_body.reply.status != 0) rc = 0; - wi->wi_state = SWI_STATE_DONE; + wi->swi_state = SWI_STATE_DONE; srpc_client_rpc_done(rpc, rc); return 1; } @@ -1183,7 +1177,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why) CDEBUG (D_NET, "Aborting RPC: service %d, peer %s, state %s, why %d\n", rpc->crpc_service, libcfs_id2str(rpc->crpc_dest), - 
swi_state2str(rpc->crpc_wi.wi_state), why); + swi_state2str(rpc->crpc_wi.swi_state), why); rpc->crpc_aborted = 1; rpc->crpc_status = why; @@ -1491,12 +1485,6 @@ srpc_startup (void) srpc_data.rpc_state = SRPC_STATE_EQ_INIT; - rc = swi_startup(); - if (rc != 0) - goto bail; - - srpc_data.rpc_state = SRPC_STATE_WI_INIT; - rc = stt_startup(); bail: @@ -1536,9 +1524,6 @@ srpc_shutdown (void) stt_shutdown(); - case SRPC_STATE_WI_INIT: - swi_shutdown(); - case SRPC_STATE_EQ_INIT: rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL); LASSERT (rc == 0); diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index 4f883c9..9856472 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -75,58 +75,10 @@ #define SWI_STATE_DONE 10 /* forward refs */ -struct swi_workitem; struct srpc_service; struct sfw_test_unit; struct sfw_test_instance; -/* - * A workitems is deferred work with these semantics: - * - a workitem always runs in thread context. - * - a workitem can be concurrent with other workitems but is strictly - * serialized with respect to itself. - * - no CPU affinity, a workitem does not necessarily run on the same CPU - * that schedules it. However, this might change in the future. - * - if a workitem is scheduled again before it has a chance to run, it - * runs only once. - * - if a workitem is scheduled while it runs, it runs again after it - * completes; this ensures that events occurring while other events are - * being processed receive due attention. This behavior also allows a - * workitem to reschedule itself. - * - * Usage notes: - * - a workitem can sleep but it should be aware of how that sleep might - * affect others. - * - a workitem runs inside a kernel thread so there's no user space to access. - * - do not use a workitem if the scheduling latency can't be tolerated. - * - * When wi_action returns non-zero, it means the workitem has either been - * freed or reused and workitem scheduler won't touch it any more. 
- */ -typedef int (*swi_action_t) (struct swi_workitem *); -typedef struct swi_workitem { - cfs_list_t wi_list; /* chain on runq */ - int wi_state; - swi_action_t wi_action; - void *wi_data; - unsigned int wi_running:1; - unsigned int wi_scheduled:1; -} swi_workitem_t; - -static inline void -swi_init_workitem (swi_workitem_t *wi, void *data, swi_action_t action) -{ - CFS_INIT_LIST_HEAD(&wi->wi_list); - - wi->wi_running = 0; - wi->wi_scheduled = 0; - wi->wi_data = data; - wi->wi_action = action; - wi->wi_state = SWI_STATE_NEWBORN; -} - -#define SWI_RESCHED 128 /* # workitem scheduler loops before reschedule */ - /* services below SRPC_FRAMEWORK_SERVICE_MAX_ID are framework * services, e.g. create/modify session. */ @@ -231,6 +183,15 @@ typedef struct { lnet_process_id_t buf_peer; } srpc_buffer_t; +struct swi_workitem; +typedef int (*swi_action_t) (struct swi_workitem *); + +typedef struct swi_workitem { + cfs_workitem_t swi_workitem; + swi_action_t swi_action; + int swi_state; +} swi_workitem_t; + /* server-side state of a RPC */ typedef struct srpc_server_rpc { cfs_list_t srpc_list; /* chain on srpc_service::*_rpcq */ @@ -417,7 +378,6 @@ typedef struct { sfw_test_client_ops_t *tsc_cli_ops; /* ops of test client */ } sfw_test_case_t; - srpc_client_rpc_t * sfw_create_rpc(lnet_process_id_t peer, int service, int nbulkiov, int bulklen, void (*done) (srpc_client_rpc_t *), void *priv); @@ -453,13 +413,45 @@ void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer); void srpc_get_counters(srpc_counters_t *cnt); void srpc_set_counters(const srpc_counters_t *cnt); -void swi_kill_workitem(swi_workitem_t *wi); -void swi_schedule_workitem(swi_workitem_t *wi); -void swi_schedule_serial_workitem(swi_workitem_t *wi); -int swi_startup(void); +static inline int +swi_wi_action(cfs_workitem_t *wi) +{ + swi_workitem_t *swi = container_of(wi, swi_workitem_t, swi_workitem); + + return swi->swi_action(swi); +} + +static inline void +swi_init_workitem (swi_workitem_t *swi, void 
*data, + swi_action_t action, short sched_id) +{ + swi->swi_action = action; + swi->swi_state = SWI_STATE_NEWBORN; + cfs_wi_init(&swi->swi_workitem, data, swi_wi_action, sched_id); +} + +static inline void +swi_schedule_workitem(swi_workitem_t *wi) +{ + cfs_wi_schedule(&wi->swi_workitem); +} + +static inline void +swi_kill_workitem(swi_workitem_t *swi) +{ + cfs_wi_exit(&swi->swi_workitem); +} + +#ifndef __KERNEL__ +static inline int +swi_check_events(void) +{ + return cfs_wi_check_events(); +} +#endif + int sfw_startup(void); int srpc_startup(void); -void swi_shutdown(void); void sfw_shutdown(void); void srpc_shutdown(void); @@ -494,7 +486,8 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer, crpc_bulk.bk_iovs[nbulkiov])); CFS_INIT_LIST_HEAD(&rpc->crpc_list); - swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc); + swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc, + CFS_WI_SCHED_ANY); cfs_spin_lock_init(&rpc->crpc_lock); cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ @@ -547,7 +540,6 @@ int stt_poll_interval(void); int sfw_session_removed(void); int stt_check_events(void); -int swi_check_events(void); int srpc_check_event(int timeout); int lnet_selftest_init(void); diff --git a/lnet/selftest/workitem.c b/lnet/selftest/workitem.c deleted file mode 100644 index 762b8c4..0000000 --- a/lnet/selftest/workitem.c +++ /dev/null @@ -1,376 +0,0 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * - * GPL HEADER START - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License version 2 only, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU - * General Public License version 2 for more details (a copy is included - * in the LICENSE file that accompanied this code). - * - * You should have received a copy of the GNU General Public License - * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - * - * GPL HEADER END - */ -/* - * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. - * Use is subject to license terms. - */ -/* - * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. - * - * lnet/selftest/workitem.c - * - * Author: Isaac Huang - */ -#define DEBUG_SUBSYSTEM S_LNET - -#include "selftest.h" - - -struct smoketest_workitem { - cfs_list_t wi_runq; /* concurrent workitems */ - cfs_list_t wi_serial_runq; /* serialised workitems */ - cfs_waitq_t wi_waitq; /* where schedulers sleep */ - cfs_waitq_t wi_serial_waitq; /* where serial scheduler sleep */ - cfs_spinlock_t wi_lock; /* serialize */ - int wi_shuttingdown; - int wi_nthreads; -} swi_data; - -static inline int -swi_sched_cansleep (cfs_list_t *q) -{ - int rc; - - cfs_spin_lock(&swi_data.wi_lock); - - rc = !swi_data.wi_shuttingdown && cfs_list_empty(q); - - cfs_spin_unlock(&swi_data.wi_lock); - return rc; -} - -/* XXX: - * 0. it only works when called from wi->wi_action. - * 1. when it returns no one shall try to schedule the workitem. 
- */ -void -swi_kill_workitem (swi_workitem_t *wi) -{ - LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ - LASSERT (!swi_data.wi_shuttingdown); - - cfs_spin_lock(&swi_data.wi_lock); - -#ifdef __KERNEL__ - LASSERT (wi->wi_running); -#endif - - if (wi->wi_scheduled) { /* cancel pending schedules */ - LASSERT (!cfs_list_empty(&wi->wi_list)); - cfs_list_del_init(&wi->wi_list); - } - - LASSERT (cfs_list_empty(&wi->wi_list)); - wi->wi_scheduled = 1; /* LBUG future schedule attempts */ - - cfs_spin_unlock(&swi_data.wi_lock); - return; -} - -void -swi_schedule_workitem (swi_workitem_t *wi) -{ - LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ - LASSERT (!swi_data.wi_shuttingdown); - - cfs_spin_lock(&swi_data.wi_lock); - - if (!wi->wi_scheduled) { - LASSERT (cfs_list_empty(&wi->wi_list)); - - wi->wi_scheduled = 1; - cfs_list_add_tail(&wi->wi_list, &swi_data.wi_runq); - cfs_waitq_signal(&swi_data.wi_waitq); - } - - LASSERT (!cfs_list_empty(&wi->wi_list)); - cfs_spin_unlock(&swi_data.wi_lock); - return; -} - -/* - * Workitem scheduled by this function is strictly serialised not only with - * itself, but also with others scheduled this way. - * - * Now there's only one static serialised queue, but in the future more might - * be added, and even dynamic creation of serialised queues might be supported. 
- */ -void -swi_schedule_serial_workitem (swi_workitem_t *wi) -{ - LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */ - LASSERT (!swi_data.wi_shuttingdown); - - cfs_spin_lock(&swi_data.wi_lock); - - if (!wi->wi_scheduled) { - LASSERT (cfs_list_empty(&wi->wi_list)); - - wi->wi_scheduled = 1; - cfs_list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq); - cfs_waitq_signal(&swi_data.wi_serial_waitq); - } - - LASSERT (!cfs_list_empty(&wi->wi_list)); - cfs_spin_unlock(&swi_data.wi_lock); - return; -} - -#ifdef __KERNEL__ - -int -swi_scheduler_main (void *arg) -{ - int id = (int)(long_ptr_t) arg; - char name[16]; - - snprintf(name, sizeof(name), "swi_sd%03d", id); - cfs_daemonize(name); - cfs_block_allsigs(); - - cfs_spin_lock(&swi_data.wi_lock); - - while (!swi_data.wi_shuttingdown) { - int nloops = 0; - int rc; - swi_workitem_t *wi; - - while (!cfs_list_empty(&swi_data.wi_runq) && - nloops < SWI_RESCHED) { - wi = cfs_list_entry(swi_data.wi_runq.next, - swi_workitem_t, wi_list); - cfs_list_del_init(&wi->wi_list); - - LASSERT (wi->wi_scheduled); - - nloops++; - if (wi->wi_running) { - cfs_list_add_tail(&wi->wi_list, - &swi_data.wi_runq); - continue; - } - - wi->wi_running = 1; - wi->wi_scheduled = 0; - cfs_spin_unlock(&swi_data.wi_lock); - - rc = (*wi->wi_action) (wi); - - cfs_spin_lock(&swi_data.wi_lock); - if (rc == 0) /* wi still active */ - wi->wi_running = 0; - } - - cfs_spin_unlock(&swi_data.wi_lock); - - if (nloops < SWI_RESCHED) - cfs_wait_event_interruptible_exclusive( - swi_data.wi_waitq, - !swi_sched_cansleep(&swi_data.wi_runq), rc); - else - cfs_cond_resched(); - - cfs_spin_lock(&swi_data.wi_lock); - } - - swi_data.wi_nthreads--; - cfs_spin_unlock(&swi_data.wi_lock); - return 0; -} - -int -swi_serial_scheduler_main (void *arg) -{ - UNUSED (arg); - - cfs_daemonize("swi_serial_sd"); - cfs_block_allsigs(); - - cfs_spin_lock(&swi_data.wi_lock); - - while (!swi_data.wi_shuttingdown) { - int nloops = 0; - int rc; - swi_workitem_t *wi; - - while 
(!cfs_list_empty(&swi_data.wi_serial_runq) && - nloops < SWI_RESCHED) { - wi = cfs_list_entry(swi_data.wi_serial_runq.next, - swi_workitem_t, wi_list); - cfs_list_del_init(&wi->wi_list); - - LASSERTF (!wi->wi_running && wi->wi_scheduled, - "wi %p running %d scheduled %d\n", - wi, wi->wi_running, wi->wi_scheduled); - - nloops++; - wi->wi_running = 1; - wi->wi_scheduled = 0; - cfs_spin_unlock(&swi_data.wi_lock); - - rc = (*wi->wi_action) (wi); - - cfs_spin_lock(&swi_data.wi_lock); - if (rc == 0) /* wi still active */ - wi->wi_running = 0; - } - - cfs_spin_unlock(&swi_data.wi_lock); - - if (nloops < SWI_RESCHED) - cfs_wait_event_interruptible_exclusive( - swi_data.wi_serial_waitq, - !swi_sched_cansleep(&swi_data.wi_serial_runq), - rc); - else - cfs_cond_resched(); - - cfs_spin_lock(&swi_data.wi_lock); - } - - swi_data.wi_nthreads--; - cfs_spin_unlock(&swi_data.wi_lock); - return 0; -} - -int -swi_start_thread (int (*func) (void*), void *arg) -{ - long pid; - - LASSERT (!swi_data.wi_shuttingdown); - - pid = cfs_kernel_thread(func, arg, 0); - if (pid < 0) - return (int)pid; - - cfs_spin_lock(&swi_data.wi_lock); - swi_data.wi_nthreads++; - cfs_spin_unlock(&swi_data.wi_lock); - return 0; -} - -#else /* __KERNEL__ */ - -int -swi_check_events (void) -{ - int n = 0; - swi_workitem_t *wi; - cfs_list_t *q; - - cfs_spin_lock(&swi_data.wi_lock); - - for (;;) { - if (!cfs_list_empty(&swi_data.wi_serial_runq)) - q = &swi_data.wi_serial_runq; - else if (!cfs_list_empty(&swi_data.wi_runq)) - q = &swi_data.wi_runq; - else - break; - - wi = cfs_list_entry(q->next, swi_workitem_t, wi_list); - cfs_list_del_init(&wi->wi_list); - - LASSERT (wi->wi_scheduled); - wi->wi_scheduled = 0; - cfs_spin_unlock(&swi_data.wi_lock); - - n++; - (*wi->wi_action) (wi); - - cfs_spin_lock(&swi_data.wi_lock); - } - - cfs_spin_unlock(&swi_data.wi_lock); - return n; -} - -#endif - -int -swi_startup (void) -{ - int i; - int rc; - - swi_data.wi_nthreads = 0; - swi_data.wi_shuttingdown = 0; - 
cfs_spin_lock_init(&swi_data.wi_lock); - cfs_waitq_init(&swi_data.wi_waitq); - cfs_waitq_init(&swi_data.wi_serial_waitq); - CFS_INIT_LIST_HEAD(&swi_data.wi_runq); - CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq); - -#ifdef __KERNEL__ - rc = swi_start_thread(swi_serial_scheduler_main, NULL); - if (rc != 0) { - LASSERT (swi_data.wi_nthreads == 0); - CERROR ("Can't spawn serial workitem scheduler: %d\n", rc); - return rc; - } - - for (i = 0; i < cfs_num_online_cpus(); i++) { - rc = swi_start_thread(swi_scheduler_main, - (void *) (long_ptr_t) i); - if (rc != 0) { - CERROR ("Can't spawn workitem scheduler: %d\n", rc); - swi_shutdown(); - return rc; - } - } -#else - UNUSED(i); - UNUSED(rc); -#endif - - return 0; -} - -void -swi_shutdown (void) -{ - cfs_spin_lock(&swi_data.wi_lock); - - LASSERT (cfs_list_empty(&swi_data.wi_runq)); - LASSERT (cfs_list_empty(&swi_data.wi_serial_runq)); - - swi_data.wi_shuttingdown = 1; - -#ifdef __KERNEL__ - cfs_waitq_broadcast(&swi_data.wi_waitq); - cfs_waitq_broadcast(&swi_data.wi_serial_waitq); - lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock, - "waiting for %d threads to terminate\n", - swi_data.wi_nthreads); -#endif - - cfs_spin_unlock(&swi_data.wi_lock); - return; -} diff --git a/lnet/utils/lstclient.c b/lnet/utils/lstclient.c index 1d72000..900164f 100644 --- a/lnet/utils/lstclient.c +++ b/lnet/utils/lstclient.c @@ -202,9 +202,17 @@ main(int argc, char **argv) return -1; } + rc = cfs_wi_startup(); + if (rc != 0) { + CERROR("cfs_wi_startup() failed: %d\n", rc); + libcfs_debug_cleanup(); + return -1; + } + rc = LNetInit(); if (rc != 0) { CERROR("LNetInit() failed: %d\n", rc); + cfs_wi_shutdown(); libcfs_debug_cleanup(); return -1; } @@ -216,8 +224,8 @@ main(int argc, char **argv) if (rc != 0) { fprintf(stderr, "Can't startup selftest\n"); LNetFini(); + cfs_wi_shutdown(); libcfs_debug_cleanup(); - return -1; } @@ -244,6 +252,8 @@ out: LNetFini(); + cfs_wi_shutdown(); + libcfs_debug_cleanup(); return rc; -- 1.8.3.1