b=21954 move workitem to libcfs
author     Jian Yu <Jian.Yu@sun.com>
           Thu, 30 Sep 2010 13:07:07 +0000 (21:07 +0800)
committer  Vitaly Fertman <vitaly.fertman@sun.com>
           Thu, 30 Sep 2010 14:48:55 +0000 (18:48 +0400)
bug 21954 attachment 31455 and attachment 31665

o=zhen.liang
i=he.h.huang
i=maxim.patlasov

14 files changed:
libcfs/include/libcfs/Makefile.am
libcfs/include/libcfs/libcfs.h
libcfs/include/libcfs/libcfs_workitem.h [new file with mode: 0644]
libcfs/libcfs/Makefile.in
libcfs/libcfs/autoMakefile.am
libcfs/libcfs/module.c
libcfs/libcfs/workitem.c [new file with mode: 0644]
lnet/selftest/Makefile.in
lnet/selftest/autoMakefile.am
lnet/selftest/framework.c
lnet/selftest/rpc.c
lnet/selftest/selftest.h
lnet/selftest/workitem.c [deleted file]
lnet/utils/lstclient.c

diff --git a/libcfs/include/libcfs/Makefile.am b/libcfs/include/libcfs/Makefile.am
index faf9c72..d074151 100644
@@ -10,4 +10,4 @@ EXTRA_DIST := curproc.h libcfs_private.h libcfs.h list.h lltrace.h \
                libcfs_prim.h libcfs_time.h libcfs_hash.h \
                libcfs_debug.h libcfsutil.h libcfs_ioctl.h \
                libcfs_pack.h libcfs_unpack.h libcfs_string.h \
-               libcfs_kernelcomm.h
+               libcfs_kernelcomm.h libcfs_workitem.h
diff --git a/libcfs/include/libcfs/libcfs.h b/libcfs/include/libcfs/libcfs.h
index 5ec895a..06a4a37 100644
@@ -301,6 +301,7 @@ void cfs_get_random_bytes(void *buf, int size);
 #include <libcfs/libcfs_time.h>
 #include <libcfs/libcfs_string.h>
 #include <libcfs/libcfs_kernelcomm.h>
+#include <libcfs/libcfs_workitem.h>
 #include <libcfs/libcfs_hash.h>
 
 /* container_of depends on "likely" which is defined in libcfs_private.h */
diff --git a/libcfs/include/libcfs/libcfs_workitem.h b/libcfs/include/libcfs/libcfs_workitem.h
new file mode 100644
index 0000000..7448836
--- /dev/null
+++ b/libcfs/include/libcfs/libcfs_workitem.h
@@ -0,0 +1,118 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * libcfs/include/libcfs/libcfs_workitem.h
+ *
+ * Author: Isaac Huang  <he.h.huang@oracle.com>
+ *         Liang Zhen   <zhen.liang@sun.com>
+ *
+ * A workitem is deferred work with these semantics:
+ * - a workitem always runs in thread context.
+ * - a workitem can be concurrent with other workitems but is strictly
+ *   serialized with respect to itself.
+ * - no CPU affinity: a workitem does not necessarily run on the same CPU
+ *   that schedules it. However, this might change in the future.
+ * - if a workitem is scheduled again before it has a chance to run, it
+ *   runs only once.
+ * - if a workitem is scheduled while it runs, it runs again after it
+ *   completes; this ensures that events occurring while other events are
+ *   being processed receive due attention. This behavior also allows a
+ *   workitem to reschedule itself.
+ *
+ * Usage notes:
+ * - a workitem can sleep but it should be aware of how that sleep might
+ *   affect others.
+ * - a workitem runs inside a kernel thread so there's no user space to access.
+ * - do not use a workitem if the scheduling latency can't be tolerated.
+ *
+ * When wi_action returns non-zero, it means the workitem has either been
+ * freed or reused and the workitem scheduler won't touch it any more.
+ */
+
+#ifndef __LIBCFS_WORKITEM_H__
+#define __LIBCFS_WORKITEM_H__
+
+struct cfs_workitem;
+
+typedef int (*cfs_wi_action_t) (struct cfs_workitem *);
+typedef struct cfs_workitem {
+        /** chain on runq or rerunq */
+        cfs_list_t       wi_list;
+        /** working function */
+        cfs_wi_action_t  wi_action;
+        /** arg for working function */
+        void            *wi_data;
+        /** scheduler id, can be negative */
+        short            wi_sched_id;
+        /** currently running */
+        unsigned short   wi_running:1;
+        /** scheduled */
+        unsigned short   wi_scheduled:1;
+} cfs_workitem_t;
+
+/**
+ * Non-negative values are reserved as CPU ids for a future per-CPU
+ * scheduler implementation, so users can "bind" a workitem to a specific CPU.
+ */
+#define CFS_WI_SCHED_ANY        (-1)
+#define CFS_WI_SCHED_SERIAL     (-2)
+
+static inline void
+cfs_wi_init(cfs_workitem_t *wi, void *data,
+            cfs_wi_action_t action, short sched_id)
+{
+        CFS_INIT_LIST_HEAD(&wi->wi_list);
+
+        wi->wi_sched_id  = sched_id;
+        wi->wi_running   = 0;
+        wi->wi_scheduled = 0;
+        wi->wi_data      = data;
+        wi->wi_action    = action;
+}
+
+void cfs_wi_exit(cfs_workitem_t *wi);
+int  cfs_wi_cancel(cfs_workitem_t *wi);
+void cfs_wi_schedule(cfs_workitem_t *wi);
+int  cfs_wi_startup(void);
+void cfs_wi_shutdown(void);
+
+#ifdef __KERNEL__
+/** # workitem scheduler loops before reschedule */
+#define CFS_WI_RESCHED    128
+#else
+int cfs_wi_check_events(void);
+#endif
+
+#endif /* __LIBCFS_WORKITEM_H__ */
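A consumer embeds a cfs_workitem_t in its own state and drives it through the calls above. The following is a minimal usage sketch, not part of the patch: my_event and the my_event_* helpers are hypothetical names, and it assumes kernel context where LIBCFS_FREE() is available.

typedef struct my_event {
        cfs_workitem_t  mev_wi;    /* embedded workitem */
        int             mev_done;  /* set when no more work is expected */
} my_event_t;

static int
my_event_action(cfs_workitem_t *wi)
{
        my_event_t *mev = (my_event_t *)wi->wi_data;

        /* ... handle the event in thread context ... */

        if (!mev->mev_done)
                return 0;               /* stays alive; may be rescheduled */

        /* finished: detach from the scheduler, then release the container */
        cfs_wi_exit(wi);                /* only legal from inside wi_action */
        LIBCFS_FREE(mev, sizeof(*mev));
        return 1;                       /* non-zero: wi freed, don't touch */
}

static void
my_event_setup_and_fire(my_event_t *mev)
{
        cfs_wi_init(&mev->mev_wi, mev, my_event_action, CFS_WI_SCHED_ANY);
        cfs_wi_schedule(&mev->mev_wi);  /* scheduling again before it runs
                                         * would still run it only once */
}

Outside the action, cfs_wi_cancel() is the only safe way to stop a pending run: it returns 1 when the schedule was cancelled before the action started, and 0 when the action is already running.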
diff --git a/libcfs/libcfs/Makefile.in b/libcfs/libcfs/Makefile.in
index fbeefdd..b794923 100644
@@ -25,7 +25,7 @@ sources:
 endif
 
 libcfs-all-objs := debug.o nidstrings.o lwt.o module.o tracefile.o watchdog.o \
-       libcfs_string.o hash.o kernel_user_comm.o prng.o
+       libcfs_string.o hash.o kernel_user_comm.o prng.o workitem.o
 
 libcfs-objs := $(libcfs-linux-objs) $(libcfs-all-objs)
 
diff --git a/libcfs/libcfs/autoMakefile.am b/libcfs/libcfs/autoMakefile.am
index e15394a..7712a64 100644
@@ -43,7 +43,8 @@ DIST_SUBDIRS := linux util posix darwin
 if LIBLUSTRE
 noinst_LIBRARIES= libcfs.a
 libcfs_a_SOURCES= posix/posix-debug.c user-prim.c user-lock.c user-tcpip.c \
-                  prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c
+                  prng.c user-bitops.c user-mem.c hash.c kernel_user_comm.c \
+                  workitem.c
 libcfs_a_CPPFLAGS = $(LLCPPFLAGS)
 libcfs_a_CFLAGS = $(LLCFLAGS)
 endif
@@ -89,5 +90,5 @@ EXTRA_DIST := Info.plist
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ linux-*.c linux/*.o darwin/*.o libcfs
 DIST_SOURCES := $(libcfs-all-objs:%.o=%.c) tracefile.h user-prim.c prng.c \
-      user-lock.c user-tcpip.c user-bitops.c\
+      user-lock.c user-tcpip.c user-bitops.c workitem.c \
       user-mem.c kernel_user_comm.c linux/linux-tracefile.h
diff --git a/libcfs/libcfs/module.c b/libcfs/libcfs/module.c
index 8e773d3..a25df4a 100644
@@ -406,15 +406,23 @@ static int init_libcfs_module(void)
                 goto cleanup_lwt;
         }
 
+        rc = cfs_wi_startup();
+        if (rc) {
+                CERROR("startup workitem: error %d\n", rc);
+                goto cleanup_deregister;
+        }
+
         rc = insert_proc();
         if (rc) {
                 CERROR("insert_proc: error %d\n", rc);
-                goto cleanup_deregister;
+                goto cleanup_wi;
         }
 
         CDEBUG (D_OTHER, "portals setup OK\n");
         return (0);
 
+ cleanup_wi:
+        cfs_wi_shutdown();
  cleanup_deregister:
         cfs_psdev_deregister(&libcfs_dev);
  cleanup_lwt:
@@ -435,6 +443,7 @@ static void exit_libcfs_module(void)
         CDEBUG(D_MALLOC, "before Portals cleanup: kmem %d\n",
                cfs_atomic_read(&libcfs_kmemory));
 
+        cfs_wi_shutdown();
         rc = cfs_psdev_deregister(&libcfs_dev);
         if (rc)
                 CERROR("misc_deregister error %d\n", rc);
diff --git a/libcfs/libcfs/workitem.c b/libcfs/libcfs/workitem.c
new file mode 100644
index 0000000..6533867
--- /dev/null
+++ b/libcfs/libcfs/workitem.c
@@ -0,0 +1,478 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * libcfs/libcfs/workitem.c
+ *
+ * Author: Isaac Huang <isaac@clusterfs.com>
+ *         Liang Zhen  <zhen.liang@sun.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <libcfs/libcfs.h>
+
+typedef struct cfs_wi_sched {
+#ifdef __KERNEL__
+        /** serialised workitems */
+        cfs_spinlock_t  ws_lock;
+        /** where schedulers sleep */
+        cfs_waitq_t     ws_waitq;
+#endif
+        /** concurrent workitems */
+        cfs_list_t      ws_runq;
+        /** rescheduled running-workitems */
+        cfs_list_t      ws_rerunq;
+        /** shutting down */
+        int             ws_shuttingdown;
+} cfs_wi_sched_t;
+
+#ifdef __KERNEL__
+/**
+ * There are two cfs_wi_sched_t instances so far: one for CFS_WI_SCHED_ANY
+ * and one for CFS_WI_SCHED_SERIAL; a per-CPU implementation will be added
+ * for SMP scalability.
+ */
+
+#define CFS_WI_NSCHED   2
+#else
+/** always 2 for userspace */
+#define CFS_WI_NSCHED   2
+#endif /* __KERNEL__ */
+
+struct cfs_workitem_data {
+        /** serialize */
+        cfs_spinlock_t  wi_glock;
+        /** number of cfs_wi_sched_t */
+        int             wi_nsched;
+        /** number of threads (all schedulers) */
+        int             wi_nthreads;
+        /** default scheduler */
+        cfs_wi_sched_t *wi_scheds;
+} cfs_wi_data;
+
+static inline cfs_wi_sched_t *
+cfs_wi_to_sched(cfs_workitem_t *wi)
+{
+        LASSERT(wi->wi_sched_id == CFS_WI_SCHED_ANY ||
+                wi->wi_sched_id == CFS_WI_SCHED_SERIAL ||
+                (wi->wi_sched_id >= 0 &&
+                 wi->wi_sched_id < cfs_wi_data.wi_nsched));
+
+        if (wi->wi_sched_id == CFS_WI_SCHED_ANY)
+                return &cfs_wi_data.wi_scheds[0];
+        if (wi->wi_sched_id == CFS_WI_SCHED_SERIAL)
+                return &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
+
+        return &cfs_wi_data.wi_scheds[wi->wi_sched_id];
+}
+
+#ifdef __KERNEL__
+static inline void
+cfs_wi_sched_lock(cfs_wi_sched_t *sched)
+{
+        cfs_spin_lock(&sched->ws_lock);
+}
+
+static inline void
+cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
+{
+        cfs_spin_unlock(&sched->ws_lock);
+}
+
+static inline int
+cfs_wi_sched_cansleep(cfs_wi_sched_t *sched)
+{
+        cfs_wi_sched_lock(sched);
+        if (sched->ws_shuttingdown) {
+                cfs_wi_sched_unlock(sched);
+                return 0;
+        }
+
+        if (!cfs_list_empty(&sched->ws_runq)) {
+                cfs_wi_sched_unlock(sched);
+                return 0;
+        }
+        cfs_wi_sched_unlock(sched);
+        return 1;
+}
+
+#else
+
+static inline void
+cfs_wi_sched_lock(cfs_wi_sched_t *sched)
+{
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+}
+
+static inline void
+cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
+{
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+}
+
+#endif
+
+/* XXX:
+ * 0. it only works when called from wi->wi_action.
+ * 1. when it returns no one shall try to schedule the workitem.
+ */
+void
+cfs_wi_exit(cfs_workitem_t *wi)
+{
+        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
+
+        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT (!sched->ws_shuttingdown);
+
+        cfs_wi_sched_lock(sched);
+
+#ifdef __KERNEL__
+        LASSERT (wi->wi_running);
+#endif
+        if (wi->wi_scheduled) { /* cancel pending schedules */
+                LASSERT (!cfs_list_empty(&wi->wi_list));
+                cfs_list_del_init(&wi->wi_list);
+        }
+
+        LASSERT (cfs_list_empty(&wi->wi_list));
+        wi->wi_scheduled = 1; /* LBUG future schedule attempts */
+
+        cfs_wi_sched_unlock(sched);
+        return;
+}
+CFS_EXPORT_SYMBOL(cfs_wi_exit);
+
+/**
+ * cancel a workitem; returns 0 if it is already running, 1 otherwise
+ */
+int
+cfs_wi_cancel (cfs_workitem_t *wi)
+{
+        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
+        int             rc;
+
+        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT (!sched->ws_shuttingdown);
+
+        cfs_wi_sched_lock(sched);
+        /*
+         * return 0 if it's running already, otherwise return 1, which
+         * means the workitem will not be scheduled and will not have
+         * any race with wi_action.
+         */
+        rc = !(wi->wi_running);
+
+        if (wi->wi_scheduled) { /* cancel pending schedules */
+                LASSERT (!cfs_list_empty(&wi->wi_list));
+                cfs_list_del_init(&wi->wi_list);
+                wi->wi_scheduled = 0;
+        }
+
+        LASSERT (cfs_list_empty(&wi->wi_list));
+
+        cfs_wi_sched_unlock(sched);
+        return rc;
+}
+
+CFS_EXPORT_SYMBOL(cfs_wi_cancel);
+
+/*
+ * A workitem scheduled on the CFS_WI_SCHED_SERIAL queue is strictly
+ * serialised not only with itself, but also with others scheduled that way.
+ *
+ * Now there's only one static serialised queue, but in the future more might
+ * be added, and even dynamic creation of serialised queues might be supported.
+ */
+void
+cfs_wi_schedule(cfs_workitem_t *wi)
+{
+        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
+
+        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
+        LASSERT (!sched->ws_shuttingdown);
+
+        cfs_wi_sched_lock(sched);
+
+        if (!wi->wi_scheduled) {
+                LASSERT (cfs_list_empty(&wi->wi_list));
+
+                wi->wi_scheduled = 1;
+                if (!wi->wi_running) {
+                        cfs_list_add_tail(&wi->wi_list, &sched->ws_runq);
+#ifdef __KERNEL__
+                        cfs_waitq_signal(&sched->ws_waitq);
+#endif
+                } else {
+                        cfs_list_add(&wi->wi_list, &sched->ws_rerunq);
+                }
+        }
+
+        LASSERT (!cfs_list_empty(&wi->wi_list));
+        cfs_wi_sched_unlock(sched);
+        return;
+}
+
+CFS_EXPORT_SYMBOL(cfs_wi_schedule);
+
+#ifdef __KERNEL__
+
+static int
+cfs_wi_scheduler (void *arg)
+{
+        int             id     = (int)(long_ptr_t) arg;
+        int             serial = (id == -1);
+        char            name[24];
+        cfs_wi_sched_t *sched;
+
+        if (serial) {
+                sched = &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
+                cfs_daemonize("wi_serial_sd");
+        } else {
+                /* will be sched = &cfs_wi_data.wi_scheds[id] in the future */
+                sched = &cfs_wi_data.wi_scheds[0];
+                snprintf(name, sizeof(name), "cfs_wi_sd%03d", id);
+                cfs_daemonize(name);
+        }
+
+        cfs_block_allsigs();
+
+        cfs_wi_sched_lock(sched);
+
+        while (!sched->ws_shuttingdown) {
+                int             nloops = 0;
+                int             rc;
+                cfs_workitem_t *wi;
+
+                while (!cfs_list_empty(&sched->ws_runq) &&
+                       nloops < CFS_WI_RESCHED) {
+                        wi = cfs_list_entry(sched->ws_runq.next,
+                                            cfs_workitem_t, wi_list);
+                        LASSERT (wi->wi_scheduled && !wi->wi_running);
+
+                        cfs_list_del_init(&wi->wi_list);
+
+                        wi->wi_running   = 1;
+                        wi->wi_scheduled = 0;
+                        cfs_wi_sched_unlock(sched);
+                        nloops++;
+
+                        rc = (*wi->wi_action) (wi);
+
+                        cfs_wi_sched_lock(sched);
+                        if (rc != 0) /* wi is dead and may already be freed */
+                                continue;
+
+                        wi->wi_running = 0;
+                        if (cfs_list_empty(&wi->wi_list))
+                                continue;
+
+                        LASSERT (wi->wi_scheduled);
+                        /* wi was rescheduled while running; it is on the
+                         * rerunq now, so move it to the runq to run again */
+                        cfs_list_move_tail(&wi->wi_list, &sched->ws_runq);
+                }
+
+                if (!cfs_list_empty(&sched->ws_runq)) {
+                        cfs_wi_sched_unlock(sched);
+                        /* don't sleep because some workitems still
+                         * expect me to come back soon */
+                        cfs_cond_resched();
+                        cfs_wi_sched_lock(sched);
+                        continue;
+                }
+
+                cfs_wi_sched_unlock(sched);
+                cfs_wait_event_interruptible_exclusive(sched->ws_waitq,
+                                !cfs_wi_sched_cansleep(sched), rc);
+                cfs_wi_sched_lock(sched);
+        }
+
+        cfs_wi_sched_unlock(sched);
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        cfs_wi_data.wi_nthreads--;
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+        return 0;
+}
+
+static int
+cfs_wi_start_thread (int (*func) (void*), void *arg)
+{
+        long pid;
+
+        pid = cfs_kernel_thread(func, arg, 0);
+        if (pid < 0)
+                return (int)pid;
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        cfs_wi_data.wi_nthreads++;
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+        return 0;
+}
+
+#else /* __KERNEL__ */
+
+int
+cfs_wi_check_events (void)
+{
+        int               n = 0;
+        cfs_workitem_t   *wi;
+        cfs_list_t       *q;
+
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+
+        for (;;) {
+                /** rerunq is always empty for userspace */
+                if (!cfs_list_empty(&cfs_wi_data.wi_scheds[1].ws_runq))
+                        q = &cfs_wi_data.wi_scheds[1].ws_runq;
+                else if (!cfs_list_empty(&cfs_wi_data.wi_scheds[0].ws_runq))
+                        q = &cfs_wi_data.wi_scheds[0].ws_runq;
+                else
+                        break;
+
+                wi = cfs_list_entry(q->next, cfs_workitem_t, wi_list);
+                cfs_list_del_init(&wi->wi_list);
+
+                LASSERT (wi->wi_scheduled);
+                wi->wi_scheduled = 0;
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+
+                n++;
+                (*wi->wi_action) (wi);
+
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+        }
+
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+        return n;
+}
+
+#endif
+
+static void
+cfs_wi_sched_init(cfs_wi_sched_t *sched)
+{
+        sched->ws_shuttingdown = 0;
+#ifdef __KERNEL__
+        cfs_spin_lock_init(&sched->ws_lock);
+        cfs_waitq_init(&sched->ws_waitq);
+#endif
+        CFS_INIT_LIST_HEAD(&sched->ws_runq);
+        CFS_INIT_LIST_HEAD(&sched->ws_rerunq);
+}
+
+static void
+cfs_wi_sched_shutdown(cfs_wi_sched_t *sched)
+{
+        cfs_wi_sched_lock(sched);
+
+        LASSERT(cfs_list_empty(&sched->ws_runq));
+        LASSERT(cfs_list_empty(&sched->ws_rerunq));
+
+        sched->ws_shuttingdown = 1;
+
+#ifdef __KERNEL__
+        cfs_waitq_broadcast(&sched->ws_waitq);
+#endif
+        cfs_wi_sched_unlock(sched);
+}
+
+
+int
+cfs_wi_startup (void)
+{
+        int i;
+        int n;
+        int rc;
+
+        cfs_wi_data.wi_nthreads = 0;
+        cfs_wi_data.wi_nsched   = CFS_WI_NSCHED;
+        LIBCFS_ALLOC(cfs_wi_data.wi_scheds,
+                     cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
+        if (cfs_wi_data.wi_scheds == NULL)
+                return -ENOMEM;
+
+        cfs_spin_lock_init(&cfs_wi_data.wi_glock);
+        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
+                cfs_wi_sched_init(&cfs_wi_data.wi_scheds[i]);
+
+#ifdef __KERNEL__
+        n = cfs_num_online_cpus();
+        for (i = 0; i <= n; i++) {
+                rc = cfs_wi_start_thread(cfs_wi_scheduler,
+                                         (void *)(long_ptr_t)(i == n ? -1 : i));
+                if (rc != 0) {
+                        CERROR ("Can't spawn workitem scheduler: %d\n", rc);
+                        cfs_wi_shutdown();
+                        return rc;
+                }
+        }
+#else
+        n = rc = 0;
+#endif
+
+        return 0;
+}
+
+void
+cfs_wi_shutdown (void)
+{
+        int i;
+
+        if (cfs_wi_data.wi_scheds == NULL)
+                return;
+
+        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
+                cfs_wi_sched_shutdown(&cfs_wi_data.wi_scheds[i]);
+
+#ifdef __KERNEL__
+        cfs_spin_lock(&cfs_wi_data.wi_glock);
+        i = 2;
+        while (cfs_wi_data.wi_nthreads != 0) {
+                CDEBUG(IS_PO2(++i) ? D_WARNING : D_NET,
+                       "waiting for %d threads to terminate\n",
+                       cfs_wi_data.wi_nthreads);
+                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+
+                cfs_pause(cfs_time_seconds(1));
+
+                cfs_spin_lock(&cfs_wi_data.wi_glock);
+        }
+        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+#endif
+        LIBCFS_FREE(cfs_wi_data.wi_scheds,
+                    cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
+        return;
+}
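In userspace there are no scheduler threads, so queued workitems only run when the embedding program polls for them. A hypothetical event loop (sketch only; my_wait_for_input() is an assumed placeholder, not a real API):

static void
my_event_loop(void)
{
        for (;;) {
                /* run every queued workitem action; as in
                 * cfs_wi_check_events() above, the serialized queue
                 * drains before the "any" queue */
                int n = cfs_wi_check_events();

                if (n == 0)
                        my_wait_for_input();  /* block on I/O, timers, ... */
        }
}

The lstclient.c changes at the end of this patch wire the module into such a program: cfs_wi_startup() runs before LNetInit(), and cfs_wi_shutdown() runs during teardown in reverse order.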
diff --git a/lnet/selftest/Makefile.in b/lnet/selftest/Makefile.in
index 8ebef75..7acc249 100644
@@ -1,6 +1,6 @@
 MODULES := lnet_selftest
 
-lnet_selftest-objs := console.o conrpc.o conctl.o framework.o timer.o rpc.o workitem.o module.o ping_test.o brw_test.o
+lnet_selftest-objs := console.o conrpc.o conctl.o framework.o timer.o rpc.o module.o ping_test.o brw_test.o
 
 default: all
 
diff --git a/lnet/selftest/autoMakefile.am b/lnet/selftest/autoMakefile.am
index 36af901..24d92bc 100644
@@ -1,5 +1,5 @@
 my_sources = console.c conrpc.c conctl.c console.h conrpc.h \
-            framework.c timer.c rpc.c workitem.c module.c \
+            framework.c timer.c rpc.c module.c \
             ping_test.c brw_test.c
 
 if LIBLUSTRE
diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c
index f4de757..4bf02d9 100644
@@ -319,7 +319,7 @@ sfw_server_rpc_done (srpc_server_rpc_t *rpc)
                 "Incoming framework RPC done: "
                 "service %s, peer %s, status %s:%d\n",
                 sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                swi_state2str(rpc->srpc_wi.wi_state),
+                swi_state2str(rpc->srpc_wi.swi_state),
                 status);
 
         if (rpc->srpc_bulk != NULL)
@@ -341,7 +341,7 @@ sfw_client_rpc_fini (srpc_client_rpc_t *rpc)
                 "Outgoing framework RPC done: "
                 "service %d, peer %s, status %s:%d:%d\n",
                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
-                swi_state2str(rpc->crpc_wi.wi_state),
+                swi_state2str(rpc->crpc_wi.swi_state),
                 rpc->crpc_aborted, rpc->crpc_status);
 
         cfs_spin_lock(&sfw_data.fw_lock);
@@ -925,7 +925,7 @@ sfw_create_test_rpc (sfw_test_unit_t *tsu, lnet_process_id_t peer,
 int
 sfw_run_test (swi_workitem_t *wi)
 {
-        sfw_test_unit_t     *tsu = wi->wi_data;
+        sfw_test_unit_t     *tsu = wi->swi_workitem.wi_data;
         sfw_test_instance_t *tsi = tsu->tsu_instance;
         srpc_client_rpc_t   *rpc = NULL;
 
@@ -1000,7 +1000,8 @@ sfw_run_batch (sfw_batch_t *tsb)
                         cfs_atomic_inc(&tsi->tsi_nactive);
                         tsu->tsu_loop = tsi->tsi_loop;
                         wi = &tsu->tsu_worker;
-                        swi_init_workitem(wi, tsu, sfw_run_test);
+                        swi_init_workitem(wi, tsu, sfw_run_test,
+                                          CFS_WI_SCHED_ANY);
                         swi_schedule_workitem(wi);
                 }
         }
diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index 64fd3e0..24af911 100644
@@ -47,7 +47,6 @@ typedef enum {
         SRPC_STATE_NONE,
         SRPC_STATE_NI_INIT,
         SRPC_STATE_EQ_INIT,
-        SRPC_STATE_WI_INIT,
         SRPC_STATE_RUNNING,
         SRPC_STATE_STOPPING,
 } srpc_state_t;
@@ -190,7 +189,9 @@ srpc_init_server_rpc (srpc_server_rpc_t *rpc,
                       srpc_service_t *sv, srpc_buffer_t *buffer)
 {
         memset(rpc, 0, sizeof(*rpc));
-        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc);
+        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
+                          sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID ?
+                          CFS_WI_SCHED_SERIAL : CFS_WI_SCHED_ANY);
 
         rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 
@@ -520,9 +521,9 @@ srpc_finish_service (srpc_service_t *sv)
                                 "wi %s scheduled %d running %d, "
                                 "ev fired %d type %d status %d lnet %d\n",
                                 rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                                swi_state2str(rpc->srpc_wi.wi_state),
-                                rpc->srpc_wi.wi_scheduled,
-                                rpc->srpc_wi.wi_running,
+                                swi_state2str(rpc->srpc_wi.swi_state),
+                                rpc->srpc_wi.swi_workitem.wi_scheduled,
+                                rpc->srpc_wi.swi_workitem.wi_running,
                                 rpc->srpc_ev.ev_fired,
                                 rpc->srpc_ev.ev_type,
                                 rpc->srpc_ev.ev_status,
@@ -584,14 +585,7 @@ free:
 inline void
 srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
 {
-        srpc_service_t *sv = rpc->srpc_service;
-
-        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
-                swi_schedule_workitem(&rpc->srpc_wi);
-        else    /* framework RPCs are handled one by one */
-                swi_schedule_serial_workitem(&rpc->srpc_wi);
-
-        return;
+        swi_schedule_workitem(&rpc->srpc_wi);
 }
 
 void
@@ -764,14 +758,14 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
         srpc_service_t *sv = rpc->srpc_service;
         srpc_buffer_t  *buffer;
 
-        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);
+        LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
 
         rpc->srpc_status = status;
 
         CDEBUG (status == 0 ? D_NET : D_NETERROR,
                 "Server RPC %p done: service %s, peer %s, status %s:%d\n",
                 rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
-                swi_state2str(rpc->srpc_wi.wi_state), status);
+                swi_state2str(rpc->srpc_wi.swi_state), status);
 
         if (status != 0) {
                 cfs_spin_lock(&srpc_data.rpc_glock);
@@ -823,7 +817,7 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
 int
 srpc_handle_rpc (swi_workitem_t *wi)
 {
-        srpc_server_rpc_t *rpc = wi->wi_data;
+        srpc_server_rpc_t *rpc = wi->swi_workitem.wi_data;
         srpc_service_t    *sv = rpc->srpc_service;
         srpc_event_t      *ev = &rpc->srpc_ev;
         int                rc = 0;
@@ -848,7 +842,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
 
         cfs_spin_unlock(&sv->sv_lock);
 
-        switch (wi->wi_state) {
+        switch (wi->swi_state) {
         default:
                 LBUG ();
         case SWI_STATE_NEWBORN: {
@@ -878,7 +872,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                         return 1;
                 }
 
-                wi->wi_state = SWI_STATE_BULK_STARTED;
+                wi->swi_state = SWI_STATE_BULK_STARTED;
 
                 if (rpc->srpc_bulk != NULL) {
                         rc = srpc_do_bulk(rpc);
@@ -904,7 +898,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                         }
                 }
 
-                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
+                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                 rc = srpc_send_reply(rpc);
                 if (rc == 0)
                         return 0; /* wait for reply */
@@ -920,7 +914,7 @@ srpc_handle_rpc (swi_workitem_t *wi)
                         LASSERT (ev->ev_fired);
                 }
 
-                wi->wi_state = SWI_STATE_DONE;
+                wi->swi_state = SWI_STATE_DONE;
                 srpc_server_rpc_done(rpc, ev->ev_status);
                 return 1;
         }
@@ -1000,7 +994,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
 {
         swi_workitem_t *wi = &rpc->crpc_wi;
 
-        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);
+        LASSERT (status != 0 || wi->swi_state == SWI_STATE_DONE);
 
         cfs_spin_lock(&rpc->crpc_lock);
 
@@ -1013,7 +1007,7 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
         CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                 "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
-                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);
+                swi_state2str(wi->swi_state), rpc->crpc_aborted, status);
 
         /*
          * No one can schedule me now since:
@@ -1037,7 +1031,7 @@ int
 srpc_send_rpc (swi_workitem_t *wi)
 {
         int                rc = 0;
-        srpc_client_rpc_t *rpc = wi->wi_data;
+        srpc_client_rpc_t *rpc = wi->swi_workitem.wi_data;
         srpc_msg_t        *reply = &rpc->crpc_replymsg;
         int                do_bulk = rpc->crpc_bulk.bk_niov > 0;
 
@@ -1053,7 +1047,7 @@ srpc_send_rpc (swi_workitem_t *wi)
 
         cfs_spin_unlock(&rpc->crpc_lock);
 
-        switch (wi->wi_state) {
+        switch (wi->swi_state) {
         default:
                 LBUG ();
         case SWI_STATE_NEWBORN:
@@ -1068,7 +1062,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = srpc_prepare_bulk(rpc);
                 if (rc != 0) break;
 
-                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
+                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
                 rc = srpc_send_request(rpc);
                 break;
 
@@ -1081,7 +1075,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                 rc = rpc->crpc_reqstev.ev_status;
                 if (rc != 0) break;
 
-                wi->wi_state = SWI_STATE_REQUEST_SENT;
+                wi->swi_state = SWI_STATE_REQUEST_SENT;
                 /* perhaps more events, fall thru */
         case SWI_STATE_REQUEST_SENT: {
                 srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);
@@ -1112,7 +1106,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                         LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                 }
 
-                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
+                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
         }
         case SWI_STATE_REPLY_RECEIVED:
                 if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;
@@ -1127,7 +1121,7 @@ srpc_send_rpc (swi_workitem_t *wi)
                     rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                         rc = 0;
 
-                wi->wi_state = SWI_STATE_DONE;
+                wi->swi_state = SWI_STATE_DONE;
                 srpc_client_rpc_done(rpc, rc);
                 return 1;
         }
@@ -1183,7 +1177,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
         CDEBUG (D_NET,
                 "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
-                swi_state2str(rpc->crpc_wi.wi_state), why);
+                swi_state2str(rpc->crpc_wi.swi_state), why);
 
         rpc->crpc_aborted = 1;
         rpc->crpc_status  = why;
@@ -1491,12 +1485,6 @@ srpc_startup (void)
 
         srpc_data.rpc_state = SRPC_STATE_EQ_INIT;
 
-        rc = swi_startup();
-        if (rc != 0)
-                goto bail;
-
-        srpc_data.rpc_state = SRPC_STATE_WI_INIT;
-
         rc = stt_startup();
 
 bail:
@@ -1536,9 +1524,6 @@ srpc_shutdown (void)
 
                 stt_shutdown();
 
-        case SRPC_STATE_WI_INIT:
-                swi_shutdown();
-
         case SRPC_STATE_EQ_INIT:
                 rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                 LASSERT (rc == 0);
diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h
index 4f883c9..9856472 100644
 #define SWI_STATE_DONE                     10
 
 /* forward refs */
-struct swi_workitem;
 struct srpc_service;
 struct sfw_test_unit;
 struct sfw_test_instance;
 
-/*
- * A workitems is deferred work with these semantics:
- * - a workitem always runs in thread context.
- * - a workitem can be concurrent with other workitems but is strictly
- *   serialized with respect to itself.
- * - no CPU affinity, a workitem does not necessarily run on the same CPU
- *   that schedules it. However, this might change in the future.
- * - if a workitem is scheduled again before it has a chance to run, it
- *   runs only once.
- * - if a workitem is scheduled while it runs, it runs again after it
- *   completes; this ensures that events occurring while other events are
- *   being processed receive due attention. This behavior also allows a
- *   workitem to reschedule itself.
- *
- * Usage notes:
- * - a workitem can sleep but it should be aware of how that sleep might
- *   affect others.
- * - a workitem runs inside a kernel thread so there's no user space to access.
- * - do not use a workitem if the scheduling latency can't be tolerated.
- *
- * When wi_action returns non-zero, it means the workitem has either been
- * freed or reused and workitem scheduler won't touch it any more.
- */
-typedef int (*swi_action_t) (struct swi_workitem *);
-typedef struct swi_workitem {
-        cfs_list_t       wi_list;        /* chain on runq */
-        int              wi_state;
-        swi_action_t     wi_action;
-        void            *wi_data;
-        unsigned int     wi_running:1;
-        unsigned int     wi_scheduled:1;
-} swi_workitem_t;
-
-static inline void
-swi_init_workitem (swi_workitem_t *wi, void *data, swi_action_t action)
-{
-        CFS_INIT_LIST_HEAD(&wi->wi_list);
-
-        wi->wi_running   = 0;
-        wi->wi_scheduled = 0;
-        wi->wi_data      = data;
-        wi->wi_action    = action;
-        wi->wi_state     = SWI_STATE_NEWBORN;
-}
-
-#define SWI_RESCHED    128         /* # workitem scheduler loops before reschedule */
-
 /* services below SRPC_FRAMEWORK_SERVICE_MAX_ID are framework
  * services, e.g. create/modify session.
  */
@@ -231,6 +183,15 @@ typedef struct {
         lnet_process_id_t    buf_peer;
 } srpc_buffer_t;
 
+struct swi_workitem;
+typedef int (*swi_action_t) (struct swi_workitem *);
+
+typedef struct swi_workitem {
+        cfs_workitem_t       swi_workitem;
+        swi_action_t         swi_action;
+        int                  swi_state;
+} swi_workitem_t;
+
 /* server-side state of a RPC */
 typedef struct srpc_server_rpc {
         cfs_list_t           srpc_list;    /* chain on srpc_service::*_rpcq */
@@ -417,7 +378,6 @@ typedef struct {
         sfw_test_client_ops_t  *tsc_cli_ops;      /* ops of test client */
 } sfw_test_case_t;
 
-
 srpc_client_rpc_t *
 sfw_create_rpc(lnet_process_id_t peer, int service, int nbulkiov, int bulklen,
                void (*done) (srpc_client_rpc_t *), void *priv);
@@ -453,13 +413,45 @@ void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer);
 void srpc_get_counters(srpc_counters_t *cnt);
 void srpc_set_counters(const srpc_counters_t *cnt);
 
-void swi_kill_workitem(swi_workitem_t *wi);
-void swi_schedule_workitem(swi_workitem_t *wi);
-void swi_schedule_serial_workitem(swi_workitem_t *wi);
-int swi_startup(void);
+static inline int
+swi_wi_action(cfs_workitem_t *wi)
+{
+        swi_workitem_t *swi = container_of(wi, swi_workitem_t, swi_workitem);
+
+        return swi->swi_action(swi);
+}
+
+static inline void
+swi_init_workitem (swi_workitem_t *swi, void *data,
+                   swi_action_t action, short sched_id)
+{
+        swi->swi_action = action;
+        swi->swi_state  = SWI_STATE_NEWBORN;
+        cfs_wi_init(&swi->swi_workitem, data, swi_wi_action, sched_id);
+}
+
+static inline void
+swi_schedule_workitem(swi_workitem_t *wi)
+{
+        cfs_wi_schedule(&wi->swi_workitem);
+}
+
+static inline void
+swi_kill_workitem(swi_workitem_t *swi)
+{
+        cfs_wi_exit(&swi->swi_workitem);
+}
+
+#ifndef __KERNEL__
+static inline int
+swi_check_events(void)
+{
+        return cfs_wi_check_events();
+}
+#endif
+
 int sfw_startup(void);
 int srpc_startup(void);
-void swi_shutdown(void);
 void sfw_shutdown(void);
 void srpc_shutdown(void);
 
@@ -494,7 +486,8 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer,
                                 crpc_bulk.bk_iovs[nbulkiov]));
 
         CFS_INIT_LIST_HEAD(&rpc->crpc_list);
-        swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc);
+        swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc,
+                          CFS_WI_SCHED_ANY);
         cfs_spin_lock_init(&rpc->crpc_lock);
         cfs_atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */
 
@@ -547,7 +540,6 @@ int stt_poll_interval(void);
 int sfw_session_removed(void);
 
 int stt_check_events(void);
-int swi_check_events(void);
 int srpc_check_event(int timeout);
 
 int lnet_selftest_init(void);
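The swi_workitem_t wrapper above keeps selftest's per-RPC state machine (swi_state) while delegating all queueing to libcfs: swi_wi_action() recovers the wrapper with container_of() and dispatches to the typed swi_action_t. A hypothetical consumer, sketched with illustrative my_unit/my_run names that are not part of the patch:

struct my_unit {
        swi_workitem_t  mu_wi;
        /* ... per-unit test state ... */
};

static int
my_run(swi_workitem_t *swi)
{
        struct my_unit *u = swi->swi_workitem.wi_data;

        if (swi->swi_state == SWI_STATE_NEWBORN) {
                /* ... do one step of work on u ... */
                swi->swi_state = SWI_STATE_DONE;
                swi_schedule_workitem(swi);  /* reschedule self: goes to the
                                              * rerunq and runs once more */
                return 0;
        }

        swi_kill_workitem(swi);  /* no one may schedule it after this */
        /* u could be freed here */
        return 1;                /* scheduler will not touch swi again */
}

static void
my_unit_start(struct my_unit *u)
{
        /* CFS_WI_SCHED_SERIAL would instead serialize it with the
         * framework RPCs, as srpc_init_server_rpc() does above */
        swi_init_workitem(&u->mu_wi, u, my_run, CFS_WI_SCHED_ANY);
        swi_schedule_workitem(&u->mu_wi);
}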
diff --git a/lnet/selftest/workitem.c b/lnet/selftest/workitem.c
deleted file mode 100644
index 762b8c4..0000000
--- a/lnet/selftest/workitem.c
+++ /dev/null
@@ -1,376 +0,0 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
- * GPL HEADER START
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 only,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * General Public License version 2 for more details (a copy is included
- * in the LICENSE file that accompanied this code).
- *
- * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
- *
- * GPL HEADER END
- */
-/*
- * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
- * Use is subject to license terms.
- */
-/*
- * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
- *
- * lnet/selftest/workitem.c
- *
- * Author: Isaac Huang <isaac@clusterfs.com>
- */
-#define DEBUG_SUBSYSTEM S_LNET
-
-#include "selftest.h"
-
-
-struct smoketest_workitem {
-        cfs_list_t       wi_runq;         /* concurrent workitems */
-        cfs_list_t       wi_serial_runq;  /* serialised workitems */
-        cfs_waitq_t      wi_waitq;        /* where schedulers sleep */
-        cfs_waitq_t      wi_serial_waitq; /* where serial scheduler sleep */
-        cfs_spinlock_t   wi_lock;         /* serialize */
-        int              wi_shuttingdown;
-        int              wi_nthreads;
-} swi_data;
-
-static inline int
-swi_sched_cansleep (cfs_list_t *q)
-{
-        int rc;
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        rc = !swi_data.wi_shuttingdown && cfs_list_empty(q);
-
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return rc;
-}
-
-/* XXX: 
- * 0. it only works when called from wi->wi_action.
- * 1. when it returns no one shall try to schedule the workitem.
- */
-void
-swi_kill_workitem (swi_workitem_t *wi)
-{
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!swi_data.wi_shuttingdown);
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-#ifdef __KERNEL__
-        LASSERT (wi->wi_running);
-#endif
-
-        if (wi->wi_scheduled) { /* cancel pending schedules */
-                LASSERT (!cfs_list_empty(&wi->wi_list));
-                cfs_list_del_init(&wi->wi_list);
-        }
-
-        LASSERT (cfs_list_empty(&wi->wi_list));
-        wi->wi_scheduled = 1; /* LBUG future schedule attempts */
-
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return;
-}
-
-void
-swi_schedule_workitem (swi_workitem_t *wi)
-{
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!swi_data.wi_shuttingdown);
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        if (!wi->wi_scheduled) {
-                LASSERT (cfs_list_empty(&wi->wi_list));
-
-                wi->wi_scheduled = 1;
-                cfs_list_add_tail(&wi->wi_list, &swi_data.wi_runq);
-                cfs_waitq_signal(&swi_data.wi_waitq);
-        }
-
-        LASSERT (!cfs_list_empty(&wi->wi_list));
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return;
-}
-
-/*
- * Workitem scheduled by this function is strictly serialised not only with
- * itself, but also with others scheduled this way.
- *
- * Now there's only one static serialised queue, but in the future more might
- * be added, and even dynamic creation of serialised queues might be supported.
- */
-void
-swi_schedule_serial_workitem (swi_workitem_t *wi)
-{
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!swi_data.wi_shuttingdown);
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        if (!wi->wi_scheduled) {
-                LASSERT (cfs_list_empty(&wi->wi_list));
-
-                wi->wi_scheduled = 1;
-                cfs_list_add_tail(&wi->wi_list, &swi_data.wi_serial_runq);
-                cfs_waitq_signal(&swi_data.wi_serial_waitq);
-        }
-
-        LASSERT (!cfs_list_empty(&wi->wi_list));
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return;
-}
-
-#ifdef __KERNEL__
-
-int
-swi_scheduler_main (void *arg)
-{
-        int  id = (int)(long_ptr_t) arg;
-        char name[16];
-
-        snprintf(name, sizeof(name), "swi_sd%03d", id);
-        cfs_daemonize(name);
-        cfs_block_allsigs();
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        while (!swi_data.wi_shuttingdown) {
-                int             nloops = 0;
-                int             rc;
-                swi_workitem_t *wi;
-
-                while (!cfs_list_empty(&swi_data.wi_runq) &&
-                       nloops < SWI_RESCHED) {
-                        wi = cfs_list_entry(swi_data.wi_runq.next,
-                                            swi_workitem_t, wi_list);
-                        cfs_list_del_init(&wi->wi_list);
-
-                        LASSERT (wi->wi_scheduled);
-
-                        nloops++;
-                        if (wi->wi_running) {
-                                cfs_list_add_tail(&wi->wi_list,
-                                                  &swi_data.wi_runq);
-                                continue;
-                        }
-
-                        wi->wi_running   = 1;
-                        wi->wi_scheduled = 0;
-                        cfs_spin_unlock(&swi_data.wi_lock);
-
-                        rc = (*wi->wi_action) (wi);
-
-                        cfs_spin_lock(&swi_data.wi_lock);
-                        if (rc == 0) /* wi still active */
-                                wi->wi_running = 0;
-                }
-
-                cfs_spin_unlock(&swi_data.wi_lock);
-
-                if (nloops < SWI_RESCHED)
-                        cfs_wait_event_interruptible_exclusive(
-                                swi_data.wi_waitq,
-                                !swi_sched_cansleep(&swi_data.wi_runq), rc);
-                else
-                        cfs_cond_resched();
-
-                cfs_spin_lock(&swi_data.wi_lock);
-        }
-
-        swi_data.wi_nthreads--;
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return 0;
-}
-
-int
-swi_serial_scheduler_main (void *arg)
-{
-        UNUSED (arg);
-
-        cfs_daemonize("swi_serial_sd");
-        cfs_block_allsigs();
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        while (!swi_data.wi_shuttingdown) {
-                int             nloops = 0;
-                int             rc;
-                swi_workitem_t *wi;
-
-                while (!cfs_list_empty(&swi_data.wi_serial_runq) &&
-                       nloops < SWI_RESCHED) {
-                        wi = cfs_list_entry(swi_data.wi_serial_runq.next,
-                                            swi_workitem_t, wi_list);
-                        cfs_list_del_init(&wi->wi_list);
-
-                        LASSERTF (!wi->wi_running && wi->wi_scheduled,
-                                  "wi %p running %d scheduled %d\n",
-                                  wi, wi->wi_running, wi->wi_scheduled);
-
-                        nloops++;
-                        wi->wi_running   = 1;
-                        wi->wi_scheduled = 0;
-                        cfs_spin_unlock(&swi_data.wi_lock);
-
-                        rc = (*wi->wi_action) (wi);
-
-                        cfs_spin_lock(&swi_data.wi_lock);
-                        if (rc == 0) /* wi still active */
-                                wi->wi_running = 0;
-                }
-
-                cfs_spin_unlock(&swi_data.wi_lock);
-
-                if (nloops < SWI_RESCHED)
-                        cfs_wait_event_interruptible_exclusive(
-                                swi_data.wi_serial_waitq,
-                                !swi_sched_cansleep(&swi_data.wi_serial_runq),
-                                rc);
-                else
-                        cfs_cond_resched();
-
-                cfs_spin_lock(&swi_data.wi_lock);
-        }
-
-        swi_data.wi_nthreads--;
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return 0;
-}
-
-int
-swi_start_thread (int (*func) (void*), void *arg)
-{
-        long pid;
-
-        LASSERT (!swi_data.wi_shuttingdown);
-
-        pid = cfs_kernel_thread(func, arg, 0);
-        if (pid < 0)
-                return (int)pid;
-
-        cfs_spin_lock(&swi_data.wi_lock);
-        swi_data.wi_nthreads++;
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return 0;
-}
-
-#else /* __KERNEL__ */
-
-int
-swi_check_events (void)
-{
-        int               n = 0;
-        swi_workitem_t   *wi;
-        cfs_list_t       *q;
-
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        for (;;) {
-                if (!cfs_list_empty(&swi_data.wi_serial_runq))
-                        q = &swi_data.wi_serial_runq;
-                else if (!cfs_list_empty(&swi_data.wi_runq))
-                        q = &swi_data.wi_runq;
-                else
-                        break;
-
-                wi = cfs_list_entry(q->next, swi_workitem_t, wi_list);
-                cfs_list_del_init(&wi->wi_list);
-
-                LASSERT (wi->wi_scheduled);
-                wi->wi_scheduled = 0;
-                cfs_spin_unlock(&swi_data.wi_lock);
-
-                n++;
-                (*wi->wi_action) (wi);
-
-                cfs_spin_lock(&swi_data.wi_lock);
-        }
-
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return n;
-}
-
-#endif
-
-int
-swi_startup (void)
-{
-        int i;
-        int rc;
-
-        swi_data.wi_nthreads = 0;
-        swi_data.wi_shuttingdown = 0;
-        cfs_spin_lock_init(&swi_data.wi_lock);
-        cfs_waitq_init(&swi_data.wi_waitq);
-        cfs_waitq_init(&swi_data.wi_serial_waitq);
-        CFS_INIT_LIST_HEAD(&swi_data.wi_runq);
-        CFS_INIT_LIST_HEAD(&swi_data.wi_serial_runq);
-
-#ifdef __KERNEL__
-        rc = swi_start_thread(swi_serial_scheduler_main, NULL);
-        if (rc != 0) {
-                LASSERT (swi_data.wi_nthreads == 0);
-                CERROR ("Can't spawn serial workitem scheduler: %d\n", rc);
-                return rc;
-        }
-
-        for (i = 0; i < cfs_num_online_cpus(); i++) {
-                rc = swi_start_thread(swi_scheduler_main,
-                                      (void *) (long_ptr_t) i);
-                if (rc != 0) {
-                        CERROR ("Can't spawn workitem scheduler: %d\n", rc);
-                        swi_shutdown();
-                        return rc;
-                }
-        }
-#else
-        UNUSED(i);
-        UNUSED(rc);
-#endif
-
-        return 0;
-}
-
-void
-swi_shutdown (void)
-{
-        cfs_spin_lock(&swi_data.wi_lock);
-
-        LASSERT (cfs_list_empty(&swi_data.wi_runq));
-        LASSERT (cfs_list_empty(&swi_data.wi_serial_runq));
-
-        swi_data.wi_shuttingdown = 1;
-
-#ifdef __KERNEL__
-        cfs_waitq_broadcast(&swi_data.wi_waitq);
-        cfs_waitq_broadcast(&swi_data.wi_serial_waitq);
-        lst_wait_until(swi_data.wi_nthreads == 0, swi_data.wi_lock,
-                       "waiting for %d threads to terminate\n",
-                       swi_data.wi_nthreads);
-#endif
-
-        cfs_spin_unlock(&swi_data.wi_lock);
-        return;
-}
diff --git a/lnet/utils/lstclient.c b/lnet/utils/lstclient.c
index 1d72000..900164f 100644
@@ -202,9 +202,17 @@ main(int argc, char **argv)
                 return -1;
         }
 
+        rc = cfs_wi_startup();
+        if (rc != 0) {
+                CERROR("cfs_wi_startup() failed: %d\n", rc);
+                libcfs_debug_cleanup();
+                return -1;
+        }
+
         rc = LNetInit();
         if (rc != 0) {
                 CERROR("LNetInit() failed: %d\n", rc);
+                cfs_wi_shutdown();
                 libcfs_debug_cleanup();
                 return -1;
         }
@@ -216,8 +224,8 @@ main(int argc, char **argv)
         if (rc != 0) {
                 fprintf(stderr, "Can't startup selftest\n");
                 LNetFini();
+                cfs_wi_shutdown();
                 libcfs_debug_cleanup();
-
                 return -1;
         }
 
@@ -244,6 +252,8 @@ out:
 
         LNetFini();
 
+        cfs_wi_shutdown();
+
         libcfs_debug_cleanup();
 
         return rc;