Whamcloud - gitweb
LU-1346 libcfs: cleanup macros in portals_compat25.h
[fs/lustre-release.git] / libcfs / libcfs / workitem.c
index 66fc46a..b56d266 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -29,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #include <libcfs/libcfs.h>
 
+#define CFS_WS_NAME_LEN         16
+
 typedef struct cfs_wi_sched {
+       cfs_list_t              ws_list;        /* chain on global list */
 #ifdef __KERNEL__
-        /** serialised workitems */
-        cfs_spinlock_t  ws_lock;
-        /** where schedulers sleep */
-        cfs_waitq_t     ws_waitq;
+       /** serialised workitems */
+       spinlock_t              ws_lock;
+       /** where schedulers sleep */
+       cfs_waitq_t             ws_waitq;
 #endif
-        /** concurrent workitems */
-        cfs_list_t      ws_runq;
-        /** rescheduled running-workitems */
-        cfs_list_t      ws_rerunq;
-        /** shutting down */
-        int             ws_shuttingdown;
+       /** concurrent workitems */
+       cfs_list_t              ws_runq;
+       /** rescheduled running-workitems, a workitem can be rescheduled
+        * while running in wi_action(), but we don't want to execute it again
+        * unless it returns from wi_action(), so we put it on ws_rerunq
+        * while rescheduling, and move it to runq after it returns
+        * from wi_action() */
+       cfs_list_t              ws_rerunq;
+       /** CPT-table for this scheduler */
+       struct cfs_cpt_table    *ws_cptab;
+       /** CPT id for affinity */
+       int                     ws_cpt;
+       /** number of scheduled workitems */
+       int                     ws_nscheduled;
+       /** number of started scheduler threads, protected by cfs_wi_data::wi_glock */
+       unsigned int            ws_nthreads:30;
+       /** shutting down, protected by cfs_wi_data::wi_glock */
+       unsigned int            ws_stopping:1;
+       /** serialize starting thread, protected by cfs_wi_data::wi_glock */
+       unsigned int            ws_starting:1;
+       /** scheduler name */
+       char                    ws_name[CFS_WS_NAME_LEN];
 } cfs_wi_sched_t;
 
-#ifdef __KERNEL__
-/**
- * we have 2 cfs_wi_sched_t so far:
- * one for CFS_WI_SCHED_ANY, another for CFS_WI_SCHED_SERIAL
- * per-cpu implementation will be added for SMP scalability
- */
-
-#define CFS_WI_NSCHED   2
-#else
-/** always 2 for userspace */
-#define CFS_WI_NSCHED   2
-#endif /* __KERNEL__ */
-
 struct cfs_workitem_data {
-        /** serialize */
-        cfs_spinlock_t  wi_glock;
-        /** number of cfs_wi_sched_t */
-        int             wi_nsched;
-        /** number of threads (all schedulers) */
-        int             wi_nthreads;
-        /** default scheduler */
-        cfs_wi_sched_t *wi_scheds;
+       /** serialize */
+       spinlock_t              wi_glock;
+       /** list of all schedulers */
+       cfs_list_t              wi_scheds;
+       /** WI module is initialized */
+       int                     wi_init;
+       /** shutting down the whole WI module */
+       int                     wi_stopping;
 } cfs_wi_data;
 
-static inline cfs_wi_sched_t *
-cfs_wi_to_sched(cfs_workitem_t *wi)
-{
-        LASSERT(wi->wi_sched_id == CFS_WI_SCHED_ANY ||
-                wi->wi_sched_id == CFS_WI_SCHED_SERIAL ||
-                (wi->wi_sched_id >= 0 &&
-                 wi->wi_sched_id < cfs_wi_data.wi_nsched));
-
-        if (wi->wi_sched_id == CFS_WI_SCHED_ANY)
-                return &cfs_wi_data.wi_scheds[0];
-        if (wi->wi_sched_id == CFS_WI_SCHED_SERIAL)
-                return &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
-
-        return &cfs_wi_data.wi_scheds[wi->wi_sched_id];
-}
-
 #ifdef __KERNEL__
 static inline void
 cfs_wi_sched_lock(cfs_wi_sched_t *sched)
 {
-        cfs_spin_lock(&sched->ws_lock);
+       spin_lock(&sched->ws_lock);
 }
 
 static inline void
 cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
 {
-        cfs_spin_unlock(&sched->ws_lock);
+       spin_unlock(&sched->ws_lock);
 }
 
 static inline int
 cfs_wi_sched_cansleep(cfs_wi_sched_t *sched)
 {
-        cfs_wi_sched_lock(sched);
-        if (sched->ws_shuttingdown) {
+       cfs_wi_sched_lock(sched);
+       if (sched->ws_stopping) {
                 cfs_wi_sched_unlock(sched);
                 return 0;
         }
@@ -130,75 +118,81 @@ cfs_wi_sched_cansleep(cfs_wi_sched_t *sched)
         return 1;
 }
 
-#else
+#else /* !__KERNEL__ */
 
 static inline void
 cfs_wi_sched_lock(cfs_wi_sched_t *sched)
 {
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
+       spin_lock(&cfs_wi_data.wi_glock);
 }
 
 static inline void
 cfs_wi_sched_unlock(cfs_wi_sched_t *sched)
 {
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+       spin_unlock(&cfs_wi_data.wi_glock);
 }
 
-#endif
+#endif /* __KERNEL__ */
 
 /* XXX:
  * 0. it only works when called from wi->wi_action.
  * 1. when it returns no one shall try to schedule the workitem.
  */
 void
-cfs_wi_exit(cfs_workitem_t *wi)
+cfs_wi_exit(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
-
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
+       LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+       LASSERT(!sched->ws_stopping);
 
-        cfs_wi_sched_lock(sched);
+       cfs_wi_sched_lock(sched);
 
 #ifdef __KERNEL__
-        LASSERT (wi->wi_running);
+       LASSERT(wi->wi_running);
 #endif
-        if (wi->wi_scheduled) { /* cancel pending schedules */
-                LASSERT (!cfs_list_empty(&wi->wi_list));
-                cfs_list_del_init(&wi->wi_list);
-        }
+       if (wi->wi_scheduled) { /* cancel pending schedules */
+               LASSERT(!cfs_list_empty(&wi->wi_list));
+               cfs_list_del_init(&wi->wi_list);
 
-        LASSERT (cfs_list_empty(&wi->wi_list));
-        wi->wi_scheduled = 1; /* LBUG future schedule attempts */
+               LASSERT(sched->ws_nscheduled > 0);
+               sched->ws_nscheduled--;
+       }
 
-        cfs_wi_sched_unlock(sched);
-        return;
+       LASSERT(cfs_list_empty(&wi->wi_list));
+
+       wi->wi_scheduled = 1; /* LBUG future schedule attempts */
+       cfs_wi_sched_unlock(sched);
+
+       return;
 }
-CFS_EXPORT_SYMBOL(cfs_wi_exit);
+EXPORT_SYMBOL(cfs_wi_exit);
 
 /**
- * cancel a workitem:
+ * cancel schedule request of workitem \a wi
  */
 int
-cfs_wi_cancel (cfs_workitem_t *wi)
+cfs_wi_deschedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
-        int             rc;
+       int     rc;
 
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
+       LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+       LASSERT(!sched->ws_stopping);
 
-        cfs_wi_sched_lock(sched);
         /*
          * return 0 if it's running already, otherwise return 1, which
          * means the workitem will not be scheduled and will not have
          * any race with wi_action.
          */
-        rc = !(wi->wi_running);
+       cfs_wi_sched_lock(sched);
+
+       rc = !(wi->wi_running);
+
+       if (wi->wi_scheduled) { /* cancel pending schedules */
+               LASSERT(!cfs_list_empty(&wi->wi_list));
+               cfs_list_del_init(&wi->wi_list);
+
+               LASSERT(sched->ws_nscheduled > 0);
+               sched->ws_nscheduled--;
 
-        if (wi->wi_scheduled) { /* cancel pending schedules */
-                LASSERT (!cfs_list_empty(&wi->wi_list));
-                cfs_list_del_init(&wi->wi_list);
                 wi->wi_scheduled = 0;
         }
 
@@ -207,8 +201,7 @@ cfs_wi_cancel (cfs_workitem_t *wi)
         cfs_wi_sched_unlock(sched);
         return rc;
 }
-
-CFS_EXPORT_SYMBOL(cfs_wi_cancel);
+EXPORT_SYMBOL(cfs_wi_deschedule);
 
 /*
  * Workitem scheduled with (serial == 1) is strictly serialised not only with
@@ -218,12 +211,10 @@ CFS_EXPORT_SYMBOL(cfs_wi_cancel);
  * be added, and even dynamic creation of serialised queues might be supported.
  */
 void
-cfs_wi_schedule(cfs_workitem_t *wi)
+cfs_wi_schedule(struct cfs_wi_sched *sched, cfs_workitem_t *wi)
 {
-        cfs_wi_sched_t *sched = cfs_wi_to_sched(wi);
-
-        LASSERT (!cfs_in_interrupt()); /* because we use plain spinlock */
-        LASSERT (!sched->ws_shuttingdown);
+       LASSERT(!cfs_in_interrupt()); /* because we use plain spinlock */
+       LASSERT(!sched->ws_stopping);
 
         cfs_wi_sched_lock(sched);
 
@@ -231,6 +222,7 @@ cfs_wi_schedule(cfs_workitem_t *wi)
                 LASSERT (cfs_list_empty(&wi->wi_list));
 
                 wi->wi_scheduled = 1;
+               sched->ws_nscheduled++;
                 if (!wi->wi_running) {
                         cfs_list_add_tail(&wi->wi_list, &sched->ws_runq);
 #ifdef __KERNEL__
@@ -245,34 +237,34 @@ cfs_wi_schedule(cfs_workitem_t *wi)
         cfs_wi_sched_unlock(sched);
         return;
 }
-
-CFS_EXPORT_SYMBOL(cfs_wi_schedule);
+EXPORT_SYMBOL(cfs_wi_schedule);
 
 #ifdef __KERNEL__
 
 static int
 cfs_wi_scheduler (void *arg)
 {
-        int             id     = (int)(long_ptr_t) arg;
-        int             serial = (id == -1);
-        char            name[24];
-        cfs_wi_sched_t *sched;
-
-        if (serial) {
-                sched = &cfs_wi_data.wi_scheds[cfs_wi_data.wi_nsched - 1];
-                cfs_daemonize("wi_serial_sd");
-        } else {
-                /* will be sched = &cfs_wi_data.wi_scheds[id] in the future */
-                sched = &cfs_wi_data.wi_scheds[0];
-                snprintf(name, sizeof(name), "cfs_wi_sd%03d", id);
-                cfs_daemonize(name);
-        }
+       struct cfs_wi_sched     *sched = (cfs_wi_sched_t *)arg;
 
-        cfs_block_allsigs();
+       cfs_block_allsigs();
 
-        cfs_wi_sched_lock(sched);
+       /* CPT affinity scheduler? */
+       if (sched->ws_cptab != NULL)
+               if (cfs_cpt_bind(sched->ws_cptab, sched->ws_cpt) != 0)
+                       CWARN("Failed to bind %s on CPT %d\n",
+                               sched->ws_name, sched->ws_cpt);
+
+       spin_lock(&cfs_wi_data.wi_glock);
+
+       LASSERT(sched->ws_starting == 1);
+       sched->ws_starting--;
+       sched->ws_nthreads++;
+
+       spin_unlock(&cfs_wi_data.wi_glock);
 
-        while (!sched->ws_shuttingdown) {
+       cfs_wi_sched_lock(sched);
+
+       while (!sched->ws_stopping) {
                 int             nloops = 0;
                 int             rc;
                 cfs_workitem_t *wi;
@@ -281,12 +273,17 @@ cfs_wi_scheduler (void *arg)
                        nloops < CFS_WI_RESCHED) {
                         wi = cfs_list_entry(sched->ws_runq.next,
                                             cfs_workitem_t, wi_list);
-                        LASSERT (wi->wi_scheduled && !wi->wi_running);
+                       LASSERT(wi->wi_scheduled && !wi->wi_running);
+
+                       cfs_list_del_init(&wi->wi_list);
 
-                        cfs_list_del_init(&wi->wi_list);
+                       LASSERT(sched->ws_nscheduled > 0);
+                       sched->ws_nscheduled--;
 
                         wi->wi_running   = 1;
                         wi->wi_scheduled = 0;
+
+
                         cfs_wi_sched_unlock(sched);
                         nloops++;
 
@@ -300,7 +297,7 @@ cfs_wi_scheduler (void *arg)
                         if (cfs_list_empty(&wi->wi_list))
                                 continue;
 
-                        LASSERT (wi->wi_scheduled);
+                       LASSERT(wi->wi_scheduled);
                         /* wi is rescheduled, should be on rerunq now, we
                          * move it to runq so it can run action now */
                         cfs_list_move_tail(&wi->wi_list, &sched->ws_runq);
@@ -315,33 +312,19 @@ cfs_wi_scheduler (void *arg)
                         continue;
                 }
 
-                cfs_wi_sched_unlock(sched);
-                cfs_wait_event_interruptible_exclusive(sched->ws_waitq,
-                                !cfs_wi_sched_cansleep(sched), rc);
-                cfs_wi_sched_lock(sched);
+               cfs_wi_sched_unlock(sched);
+               rc = wait_event_interruptible_exclusive(sched->ws_waitq,
+                               !cfs_wi_sched_cansleep(sched));
+               cfs_wi_sched_lock(sched);
         }
 
         cfs_wi_sched_unlock(sched);
 
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        cfs_wi_data.wi_nthreads--;
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
-        return 0;
-}
-
-static int
-cfs_wi_start_thread (int (*func) (void*), void *arg)
-{
-        long pid;
-
-        pid = cfs_create_thread(func, arg, 0);
-        if (pid < 0)
-                return (int)pid;
+       spin_lock(&cfs_wi_data.wi_glock);
+       sched->ws_nthreads--;
+       spin_unlock(&cfs_wi_data.wi_glock);
 
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        cfs_wi_data.wi_nthreads++;
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
-        return 0;
+       return 0;
 }
 
 #else /* __KERNEL__ */
@@ -349,132 +332,224 @@ cfs_wi_start_thread (int (*func) (void*), void *arg)
 int
 cfs_wi_check_events (void)
 {
-        int               n = 0;
-        cfs_workitem_t   *wi;
-        cfs_list_t       *q;
+       int               n = 0;
+       cfs_workitem_t   *wi;
+
+       spin_lock(&cfs_wi_data.wi_glock);
 
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
+       for (;;) {
+               struct cfs_wi_sched     *sched = NULL;
+               struct cfs_wi_sched     *tmp;
 
-        for (;;) {
                 /** rerunq is always empty for userspace */
-                if (!cfs_list_empty(&cfs_wi_data.wi_scheds[1].ws_runq))
-                        q = &cfs_wi_data.wi_scheds[1].ws_runq;
-                else if (!cfs_list_empty(&cfs_wi_data.wi_scheds[0].ws_runq))
-                        q = &cfs_wi_data.wi_scheds[0].ws_runq;
-                else
-                        break;
+               cfs_list_for_each_entry(tmp,
+                                       &cfs_wi_data.wi_scheds, ws_list) {
+                       if (!cfs_list_empty(&tmp->ws_runq)) {
+                               sched = tmp;
+                               break;
+                       }
+               }
 
-                wi = cfs_list_entry(q->next, cfs_workitem_t, wi_list);
-                cfs_list_del_init(&wi->wi_list);
+               if (sched == NULL)
+                       break;
 
-                LASSERT (wi->wi_scheduled);
-                wi->wi_scheduled = 0;
-                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+               wi = cfs_list_entry(sched->ws_runq.next,
+                                   cfs_workitem_t, wi_list);
+               cfs_list_del_init(&wi->wi_list);
 
-                n++;
-                (*wi->wi_action) (wi);
+               LASSERT(sched->ws_nscheduled > 0);
+               sched->ws_nscheduled--;
 
-                cfs_spin_lock(&cfs_wi_data.wi_glock);
-        }
+               LASSERT(wi->wi_scheduled);
+               wi->wi_scheduled = 0;
+               spin_unlock(&cfs_wi_data.wi_glock);
+
+               n++;
+               (*wi->wi_action) (wi);
 
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
-        return n;
+               spin_lock(&cfs_wi_data.wi_glock);
+       }
+
+       spin_unlock(&cfs_wi_data.wi_glock);
+       return n;
 }
 
 #endif
 
-static void
-cfs_wi_sched_init(cfs_wi_sched_t *sched)
+void
+cfs_wi_sched_destroy(struct cfs_wi_sched *sched)
 {
-        sched->ws_shuttingdown = 0;
-#ifdef __KERNEL__
-        cfs_spin_lock_init(&sched->ws_lock);
-        cfs_waitq_init(&sched->ws_waitq);
-#endif
-        CFS_INIT_LIST_HEAD(&sched->ws_runq);
-        CFS_INIT_LIST_HEAD(&sched->ws_rerunq);
-}
+       LASSERT(cfs_wi_data.wi_init);
+       LASSERT(!cfs_wi_data.wi_stopping);
 
-static void
-cfs_wi_sched_shutdown(cfs_wi_sched_t *sched)
-{
-        cfs_wi_sched_lock(sched);
+       spin_lock(&cfs_wi_data.wi_glock);
+       if (sched->ws_stopping) {
+               CDEBUG(D_INFO, "%s is in progress of stopping\n",
+                      sched->ws_name);
+               spin_unlock(&cfs_wi_data.wi_glock);
+               return;
+       }
 
-        LASSERT(cfs_list_empty(&sched->ws_runq));
-        LASSERT(cfs_list_empty(&sched->ws_rerunq));
+       LASSERT(!cfs_list_empty(&sched->ws_list));
+       sched->ws_stopping = 1;
 
-        sched->ws_shuttingdown = 1;
+       spin_unlock(&cfs_wi_data.wi_glock);
 
 #ifdef __KERNEL__
-        cfs_waitq_broadcast(&sched->ws_waitq);
+       cfs_waitq_broadcast(&sched->ws_waitq);
+
+       spin_lock(&cfs_wi_data.wi_glock);
+       {
+               int i = 2;
+
+               while (sched->ws_nthreads > 0) {
+                       CDEBUG(IS_PO2(++i) ? D_WARNING : D_NET,
+                              "waiting for %d threads of WI sched[%s] to "
+                              "terminate\n", sched->ws_nthreads,
+                              sched->ws_name);
+
+                       spin_unlock(&cfs_wi_data.wi_glock);
+                       cfs_pause(cfs_time_seconds(1) / 20);
+                       spin_lock(&cfs_wi_data.wi_glock);
+               }
+       }
+
+       cfs_list_del(&sched->ws_list);
+
+       spin_unlock(&cfs_wi_data.wi_glock);
 #endif
-        cfs_wi_sched_unlock(sched);
-}
+       LASSERT(sched->ws_nscheduled == 0);
 
+       LIBCFS_FREE(sched, sizeof(*sched));
+}
+EXPORT_SYMBOL(cfs_wi_sched_destroy);
 
 int
-cfs_wi_startup (void)
+cfs_wi_sched_create(char *name, struct cfs_cpt_table *cptab,
+                   int cpt, int nthrs, struct cfs_wi_sched **sched_pp)
 {
-        int i;
-        int n, rc;
+       struct cfs_wi_sched     *sched;
 
-        cfs_wi_data.wi_nthreads = 0;
-        cfs_wi_data.wi_nsched   = CFS_WI_NSCHED;
-        LIBCFS_ALLOC(cfs_wi_data.wi_scheds,
-                     cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
-        if (cfs_wi_data.wi_scheds == NULL)
-                return -ENOMEM;
+       LASSERT(cfs_wi_data.wi_init);
+       LASSERT(!cfs_wi_data.wi_stopping);
+       LASSERT(cptab == NULL || cpt == CFS_CPT_ANY ||
+               (cpt >= 0 && cpt < cfs_cpt_number(cptab)));
 
-        cfs_spin_lock_init(&cfs_wi_data.wi_glock);
-        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
-                cfs_wi_sched_init(&cfs_wi_data.wi_scheds[i]);
+       LIBCFS_ALLOC(sched, sizeof(*sched));
+       if (sched == NULL)
+               return -ENOMEM;
+
+       strncpy(sched->ws_name, name, CFS_WS_NAME_LEN);
+       sched->ws_cptab = cptab;
+       sched->ws_cpt = cpt;
 
 #ifdef __KERNEL__
-        n = cfs_num_online_cpus();
-        for (i = 0; i <= n; i++) {
-                rc = cfs_wi_start_thread(cfs_wi_scheduler,
-                                         (void *)(long_ptr_t)(i == n ? -1 : i));
-                if (rc != 0) {
-                        CERROR ("Can't spawn workitem scheduler: %d\n", rc);
-                        cfs_wi_shutdown();
-                        return rc;
-                }
-        }
-#else
-        SET_BUT_UNUSED(rc);
-        SET_BUT_UNUSED(n);
+       spin_lock_init(&sched->ws_lock);
+       cfs_waitq_init(&sched->ws_waitq);
 #endif
+       CFS_INIT_LIST_HEAD(&sched->ws_runq);
+       CFS_INIT_LIST_HEAD(&sched->ws_rerunq);
+       CFS_INIT_LIST_HEAD(&sched->ws_list);
 
-        return 0;
+#ifdef __KERNEL__
+       for (; nthrs > 0; nthrs--)  {
+               char            name[16];
+               cfs_task_t      *task;
+
+               spin_lock(&cfs_wi_data.wi_glock);
+               while (sched->ws_starting > 0) {
+                       spin_unlock(&cfs_wi_data.wi_glock);
+                       cfs_schedule();
+                       spin_lock(&cfs_wi_data.wi_glock);
+               }
+
+               sched->ws_starting++;
+               spin_unlock(&cfs_wi_data.wi_glock);
+
+               if (sched->ws_cptab != NULL && sched->ws_cpt >= 0) {
+                       snprintf(name, sizeof(name), "%s_%02d_%02d",
+                                sched->ws_name, sched->ws_cpt,
+                                sched->ws_nthreads);
+               } else {
+                       snprintf(name, sizeof(name), "%s_%02d",
+                                sched->ws_name, sched->ws_nthreads);
+               }
+
+               task = kthread_run(cfs_wi_scheduler, sched, name);
+               if (IS_ERR(task)) {
+                       int rc = PTR_ERR(task);
+
+                       CERROR("Failed to create thread for "
+                               "WI scheduler %s: %d\n", name, rc);
+
+                       spin_lock(&cfs_wi_data.wi_glock);
+
+                       /* make up for cfs_wi_sched_destroy */
+                       cfs_list_add(&sched->ws_list, &cfs_wi_data.wi_scheds);
+                       sched->ws_starting--;
+
+                       spin_unlock(&cfs_wi_data.wi_glock);
+
+                       cfs_wi_sched_destroy(sched);
+                       return rc;
+               }
+       }
+#endif
+       spin_lock(&cfs_wi_data.wi_glock);
+       cfs_list_add(&sched->ws_list, &cfs_wi_data.wi_scheds);
+       spin_unlock(&cfs_wi_data.wi_glock);
+
+       *sched_pp = sched;
+       return 0;
 }
+EXPORT_SYMBOL(cfs_wi_sched_create);
 
-void
-cfs_wi_shutdown (void)
+int
+cfs_wi_startup(void)
 {
-        int i;
+       memset(&cfs_wi_data, 0, sizeof(cfs_wi_data));
 
-        if (cfs_wi_data.wi_scheds == NULL)
-                return;
+       spin_lock_init(&cfs_wi_data.wi_glock);
+       CFS_INIT_LIST_HEAD(&cfs_wi_data.wi_scheds);
+       cfs_wi_data.wi_init = 1;
 
-        for (i = 0; i < cfs_wi_data.wi_nsched; i++)
-                cfs_wi_sched_shutdown(&cfs_wi_data.wi_scheds[i]);
+       return 0;
+}
 
-#ifdef __KERNEL__
-        cfs_spin_lock(&cfs_wi_data.wi_glock);
-        i = 2;
-        while (cfs_wi_data.wi_nthreads != 0) {
-                CDEBUG(IS_PO2(++i) ? D_WARNING : D_NET,
-                       "waiting for %d threads to terminate\n",
-                       cfs_wi_data.wi_nthreads);
-                cfs_spin_unlock(&cfs_wi_data.wi_glock);
+void
+cfs_wi_shutdown (void)
+{
+       struct cfs_wi_sched     *sched;
 
-                cfs_pause(cfs_time_seconds(1));
+       spin_lock(&cfs_wi_data.wi_glock);
+       cfs_wi_data.wi_stopping = 1;
+       spin_unlock(&cfs_wi_data.wi_glock);
 
-                cfs_spin_lock(&cfs_wi_data.wi_glock);
-        }
-        cfs_spin_unlock(&cfs_wi_data.wi_glock);
+#ifdef __KERNEL__
+       /* nobody should contend on this list */
+       cfs_list_for_each_entry(sched, &cfs_wi_data.wi_scheds, ws_list) {
+               sched->ws_stopping = 1;
+               cfs_waitq_broadcast(&sched->ws_waitq);
+       }
+
+       cfs_list_for_each_entry(sched, &cfs_wi_data.wi_scheds, ws_list) {
+               spin_lock(&cfs_wi_data.wi_glock);
+
+               while (sched->ws_nthreads != 0) {
+                       spin_unlock(&cfs_wi_data.wi_glock);
+                       cfs_pause(cfs_time_seconds(1) / 20);
+                       spin_lock(&cfs_wi_data.wi_glock);
+               }
+               spin_unlock(&cfs_wi_data.wi_glock);
+       }
 #endif
-        LIBCFS_FREE(cfs_wi_data.wi_scheds,
-                    cfs_wi_data.wi_nsched * sizeof(cfs_wi_sched_t));
-        return;
+       while (!cfs_list_empty(&cfs_wi_data.wi_scheds)) {
+               sched = cfs_list_entry(cfs_wi_data.wi_scheds.next,
+                                      struct cfs_wi_sched, ws_list);
+               cfs_list_del(&sched->ws_list);
+               LIBCFS_FREE(sched, sizeof(*sched));
+       }
+
+       cfs_wi_data.wi_stopping = 0;
+       cfs_wi_data.wi_init = 0;
 }