LU-9859 lnet: convert selftest to use workqueues 91/36991/26
author Mr NeilBrown <neilb@suse.de>
Fri, 17 Jul 2020 23:11:29 +0000 (19:11 -0400)
committer Oleg Drokin <green@whamcloud.com>
Fri, 19 May 2023 07:02:54 +0000 (07:02 +0000)
Instead of the cfs workitem library, use workqueues.

As lnet wants to provide a CPU mask of allowed CPUs, the
workqueues need to be WQ_UNBOUND so that tasks can run on
CPUs other than the one where they were submitted.
We use alloc_ordered_workqueue() for lst_sched_serial (now called
lst_serial_wq) - "ordered" means the same as "serial" did.
We use cfs_cpt_bind_workqueue() for the other workqueues, which sets
up the CPU mask as required.
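
For reference, the workqueue setup in lnet_selftest_init() then
reduces to roughly this sketch (condensed from the module.c hunk
below; the per-CPT thread count is assumed to come from
cfs_cpt_weight(), which is outside the quoted context, and error
handling is omitted):

  lst_serial_wq = alloc_ordered_workqueue("lst_s", 0);

  nscheds = cfs_cpt_number(lnet_cpt_table());
  CFS_ALLOC_PTR_ARRAY(lst_test_wq, nscheds);
  for (i = 0; i < nscheds; i++) {
          /* reserve at least one CPU for LND */
          int nthrs = max(cfs_cpt_weight(lnet_cpt_table(), i) - 1, 1);

          /* WQ_UNBOUND workqueue restricted to the cpumask of CPT i */
          lst_test_wq[i] = cfs_cpt_bind_workqueue("lst_t", lnet_cpt_table(),
                                                  0, i, nthrs);
  }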

An important difference with workqueues is that there is no equivalent
of cfs_wi_exit(), which could be called from the action function to
ensure that the function would not be called again and that the item
was no longer queued.

To provide similar semantics we treat swi_state == SWI_STATE_DONE as
meaning that the wi is complete and any further calls must be a no-op.
We also call cancel_work_sync() (via swi_cancel_workitem()) before
freeing or reusing memory that held a work-item.

To ensure the same exclusion that cfs_wi_exit() provided, the state is
set and tested under a lock - either crpc_lock, scd_lock or tsi_lock,
depending on which structure the wi is embedded in.
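
Schematically the resulting pattern is (a simplified sketch:
some_action and lock stand in for the real action functions -
sfw_run_test(), srpc_handle_rpc(), srpc_send_rpc() - and for
whichever of the three locks protects the embedding structure):

  static void some_action(struct swi_workitem *wi)
  {
          spin_lock(&lock);
          if (wi->swi_state == SWI_STATE_DONE) {
                  /* item already retired; further calls are a no-op */
                  spin_unlock(&lock);
                  return;
          }
          /* ... normal processing; swi_state is updated under the lock ... */
          spin_unlock(&lock);
  }

  /* before freeing or reusing the memory that holds the work item */
  swi_cancel_workitem(&rpc->crpc_wi);   /* sets SWI_STATE_DONE and calls
                                         * cancel_work_sync() */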

Another minor difference is that with workqueues the action function
returns void, not an int.

Also change SWI_STATE_* from #define to an enum.  The only place these
values are ever stored is in one field in a struct.

Linux-commit: 6106c0f82481e686b337ee0c403821fb5c3c17ef
Linux-commit: 3fc0b7d3e0a4d37e4c60c2232df4500187a07232
Linux-commit: 7d70718de014ada7280bb011db8655e18ed935b1

Test-Parameters: trivial testlist=lnet-selftest
Change-Id: I5ccf1399ebbfdd4cab3696749bd1ec666147b757
Signed-off-by: Mr NeilBrown <neilb@suse.de>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/36991
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Serguei Smirnov <ssmirnov@whamcloud.com>
Reviewed-by: Chris Horn <chris.horn@hpe.com>
Reviewed-by: Frank Sehr <fsehr@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/selftest/framework.c
lnet/selftest/module.c
lnet/selftest/rpc.c
lnet/selftest/selftest.h

diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c
index 4b65968..919cd05 100644
@@ -552,6 +552,7 @@ sfw_test_rpc_fini(struct srpc_client_rpc *rpc)
 
        /* Called with hold of tsi->tsi_lock */
        LASSERT(list_empty(&rpc->crpc_list));
+       rpc->crpc_wi.swi_state = SWI_STATE_DONE;
        list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
 }
 
@@ -653,6 +654,7 @@ sfw_destroy_test_instance(struct sfw_test_instance *tsi)
                rpc = list_entry(tsi->tsi_free_rpcs.next,
                                 struct srpc_client_rpc, crpc_list);
                list_del(&rpc->crpc_list);
+               swi_cancel_workitem(&rpc->crpc_wi);
                LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
        }
 
@@ -941,6 +943,7 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer,
                                             blklen, sfw_test_rpc_done,
                                             sfw_test_rpc_fini, tsu);
        } else {
+               swi_cancel_workitem(&rpc->crpc_wi);
                srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
                                     blklen, sfw_test_rpc_done,
                                     sfw_test_rpc_fini, tsu);
@@ -957,25 +960,29 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer,
        return 0;
 }
 
-static int
+static void
 sfw_run_test(struct swi_workitem *wi)
 {
        struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker);
        struct sfw_test_instance *tsi = tsu->tsu_instance;
        struct srpc_client_rpc *rpc = NULL;
 
-        LASSERT (wi == &tsu->tsu_worker);
-
-        if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
-                LASSERT (rpc == NULL);
-                goto test_done;
-        }
+       if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
+               LASSERT(rpc == NULL);
+               wi->swi_state = SWI_STATE_DONE;
+               goto test_done;
+       }
 
-        LASSERT (rpc != NULL);
+       LASSERT(rpc != NULL);
 
        spin_lock(&tsi->tsi_lock);
+       if (wi->swi_state == SWI_STATE_DONE) {
+               spin_unlock(&tsi->tsi_lock);
+               return;
+       }
 
        if (tsi->tsi_stopping) {
+               wi->swi_state = SWI_STATE_DONE;
                list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
                spin_unlock(&tsi->tsi_lock);
                goto test_done;
@@ -985,25 +992,24 @@ sfw_run_test(struct swi_workitem *wi)
                tsu->tsu_loop--;
 
        list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
+       wi->swi_state = SWI_STATE_RUNNING;
        spin_unlock(&tsi->tsi_lock);
 
        spin_lock(&rpc->crpc_lock);
        rpc->crpc_timeout = rpc_timeout;
        srpc_post_rpc(rpc);
        spin_unlock(&rpc->crpc_lock);
-       return 0;
+       return;
 
 test_done:
-        /*
-         * No one can schedule me now since:
-         * - previous RPC, if any, has done and
-         * - no new RPC is initiated.
-         * - my batch is still active; no one can run it again now.
-         * Cancel pending schedules and prevent future schedule attempts:
-         */
-       swi_exit_workitem(wi);
+       /*
+        * No one can schedule me now since:
+        * - previous RPC, if any, has done and
+        * - no new RPC is initiated.
+        * - my batch is still active; no one can run it again now.
+        * Cancel pending schedules and prevent future schedule attempts:
+        */
        sfw_test_unit_done(tsu);
-       return 1;
 }
 
 static int
@@ -1033,7 +1039,9 @@ sfw_run_batch(struct sfw_batch *tsb)
                        tsu->tsu_loop = tsi->tsi_loop;
                        wi = &tsu->tsu_worker;
                        swi_init_workitem(wi, sfw_run_test,
-                                         lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid, NULL)]);
+                                         lst_test_wq[lnet_cpt_of_nid(
+                                                             tsu->tsu_dest.nid,
+                                                             NULL)]);
                        swi_schedule_workitem(wi);
                }
        }
@@ -1413,14 +1421,15 @@ sfw_create_rpc(struct lnet_process_id peer, int service,
                rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
                                     struct srpc_client_rpc, crpc_list);
                list_del(&rpc->crpc_list);
-
-                srpc_init_client_rpc(rpc, peer, service, 0, 0,
-                                     done, sfw_client_rpc_fini, priv);
-        }
-
+       }
        spin_unlock(&sfw_data.fw_lock);
 
-       if (rpc == NULL) {
+       if (rpc) {
+               /* Ensure that rpc is done */
+               swi_cancel_workitem(&rpc->crpc_wi);
+               srpc_init_client_rpc(rpc, peer, service, 0, 0,
+                                    done, sfw_client_rpc_fini, priv);
+       } else {
                rpc = srpc_create_client_rpc(peer, service,
                                             nbulkiov, bulklen, done,
                                             nbulkiov != 0 ?  NULL :
diff --git a/lnet/selftest/module.c b/lnet/selftest/module.c
index 1441600..ae73095 100644
@@ -45,8 +45,8 @@ enum {
 
 static int lst_init_step = LST_INIT_NONE;
 
-struct cfs_wi_sched *lst_sched_serial;
-struct cfs_wi_sched **lst_sched_test;
+struct workqueue_struct *lst_serial_wq;
+struct workqueue_struct **lst_test_wq;
 
 static void
 lnet_selftest_exit(void)
@@ -66,17 +66,17 @@ lnet_selftest_exit(void)
        case LST_INIT_WI_TEST:
                for (i = 0;
                     i < cfs_cpt_number(lnet_cpt_table()); i++) {
-                       if (lst_sched_test[i] == NULL)
+                       if (!lst_test_wq[i])
                                continue;
-                       cfs_wi_sched_destroy(lst_sched_test[i]);
+                       destroy_workqueue(lst_test_wq[i]);
                }
-               CFS_FREE_PTR_ARRAY(lst_sched_test,
+               CFS_FREE_PTR_ARRAY(lst_test_wq,
                                   cfs_cpt_number(lnet_cpt_table()));
-               lst_sched_test = NULL;
+               lst_test_wq = NULL;
                fallthrough;
        case LST_INIT_WI_SERIAL:
-               cfs_wi_sched_destroy(lst_sched_serial);
-               lst_sched_serial = NULL;
+               destroy_workqueue(lst_serial_wq);
+               lst_serial_wq = NULL;
                fallthrough;
        case LST_INIT_NONE:
                break;
@@ -102,20 +102,19 @@ static int __init
 lnet_selftest_init(void)
 {
        int nscheds;
-       int rc;
+       int rc = -ENOMEM;
        int i;
 
-       rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
-                                1, &lst_sched_serial);
-       if (rc != 0) {
+       lst_serial_wq = alloc_ordered_workqueue("lst_s", 0);
+       if (!lst_serial_wq) {
                CERROR("Failed to create serial WI scheduler for LST\n");
                return rc;
        }
        lst_init_step = LST_INIT_WI_SERIAL;
 
        nscheds = cfs_cpt_number(lnet_cpt_table());
-       CFS_ALLOC_PTR_ARRAY(lst_sched_test, nscheds);
-       if (lst_sched_test == NULL) {
+       CFS_ALLOC_PTR_ARRAY(lst_test_wq, nscheds);
+       if (!lst_test_wq) {
                rc = -ENOMEM;
                goto error;
        }
@@ -126,11 +125,14 @@ lnet_selftest_init(void)
 
                /* reserve at least one CPU for LND */
                nthrs = max(nthrs - 1, 1);
-               rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i,
-                                        nthrs, &lst_sched_test[i]);
-               if (rc != 0) {
-                       CERROR("Failed to create CPU partition affinity WI scheduler %d for LST\n",
-                              i);
+               lst_test_wq[i] = cfs_cpt_bind_workqueue("lst_t",
+                                                       lnet_cpt_table(), 0,
+                                                       i, nthrs);
+               if (IS_ERR(lst_test_wq[i])) {
+                       rc = PTR_ERR(lst_test_wq[i]);
+                       CERROR("Failed to create CPU partition affinity WI scheduler %d for LST: rc = %d\n",
+                              i, rc);
+                       lst_test_wq[i] = NULL;
                        goto error;
                }
        }
diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c
index b85e045..5a43ee7 100644
@@ -92,7 +92,7 @@ srpc_serv_portal(int svc_id)
 }
 
 /* forward ref's */
-static int srpc_handle_rpc(struct swi_workitem *wi);
+static void srpc_handle_rpc(struct swi_workitem *wi);
 
 
 void srpc_get_counters(struct srpc_counters *cnt)
@@ -196,7 +196,7 @@ srpc_init_server_rpc(struct srpc_server_rpc *rpc,
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc,
                          srpc_serv_is_framework(scd->scd_svc) ?
-                         lst_sched_serial : lst_sched_test[scd->scd_cpt]);
+                         lst_serial_wq : lst_test_wq[scd->scd_cpt]);
 
        rpc->srpc_ev.ev_fired = 1; /* no event expected now */
 
@@ -261,7 +261,7 @@ srpc_service_nrpcs(struct srpc_service *svc)
               max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
 }
 
-int srpc_add_buffer(struct swi_workitem *wi);
+void srpc_add_buffer(struct swi_workitem *wi);
 
 static int
 srpc_service_init(struct srpc_service *svc)
@@ -295,10 +295,10 @@ srpc_service_init(struct srpc_service *svc)
                scd->scd_ev.ev_data = scd;
                scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
 
-               /* NB: don't use lst_sched_serial for adding buffer,
+               /* NB: don't use lst_serial_wq for adding buffer,
                 * see details in srpc_service_add_buffers() */
                swi_init_workitem(&scd->scd_buf_wi,
-                                 srpc_add_buffer, lst_sched_test[i]);
+                                 srpc_add_buffer, lst_test_wq[i]);
 
                if (i != 0 && srpc_serv_is_framework(svc)) {
                        /* NB: framework service only needs srpc_service_cd for
@@ -534,7 +534,7 @@ __must_hold(&scd->scd_lock)
        return rc;
 }
 
-int
+void
 srpc_add_buffer(struct swi_workitem *wi)
 {
        struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd,
@@ -591,8 +591,8 @@ srpc_add_buffer(struct swi_workitem *wi)
                scd->scd_buf_posting--;
        }
 
+       wi->swi_state = SWI_STATE_RUNNING;
        spin_unlock(&scd->scd_lock);
-       return 0;
 }
 
 int
@@ -624,15 +624,15 @@ srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
                spin_lock(&scd->scd_lock);
                /*
                 * NB: srpc_service_add_buffers() can be called inside
-                * thread context of lst_sched_serial, and we don't normally
+                * thread context of lst_serial_wq, and we don't normally
                 * allow to sleep inside thread context of WI scheduler
                 * because it will block current scheduler thread from doing
                 * anything else, even worse, it could deadlock if it's
                 * waiting on result from another WI of the same scheduler.
                 * However, it's safe at here because scd_buf_wi is scheduled
-                * by thread in a different WI scheduler (lst_sched_test),
+                * by thread in a different WI scheduler (lst_test_wq),
                 * so we don't have any risk of deadlock, though this could
-                * block all WIs pending on lst_sched_serial for a moment
+                * block all WIs pending on lst_serial_wq for a moment
                 * which is not good but not fatal.
                 */
                lst_wait_until(scd->scd_buf_err != 0 ||
@@ -679,12 +679,9 @@ srpc_finish_service(struct srpc_service *sv)
        LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
 
        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
-               spin_lock(&scd->scd_lock);
-               if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
-                       spin_unlock(&scd->scd_lock);
-                       return 0;
-               }
+               swi_cancel_workitem(&scd->scd_buf_wi);
 
+               spin_lock(&scd->scd_lock);
                if (scd->scd_buf_nposted > 0) {
                        CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
                               scd->scd_buf_nposted);
@@ -699,11 +696,9 @@ srpc_finish_service(struct srpc_service *sv)
 
                rpc = list_entry(scd->scd_rpc_active.next,
                                 struct srpc_server_rpc, srpc_list);
-               CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
+               CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n",
                        rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                        swi_state2str(rpc->srpc_wi.swi_state),
-                       rpc->srpc_wi.swi_workitem.wi_scheduled,
-                       rpc->srpc_wi.swi_workitem.wi_running,
                        rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
                        rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
                spin_unlock(&scd->scd_lock);
@@ -926,8 +921,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
        struct srpc_service     *sv  = scd->scd_svc;
        struct srpc_buffer *buffer;
 
-       LASSERT(status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
-
        rpc->srpc_status = status;
 
        CDEBUG_LIMIT(status == 0 ? D_NET : D_NETERROR,
@@ -961,7 +954,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(rpc->srpc_ev.ev_fired);
-       swi_exit_workitem(&rpc->srpc_wi);
+       rpc->srpc_wi.swi_state = SWI_STATE_DONE;
 
        if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
                buffer = list_entry(scd->scd_buf_blocked.next,
@@ -979,7 +972,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
 }
 
 /* handles an incoming RPC */
-static int srpc_handle_rpc(struct swi_workitem *wi)
+static void srpc_handle_rpc(struct swi_workitem *wi)
 {
        struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc,
                                                   srpc_wi);
@@ -988,22 +981,23 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
        struct srpc_event *ev = &rpc->srpc_ev;
        int rc = 0;
 
-       LASSERT(wi == &rpc->srpc_wi);
-
        spin_lock(&scd->scd_lock);
+       if (wi->swi_state == SWI_STATE_DONE) {
+               spin_unlock(&scd->scd_lock);
+               return;
+       }
 
        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+               wi->swi_state = SWI_STATE_DONE;
                spin_unlock(&scd->scd_lock);
 
                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);
 
-               if (ev->ev_fired) { /* no more event, OK to finish */
+               if (ev->ev_fired) /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
-                       return 1;
-               }
-               return 0;
+               return;
        }
 
        spin_unlock(&scd->scd_lock);
@@ -1022,7 +1016,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
                if (msg->msg_magic == 0) {
                        /* moaned already in srpc_lnet_ev_handler */
                        srpc_server_rpc_done(rpc, EBADMSG);
-                       return 1;
+                       return;
                }
 
                srpc_unpack_msg_hdr(msg);
@@ -1038,7 +1032,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
                        LASSERT(reply->status == 0 || !rpc->srpc_bulk);
                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
-                               return 1;
+                               return;
                        }
                }
 
@@ -1047,7 +1041,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
-                               return 0; /* wait for bulk */
+                               return; /* wait for bulk */
 
                        LASSERT(ev->ev_fired);
                        ev->ev_status = rc;
@@ -1065,16 +1059,16 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
 
                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
-                               return 1;
+                               return;
                        }
                }
 
                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
-                       return 0; /* wait for reply */
+                       return; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
-               return 1;
+               return;
 
        case SWI_STATE_REPLY_SUBMITTED:
                if (!ev->ev_fired) {
@@ -1087,10 +1081,8 @@ static int srpc_handle_rpc(struct swi_workitem *wi)
 
                wi->swi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
-               return 1;
+               return;
        }
-
-       return 0;
 }
 
 static void
@@ -1159,8 +1151,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
 {
        struct swi_workitem *wi = &rpc->crpc_wi;
 
-       LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE);
-
        spin_lock(&rpc->crpc_lock);
 
        rpc->crpc_closed = 1;
@@ -1183,7 +1173,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(!srpc_event_pending(rpc));
-       swi_exit_workitem(wi);
+       wi->swi_state = SWI_STATE_DONE;
 
        spin_unlock(&rpc->crpc_lock);
 
@@ -1191,7 +1181,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
 }
 
 /* sends an outgoing RPC */
-int
+void
 srpc_send_rpc(struct swi_workitem *wi)
 {
        int rc = 0;
@@ -1210,6 +1200,10 @@ srpc_send_rpc(struct swi_workitem *wi)
        do_bulk = rpc->crpc_bulk.bk_niov > 0;
 
        spin_lock(&rpc->crpc_lock);
+       if (wi->swi_state == SWI_STATE_DONE) {
+               spin_unlock(&rpc->crpc_lock);
+               return;
+       }
 
        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
@@ -1227,7 +1221,7 @@ srpc_send_rpc(struct swi_workitem *wi)
                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
-                       return 1;
+                       return;
                }
 
                rc = srpc_prepare_bulk(rpc);
@@ -1303,7 +1297,7 @@ srpc_send_rpc(struct swi_workitem *wi)
 
                wi->swi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
-               return 1;
+               return;
        }
 
        if (rc != 0) {
@@ -1320,10 +1314,9 @@ abort:
 
                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
-                       return 1;
+                       return;
                }
        }
-       return 0;
 }
 
 struct srpc_client_rpc *
diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h
index 0b609ad..b1f6d46 100644
@@ -127,14 +127,18 @@ enum lnet_selftest_group_nodelist_prop_attrs {
 
 #define LNET_SELFTEST_GROUP_NODELIST_PROP_MAX  (__LNET_SELFTEST_GROUP_NODELIST_PROP_MAX_PLUS_ONE - 1)
 
-#define SWI_STATE_NEWBORN                  0
-#define SWI_STATE_REPLY_SUBMITTED          1
-#define SWI_STATE_REPLY_SENT               2
-#define SWI_STATE_REQUEST_SUBMITTED        3
-#define SWI_STATE_REQUEST_SENT             4
-#define SWI_STATE_REPLY_RECEIVED           5
-#define SWI_STATE_BULK_STARTED             6
-#define SWI_STATE_DONE                     10
+enum lsr_swi_state {
+       SWI_STATE_DONE = 0,
+       SWI_STATE_NEWBORN,
+       SWI_STATE_REPLY_SUBMITTED,
+       SWI_STATE_REPLY_SENT,
+       SWI_STATE_REQUEST_SUBMITTED,
+       SWI_STATE_REQUEST_SENT,
+       SWI_STATE_REPLY_RECEIVED,
+       SWI_STATE_BULK_STARTED,
+       SWI_STATE_RUNNING,
+       SWI_STATE_PAUSE,
+};
 
 /* forward refs */
 struct srpc_service;
@@ -243,13 +247,13 @@ struct srpc_buffer {
 };
 
 struct swi_workitem;
-typedef int (*swi_action_t)(struct swi_workitem *);
+typedef void (*swi_action_t)(struct swi_workitem *);
 
 struct swi_workitem {
-       struct cfs_wi_sched     *swi_sched;
-       struct cfs_workitem     swi_workitem;
-        swi_action_t         swi_action;
-        int                  swi_state;
+       struct workqueue_struct *swi_wq;
+       struct work_struct      swi_work;
+       swi_action_t            swi_action;
+       enum lsr_swi_state      swi_state;
 };
 
 /* server-side state of a RPC */
@@ -526,7 +530,7 @@ void srpc_free_bulk(struct srpc_bulk *bk);
 struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off,
                                  unsigned int bulk_npg, unsigned int bulk_len,
                                  int sink);
-int srpc_send_rpc(struct swi_workitem *wi);
+void srpc_send_rpc(struct swi_workitem *wi);
 int srpc_send_reply(struct srpc_server_rpc *rpc);
 int srpc_add_service(struct srpc_service *sv);
 int srpc_remove_service(struct srpc_service *sv);
@@ -537,8 +541,8 @@ int srpc_service_add_buffers(struct srpc_service *sv, int nbuffer);
 void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer);
 void srpc_get_counters(struct srpc_counters *cnt);
 
-extern struct cfs_wi_sched *lst_sched_serial;
-extern struct cfs_wi_sched **lst_sched_test;
+extern struct workqueue_struct *lst_serial_wq;
+extern struct workqueue_struct **lst_test_wq;
 
 static inline int
 srpc_serv_is_framework(struct srpc_service *svc)
@@ -546,41 +550,36 @@ srpc_serv_is_framework(struct srpc_service *svc)
        return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID;
 }
 
-static inline int
-swi_wi_action(struct cfs_workitem *wi)
+static void
+swi_wi_action(struct work_struct *wi)
 {
        struct swi_workitem *swi;
 
-       swi = container_of(wi, struct swi_workitem, swi_workitem);
-       return swi->swi_action(swi);
+       swi = container_of(wi, struct swi_workitem, swi_work);
+       swi->swi_action(swi);
 }
 
 static inline void
 swi_init_workitem(struct swi_workitem *swi,
-                 swi_action_t action, struct cfs_wi_sched *sched)
+                 swi_action_t action, struct workqueue_struct *wq)
 {
-       swi->swi_sched  = sched;
+       swi->swi_wq = wq;
        swi->swi_action = action;
        swi->swi_state  = SWI_STATE_NEWBORN;
-       cfs_wi_init(&swi->swi_workitem, swi_wi_action);
+       INIT_WORK(&swi->swi_work, swi_wi_action);
 }
 
 static inline void
 swi_schedule_workitem(struct swi_workitem *wi)
 {
-       cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem);
-}
-
-static inline void
-swi_exit_workitem(struct swi_workitem *swi)
-{
-       cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
+       queue_work(wi->swi_wq, &wi->swi_work);
 }
 
 static inline int
-swi_deschedule_workitem(struct swi_workitem *swi)
+swi_cancel_workitem(struct swi_workitem *swi)
 {
-       return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem);
+       swi->swi_state = SWI_STATE_DONE;
+       return cancel_work_sync(&swi->swi_work);
 }
 
 int sfw_startup(void);
@@ -615,7 +614,7 @@ srpc_init_client_rpc(struct srpc_client_rpc *rpc, struct lnet_process_id peer,
 
        INIT_LIST_HEAD(&rpc->crpc_list);
        swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc,
-                         lst_sched_test[lnet_cpt_of_nid(peer.nid, NULL)]);
+                         lst_test_wq[lnet_cpt_of_nid(peer.nid, NULL)]);
        spin_lock_init(&rpc->crpc_lock);
        atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */