From 51dd6269c91dab7543cd9dfd1848c983efa6db36 Mon Sep 17 00:00:00 2001 From: Mr NeilBrown Date: Fri, 17 Jul 2020 19:11:29 -0400 Subject: [PATCH 1/1] LU-9859 lnet: convert selftest to use workqueues Instead of the cfs workitem library, use workqueues. As lnet wants to provide a cpu mask of allowed cpus, it needs to be a WQ_UNBOUND work queue so that tasks can run on cpus other than where they were submitted. We use alloc_ordered_workqueue for lst_sched_serial (now called lst_serial_wq) - "ordered" means the same as "serial" did. We use cfs_cpt_bind_queue() for the other workqueues which sets up the CPU mask as required. An important difference with workqueues is that there is no equivalent to cfs_wi_exit() which can be called in the action function and which will ensure the function is not called again - and that the item is no longer queued. To provide similar semantics we treat swi_state == SWI_STATE_DONE as meaning that the wi is complete and any further calls must be no-op. We also call cancel_work_sync() (via swi_cancel_workitem()) before freeing or reusing memory that held a work-item. To ensure the same exclusion that cfs_wi_exit() provided the state is set and tested under a lock - either crpc_lock, scd_lock, or tsi_lock depending on which structure the wi is embedded in. Another minor difference is that with workqueues the action function returns void, not an int. Also change SWI_STATE_* from #define to an enum. The only place these values are ever stored is in one field in a struct. Linux-commit: 6106c0f82481e686b337ee0c403821fb5c3c17ef Linux-commit: 3fc0b7d3e0a4d37e4c60c2232df4500187a07232 Linux-commit: 7d70718de014ada7280bb011db8655e18ed935b1 Test-Parameters: trivial testlist=lnet-selftest Change-Id: I5ccf1399ebbfdd4cab3696749bd1ec666147b757 Signed-off-by: Mr NeilBrown Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/36991 Tested-by: jenkins Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Serguei Smirnov Reviewed-by: Chris Horn Reviewed-by: Frank Sehr Reviewed-by: Oleg Drokin --- lnet/selftest/framework.c | 59 +++++++++++++++++++-------------- lnet/selftest/module.c | 40 ++++++++++++----------- lnet/selftest/rpc.c | 83 ++++++++++++++++++++++------------------------- lnet/selftest/selftest.h | 65 ++++++++++++++++++------------------- 4 files changed, 125 insertions(+), 122 deletions(-) diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c index 4b65968..919cd05 100644 --- a/lnet/selftest/framework.c +++ b/lnet/selftest/framework.c @@ -552,6 +552,7 @@ sfw_test_rpc_fini(struct srpc_client_rpc *rpc) /* Called with hold of tsi->tsi_lock */ LASSERT(list_empty(&rpc->crpc_list)); + rpc->crpc_wi.swi_state = SWI_STATE_DONE; list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); } @@ -653,6 +654,7 @@ sfw_destroy_test_instance(struct sfw_test_instance *tsi) rpc = list_entry(tsi->tsi_free_rpcs.next, struct srpc_client_rpc, crpc_list); list_del(&rpc->crpc_list); + swi_cancel_workitem(&rpc->crpc_wi); LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc)); } @@ -941,6 +943,7 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer, blklen, sfw_test_rpc_done, sfw_test_rpc_fini, tsu); } else { + swi_cancel_workitem(&rpc->crpc_wi); srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk, blklen, sfw_test_rpc_done, sfw_test_rpc_fini, tsu); @@ -957,25 +960,29 @@ sfw_create_test_rpc(struct sfw_test_unit *tsu, struct lnet_process_id peer, return 0; } -static int +static void sfw_run_test(struct swi_workitem *wi) { struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker); struct sfw_test_instance *tsi = tsu->tsu_instance; struct srpc_client_rpc *rpc = NULL; - LASSERT (wi == &tsu->tsu_worker); - - if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) { - LASSERT (rpc == NULL); - goto test_done; - } + if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) { + LASSERT(rpc == NULL); + wi->swi_state = SWI_STATE_DONE; + goto test_done; + } - LASSERT (rpc != NULL); + LASSERT(rpc != NULL); spin_lock(&tsi->tsi_lock); + if (wi->swi_state == SWI_STATE_DONE) { + spin_unlock(&tsi->tsi_lock); + return; + } if (tsi->tsi_stopping) { + wi->swi_state = SWI_STATE_DONE; list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); spin_unlock(&tsi->tsi_lock); goto test_done; @@ -985,25 +992,24 @@ sfw_run_test(struct swi_workitem *wi) tsu->tsu_loop--; list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs); + wi->swi_state = SWI_STATE_RUNNING; spin_unlock(&tsi->tsi_lock); spin_lock(&rpc->crpc_lock); rpc->crpc_timeout = rpc_timeout; srpc_post_rpc(rpc); spin_unlock(&rpc->crpc_lock); - return 0; + return; test_done: - /* - * No one can schedule me now since: - * - previous RPC, if any, has done and - * - no new RPC is initiated. - * - my batch is still active; no one can run it again now. - * Cancel pending schedules and prevent future schedule attempts: - */ - swi_exit_workitem(wi); + /* + * No one can schedule me now since: + * - previous RPC, if any, has done and + * - no new RPC is initiated. + * - my batch is still active; no one can run it again now. + * Cancel pending schedules and prevent future schedule attempts: + */ sfw_test_unit_done(tsu); - return 1; } static int @@ -1033,7 +1039,9 @@ sfw_run_batch(struct sfw_batch *tsb) tsu->tsu_loop = tsi->tsi_loop; wi = &tsu->tsu_worker; swi_init_workitem(wi, sfw_run_test, - lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid, NULL)]); + lst_test_wq[lnet_cpt_of_nid( + tsu->tsu_dest.nid, + NULL)]); swi_schedule_workitem(wi); } } @@ -1413,14 +1421,15 @@ sfw_create_rpc(struct lnet_process_id peer, int service, rpc = list_entry(sfw_data.fw_zombie_rpcs.next, struct srpc_client_rpc, crpc_list); list_del(&rpc->crpc_list); - - srpc_init_client_rpc(rpc, peer, service, 0, 0, - done, sfw_client_rpc_fini, priv); - } - + } spin_unlock(&sfw_data.fw_lock); - if (rpc == NULL) { + if (rpc) { + /* Ensure that rpc is done */ + swi_cancel_workitem(&rpc->crpc_wi); + srpc_init_client_rpc(rpc, peer, service, 0, 0, + done, sfw_client_rpc_fini, priv); + } else { rpc = srpc_create_client_rpc(peer, service, nbulkiov, bulklen, done, nbulkiov != 0 ? NULL : diff --git a/lnet/selftest/module.c b/lnet/selftest/module.c index 1441600..ae73095 100644 --- a/lnet/selftest/module.c +++ b/lnet/selftest/module.c @@ -45,8 +45,8 @@ enum { static int lst_init_step = LST_INIT_NONE; -struct cfs_wi_sched *lst_sched_serial; -struct cfs_wi_sched **lst_sched_test; +struct workqueue_struct *lst_serial_wq; +struct workqueue_struct **lst_test_wq; static void lnet_selftest_exit(void) @@ -66,17 +66,17 @@ lnet_selftest_exit(void) case LST_INIT_WI_TEST: for (i = 0; i < cfs_cpt_number(lnet_cpt_table()); i++) { - if (lst_sched_test[i] == NULL) + if (!lst_test_wq[i]) continue; - cfs_wi_sched_destroy(lst_sched_test[i]); + destroy_workqueue(lst_test_wq[i]); } - CFS_FREE_PTR_ARRAY(lst_sched_test, + CFS_FREE_PTR_ARRAY(lst_test_wq, cfs_cpt_number(lnet_cpt_table())); - lst_sched_test = NULL; + lst_test_wq = NULL; fallthrough; case LST_INIT_WI_SERIAL: - cfs_wi_sched_destroy(lst_sched_serial); - lst_sched_serial = NULL; + destroy_workqueue(lst_serial_wq); + lst_serial_wq = NULL; fallthrough; case LST_INIT_NONE: break; @@ -102,20 +102,19 @@ static int __init lnet_selftest_init(void) { int nscheds; - int rc; + int rc = -ENOMEM; int i; - rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY, - 1, &lst_sched_serial); - if (rc != 0) { + lst_serial_wq = alloc_ordered_workqueue("lst_s", 0); + if (!lst_serial_wq) { CERROR("Failed to create serial WI scheduler for LST\n"); return rc; } lst_init_step = LST_INIT_WI_SERIAL; nscheds = cfs_cpt_number(lnet_cpt_table()); - CFS_ALLOC_PTR_ARRAY(lst_sched_test, nscheds); - if (lst_sched_test == NULL) { + CFS_ALLOC_PTR_ARRAY(lst_test_wq, nscheds); + if (!lst_test_wq) { rc = -ENOMEM; goto error; } @@ -126,11 +125,14 @@ lnet_selftest_init(void) /* reserve at least one CPU for LND */ nthrs = max(nthrs - 1, 1); - rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i, - nthrs, &lst_sched_test[i]); - if (rc != 0) { - CERROR("Failed to create CPU partition affinity WI scheduler %d for LST\n", - i); + lst_test_wq[i] = cfs_cpt_bind_workqueue("lst_t", + lnet_cpt_table(), 0, + i, nthrs); + if (IS_ERR(lst_test_wq[i])) { + rc = PTR_ERR(lst_test_wq[i]); + CERROR("Failed to create CPU partition affinity WI scheduler %d for LST: rc = %d\n", + i, rc); + lst_test_wq[i] = NULL; goto error; } } diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index b85e045..5a43ee7 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -92,7 +92,7 @@ srpc_serv_portal(int svc_id) } /* forward ref's */ -static int srpc_handle_rpc(struct swi_workitem *wi); +static void srpc_handle_rpc(struct swi_workitem *wi); void srpc_get_counters(struct srpc_counters *cnt) @@ -196,7 +196,7 @@ srpc_init_server_rpc(struct srpc_server_rpc *rpc, memset(rpc, 0, sizeof(*rpc)); swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc, srpc_serv_is_framework(scd->scd_svc) ? - lst_sched_serial : lst_sched_test[scd->scd_cpt]); + lst_serial_wq : lst_test_wq[scd->scd_cpt]); rpc->srpc_ev.ev_fired = 1; /* no event expected now */ @@ -261,7 +261,7 @@ srpc_service_nrpcs(struct srpc_service *svc) max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN); } -int srpc_add_buffer(struct swi_workitem *wi); +void srpc_add_buffer(struct swi_workitem *wi); static int srpc_service_init(struct srpc_service *svc) @@ -295,10 +295,10 @@ srpc_service_init(struct srpc_service *svc) scd->scd_ev.ev_data = scd; scd->scd_ev.ev_type = SRPC_REQUEST_RCVD; - /* NB: don't use lst_sched_serial for adding buffer, + /* NB: don't use lst_serial_wq for adding buffer, * see details in srpc_service_add_buffers() */ swi_init_workitem(&scd->scd_buf_wi, - srpc_add_buffer, lst_sched_test[i]); + srpc_add_buffer, lst_test_wq[i]); if (i != 0 && srpc_serv_is_framework(svc)) { /* NB: framework service only needs srpc_service_cd for @@ -534,7 +534,7 @@ __must_hold(&scd->scd_lock) return rc; } -int +void srpc_add_buffer(struct swi_workitem *wi) { struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd, @@ -591,8 +591,8 @@ srpc_add_buffer(struct swi_workitem *wi) scd->scd_buf_posting--; } + wi->swi_state = SWI_STATE_RUNNING; spin_unlock(&scd->scd_lock); - return 0; } int @@ -624,15 +624,15 @@ srpc_service_add_buffers(struct srpc_service *sv, int nbuffer) spin_lock(&scd->scd_lock); /* * NB: srpc_service_add_buffers() can be called inside - * thread context of lst_sched_serial, and we don't normally + * thread context of lst_serial_wq, and we don't normally * allow to sleep inside thread context of WI scheduler * because it will block current scheduler thread from doing * anything else, even worse, it could deadlock if it's * waiting on result from another WI of the same scheduler. * However, it's safe at here because scd_buf_wi is scheduled - * by thread in a different WI scheduler (lst_sched_test), + * by thread in a different WI scheduler (lst_test_wq), * so we don't have any risk of deadlock, though this could - * block all WIs pending on lst_sched_serial for a moment + * block all WIs pending on lst_serial_wq for a moment * which is not good but not fatal. */ lst_wait_until(scd->scd_buf_err != 0 || @@ -679,12 +679,9 @@ srpc_finish_service(struct srpc_service *sv) LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */ cfs_percpt_for_each(scd, i, sv->sv_cpt_data) { - spin_lock(&scd->scd_lock); - if (!swi_deschedule_workitem(&scd->scd_buf_wi)) { - spin_unlock(&scd->scd_lock); - return 0; - } + swi_cancel_workitem(&scd->scd_buf_wi); + spin_lock(&scd->scd_lock); if (scd->scd_buf_nposted > 0) { CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n", scd->scd_buf_nposted); @@ -699,11 +696,9 @@ srpc_finish_service(struct srpc_service *sv) rpc = list_entry(scd->scd_rpc_active.next, struct srpc_server_rpc, srpc_list); - CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n", + CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n", rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), swi_state2str(rpc->srpc_wi.swi_state), - rpc->srpc_wi.swi_workitem.wi_scheduled, - rpc->srpc_wi.swi_workitem.wi_running, rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type, rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet); spin_unlock(&scd->scd_lock); @@ -926,8 +921,6 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) struct srpc_service *sv = scd->scd_svc; struct srpc_buffer *buffer; - LASSERT(status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE); - rpc->srpc_status = status; CDEBUG_LIMIT(status == 0 ? D_NET : D_NETERROR, @@ -961,7 +954,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) * Cancel pending schedules and prevent future schedule attempts: */ LASSERT(rpc->srpc_ev.ev_fired); - swi_exit_workitem(&rpc->srpc_wi); + rpc->srpc_wi.swi_state = SWI_STATE_DONE; if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) { buffer = list_entry(scd->scd_buf_blocked.next, @@ -979,7 +972,7 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) } /* handles an incoming RPC */ -static int srpc_handle_rpc(struct swi_workitem *wi) +static void srpc_handle_rpc(struct swi_workitem *wi) { struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc, srpc_wi); @@ -988,22 +981,23 @@ static int srpc_handle_rpc(struct swi_workitem *wi) struct srpc_event *ev = &rpc->srpc_ev; int rc = 0; - LASSERT(wi == &rpc->srpc_wi); - spin_lock(&scd->scd_lock); + if (wi->swi_state == SWI_STATE_DONE) { + spin_unlock(&scd->scd_lock); + return; + } if (sv->sv_shuttingdown || rpc->srpc_aborted) { + wi->swi_state = SWI_STATE_DONE; spin_unlock(&scd->scd_lock); if (rpc->srpc_bulk != NULL) LNetMDUnlink(rpc->srpc_bulk->bk_mdh); LNetMDUnlink(rpc->srpc_replymdh); - if (ev->ev_fired) { /* no more event, OK to finish */ + if (ev->ev_fired) /* no more event, OK to finish */ srpc_server_rpc_done(rpc, -ESHUTDOWN); - return 1; - } - return 0; + return; } spin_unlock(&scd->scd_lock); @@ -1022,7 +1016,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi) if (msg->msg_magic == 0) { /* moaned already in srpc_lnet_ev_handler */ srpc_server_rpc_done(rpc, EBADMSG); - return 1; + return; } srpc_unpack_msg_hdr(msg); @@ -1038,7 +1032,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi) LASSERT(reply->status == 0 || !rpc->srpc_bulk); if (rc != 0) { srpc_server_rpc_done(rpc, rc); - return 1; + return; } } @@ -1047,7 +1041,7 @@ static int srpc_handle_rpc(struct swi_workitem *wi) if (rpc->srpc_bulk != NULL) { rc = srpc_do_bulk(rpc); if (rc == 0) - return 0; /* wait for bulk */ + return; /* wait for bulk */ LASSERT(ev->ev_fired); ev->ev_status = rc; @@ -1065,16 +1059,16 @@ static int srpc_handle_rpc(struct swi_workitem *wi) if (rc != 0) { srpc_server_rpc_done(rpc, rc); - return 1; + return; } } wi->swi_state = SWI_STATE_REPLY_SUBMITTED; rc = srpc_send_reply(rpc); if (rc == 0) - return 0; /* wait for reply */ + return; /* wait for reply */ srpc_server_rpc_done(rpc, rc); - return 1; + return; case SWI_STATE_REPLY_SUBMITTED: if (!ev->ev_fired) { @@ -1087,10 +1081,8 @@ static int srpc_handle_rpc(struct swi_workitem *wi) wi->swi_state = SWI_STATE_DONE; srpc_server_rpc_done(rpc, ev->ev_status); - return 1; + return; } - - return 0; } static void @@ -1159,8 +1151,6 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status) { struct swi_workitem *wi = &rpc->crpc_wi; - LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE); - spin_lock(&rpc->crpc_lock); rpc->crpc_closed = 1; @@ -1183,7 +1173,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status) * Cancel pending schedules and prevent future schedule attempts: */ LASSERT(!srpc_event_pending(rpc)); - swi_exit_workitem(wi); + wi->swi_state = SWI_STATE_DONE; spin_unlock(&rpc->crpc_lock); @@ -1191,7 +1181,7 @@ srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status) } /* sends an outgoing RPC */ -int +void srpc_send_rpc(struct swi_workitem *wi) { int rc = 0; @@ -1210,6 +1200,10 @@ srpc_send_rpc(struct swi_workitem *wi) do_bulk = rpc->crpc_bulk.bk_niov > 0; spin_lock(&rpc->crpc_lock); + if (wi->swi_state == SWI_STATE_DONE) { + spin_unlock(&rpc->crpc_lock); + return; + } if (rpc->crpc_aborted) { spin_unlock(&rpc->crpc_lock); @@ -1227,7 +1221,7 @@ srpc_send_rpc(struct swi_workitem *wi) rc = srpc_prepare_reply(rpc); if (rc != 0) { srpc_client_rpc_done(rpc, rc); - return 1; + return; } rc = srpc_prepare_bulk(rpc); @@ -1303,7 +1297,7 @@ srpc_send_rpc(struct swi_workitem *wi) wi->swi_state = SWI_STATE_DONE; srpc_client_rpc_done(rpc, rc); - return 1; + return; } if (rc != 0) { @@ -1320,10 +1314,9 @@ abort: if (!srpc_event_pending(rpc)) { srpc_client_rpc_done(rpc, -EINTR); - return 1; + return; } } - return 0; } struct srpc_client_rpc * diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index 0b609ad..b1f6d46 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -127,14 +127,18 @@ enum lnet_selftest_group_nodelist_prop_attrs { #define LNET_SELFTEST_GROUP_NODELIST_PROP_MAX (__LNET_SELFTEST_GROUP_NODELIST_PROP_MAX_PLUS_ONE - 1) -#define SWI_STATE_NEWBORN 0 -#define SWI_STATE_REPLY_SUBMITTED 1 -#define SWI_STATE_REPLY_SENT 2 -#define SWI_STATE_REQUEST_SUBMITTED 3 -#define SWI_STATE_REQUEST_SENT 4 -#define SWI_STATE_REPLY_RECEIVED 5 -#define SWI_STATE_BULK_STARTED 6 -#define SWI_STATE_DONE 10 +enum lsr_swi_state { + SWI_STATE_DONE = 0, + SWI_STATE_NEWBORN, + SWI_STATE_REPLY_SUBMITTED, + SWI_STATE_REPLY_SENT, + SWI_STATE_REQUEST_SUBMITTED, + SWI_STATE_REQUEST_SENT, + SWI_STATE_REPLY_RECEIVED, + SWI_STATE_BULK_STARTED, + SWI_STATE_RUNNING, + SWI_STATE_PAUSE, +}; /* forward refs */ struct srpc_service; @@ -243,13 +247,13 @@ struct srpc_buffer { }; struct swi_workitem; -typedef int (*swi_action_t)(struct swi_workitem *); +typedef void (*swi_action_t)(struct swi_workitem *); struct swi_workitem { - struct cfs_wi_sched *swi_sched; - struct cfs_workitem swi_workitem; - swi_action_t swi_action; - int swi_state; + struct workqueue_struct *swi_wq; + struct work_struct swi_work; + swi_action_t swi_action; + enum lsr_swi_state swi_state; }; /* server-side state of a RPC */ @@ -526,7 +530,7 @@ void srpc_free_bulk(struct srpc_bulk *bk); struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off, unsigned int bulk_npg, unsigned int bulk_len, int sink); -int srpc_send_rpc(struct swi_workitem *wi); +void srpc_send_rpc(struct swi_workitem *wi); int srpc_send_reply(struct srpc_server_rpc *rpc); int srpc_add_service(struct srpc_service *sv); int srpc_remove_service(struct srpc_service *sv); @@ -537,8 +541,8 @@ int srpc_service_add_buffers(struct srpc_service *sv, int nbuffer); void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer); void srpc_get_counters(struct srpc_counters *cnt); -extern struct cfs_wi_sched *lst_sched_serial; -extern struct cfs_wi_sched **lst_sched_test; +extern struct workqueue_struct *lst_serial_wq; +extern struct workqueue_struct **lst_test_wq; static inline int srpc_serv_is_framework(struct srpc_service *svc) @@ -546,41 +550,36 @@ srpc_serv_is_framework(struct srpc_service *svc) return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID; } -static inline int -swi_wi_action(struct cfs_workitem *wi) +static void +swi_wi_action(struct work_struct *wi) { struct swi_workitem *swi; - swi = container_of(wi, struct swi_workitem, swi_workitem); - return swi->swi_action(swi); + swi = container_of(wi, struct swi_workitem, swi_work); + swi->swi_action(swi); } static inline void swi_init_workitem(struct swi_workitem *swi, - swi_action_t action, struct cfs_wi_sched *sched) + swi_action_t action, struct workqueue_struct *wq) { - swi->swi_sched = sched; + swi->swi_wq = wq; swi->swi_action = action; swi->swi_state = SWI_STATE_NEWBORN; - cfs_wi_init(&swi->swi_workitem, swi_wi_action); + INIT_WORK(&swi->swi_work, swi_wi_action); } static inline void swi_schedule_workitem(struct swi_workitem *wi) { - cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem); -} - -static inline void -swi_exit_workitem(struct swi_workitem *swi) -{ - cfs_wi_exit(swi->swi_sched, &swi->swi_workitem); + queue_work(wi->swi_wq, &wi->swi_work); } static inline int -swi_deschedule_workitem(struct swi_workitem *swi) +swi_cancel_workitem(struct swi_workitem *swi) { - return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem); + swi->swi_state = SWI_STATE_DONE; + return cancel_work_sync(&swi->swi_work); } int sfw_startup(void); @@ -615,7 +614,7 @@ srpc_init_client_rpc(struct srpc_client_rpc *rpc, struct lnet_process_id peer, INIT_LIST_HEAD(&rpc->crpc_list); swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc, - lst_sched_test[lnet_cpt_of_nid(peer.nid, NULL)]); + lst_test_wq[lnet_cpt_of_nid(peer.nid, NULL)]); spin_lock_init(&rpc->crpc_lock); atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ -- 1.8.3.1