/* Called with hold of tsi->tsi_lock */
LASSERT(list_empty(&rpc->crpc_list));
+ rpc->crpc_wi.swi_state = SWI_STATE_DONE;
list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
}
rpc = list_entry(tsi->tsi_free_rpcs.next,
struct srpc_client_rpc, crpc_list);
list_del(&rpc->crpc_list);
+ swi_cancel_workitem(&rpc->crpc_wi);
LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
}
blklen, sfw_test_rpc_done,
sfw_test_rpc_fini, tsu);
} else {
+ swi_cancel_workitem(&rpc->crpc_wi);
srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
blklen, sfw_test_rpc_done,
sfw_test_rpc_fini, tsu);
return 0;
}
-static int
+static void
sfw_run_test(struct swi_workitem *wi)
{
struct sfw_test_unit *tsu = container_of(wi, struct sfw_test_unit, tsu_worker);
struct sfw_test_instance *tsi = tsu->tsu_instance;
struct srpc_client_rpc *rpc = NULL;
- LASSERT (wi == &tsu->tsu_worker);
-
- if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
- LASSERT (rpc == NULL);
- goto test_done;
- }
+ if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) {
+ LASSERT(rpc == NULL);
+ wi->swi_state = SWI_STATE_DONE;
+ goto test_done;
+ }
- LASSERT (rpc != NULL);
+ LASSERT(rpc != NULL);
spin_lock(&tsi->tsi_lock);
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&tsi->tsi_lock);
+ return;
+ }
if (tsi->tsi_stopping) {
+ wi->swi_state = SWI_STATE_DONE;
list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
spin_unlock(&tsi->tsi_lock);
goto test_done;
tsu->tsu_loop--;
list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
+ wi->swi_state = SWI_STATE_RUNNING;
spin_unlock(&tsi->tsi_lock);
spin_lock(&rpc->crpc_lock);
rpc->crpc_timeout = rpc_timeout;
srpc_post_rpc(rpc);
spin_unlock(&rpc->crpc_lock);
- return 0;
+ return;
test_done:
- /*
- * No one can schedule me now since:
- * - previous RPC, if any, has done and
- * - no new RPC is initiated.
- * - my batch is still active; no one can run it again now.
- * Cancel pending schedules and prevent future schedule attempts:
- */
- swi_exit_workitem(wi);
+	/*
+	 * No one can schedule me now since:
+	 * - previous RPC, if any, has done and
+	 * - no new RPC is initiated.
+	 * - my batch is still active; no one can run it again now.
+	 * swi_state is already SWI_STATE_DONE, so no further run of this
+	 * workitem can be scheduled.
+	 */
sfw_test_unit_done(tsu);
- return 1;
}
static int
tsu->tsu_loop = tsi->tsi_loop;
wi = &tsu->tsu_worker;
swi_init_workitem(wi, sfw_run_test,
- lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid, NULL)]);
+ lst_test_wq[lnet_cpt_of_nid(
+ tsu->tsu_dest.nid,
+ NULL)]);
swi_schedule_workitem(wi);
}
}
rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
struct srpc_client_rpc, crpc_list);
list_del(&rpc->crpc_list);
-
- srpc_init_client_rpc(rpc, peer, service, 0, 0,
- done, sfw_client_rpc_fini, priv);
- }
-
+ }
spin_unlock(&sfw_data.fw_lock);
- if (rpc == NULL) {
+ if (rpc) {
+		/* cancel any pending/running work so the rpc is idle before reuse */
+ swi_cancel_workitem(&rpc->crpc_wi);
+ srpc_init_client_rpc(rpc, peer, service, 0, 0,
+ done, sfw_client_rpc_fini, priv);
+ } else {
rpc = srpc_create_client_rpc(peer, service,
nbulkiov, bulklen, done,
nbulkiov != 0 ? NULL :
static int lst_init_step = LST_INIT_NONE;
-struct cfs_wi_sched *lst_sched_serial;
-struct cfs_wi_sched **lst_sched_test;
+struct workqueue_struct *lst_serial_wq;
+struct workqueue_struct **lst_test_wq;
static void
lnet_selftest_exit(void)
case LST_INIT_WI_TEST:
for (i = 0;
i < cfs_cpt_number(lnet_cpt_table()); i++) {
- if (lst_sched_test[i] == NULL)
+ if (!lst_test_wq[i])
continue;
- cfs_wi_sched_destroy(lst_sched_test[i]);
+ destroy_workqueue(lst_test_wq[i]);
}
- CFS_FREE_PTR_ARRAY(lst_sched_test,
+ CFS_FREE_PTR_ARRAY(lst_test_wq,
cfs_cpt_number(lnet_cpt_table()));
- lst_sched_test = NULL;
+ lst_test_wq = NULL;
fallthrough;
case LST_INIT_WI_SERIAL:
- cfs_wi_sched_destroy(lst_sched_serial);
- lst_sched_serial = NULL;
+ destroy_workqueue(lst_serial_wq);
+ lst_serial_wq = NULL;
fallthrough;
case LST_INIT_NONE:
break;
lnet_selftest_init(void)
{
int nscheds;
- int rc;
+ int rc = -ENOMEM;
int i;
- rc = cfs_wi_sched_create("lst_s", lnet_cpt_table(), CFS_CPT_ANY,
- 1, &lst_sched_serial);
- if (rc != 0) {
+ lst_serial_wq = alloc_ordered_workqueue("lst_s", 0);
+ if (!lst_serial_wq) {
CERROR("Failed to create serial WI scheduler for LST\n");
return rc;
}
lst_init_step = LST_INIT_WI_SERIAL;
nscheds = cfs_cpt_number(lnet_cpt_table());
- CFS_ALLOC_PTR_ARRAY(lst_sched_test, nscheds);
- if (lst_sched_test == NULL) {
+ CFS_ALLOC_PTR_ARRAY(lst_test_wq, nscheds);
+ if (!lst_test_wq) {
rc = -ENOMEM;
goto error;
}
/* reserve at least one CPU for LND */
nthrs = max(nthrs - 1, 1);
- rc = cfs_wi_sched_create("lst_t", lnet_cpt_table(), i,
- nthrs, &lst_sched_test[i]);
- if (rc != 0) {
- CERROR("Failed to create CPU partition affinity WI scheduler %d for LST\n",
- i);
+ lst_test_wq[i] = cfs_cpt_bind_workqueue("lst_t",
+ lnet_cpt_table(), 0,
+ i, nthrs);
+ if (IS_ERR(lst_test_wq[i])) {
+ rc = PTR_ERR(lst_test_wq[i]);
+ CERROR("Failed to create CPU partition affinity WI scheduler %d for LST: rc = %d\n",
+ i, rc);
+ lst_test_wq[i] = NULL;
goto error;
}
}
}
/* forward ref's */
-static int srpc_handle_rpc(struct swi_workitem *wi);
+static void srpc_handle_rpc(struct swi_workitem *wi);
void srpc_get_counters(struct srpc_counters *cnt)
memset(rpc, 0, sizeof(*rpc));
swi_init_workitem(&rpc->srpc_wi, srpc_handle_rpc,
srpc_serv_is_framework(scd->scd_svc) ?
- lst_sched_serial : lst_sched_test[scd->scd_cpt]);
+ lst_serial_wq : lst_test_wq[scd->scd_cpt]);
rpc->srpc_ev.ev_fired = 1; /* no event expected now */
max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}
-int srpc_add_buffer(struct swi_workitem *wi);
+void srpc_add_buffer(struct swi_workitem *wi);
static int
srpc_service_init(struct srpc_service *svc)
scd->scd_ev.ev_data = scd;
scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;
- /* NB: don't use lst_sched_serial for adding buffer,
+ /* NB: don't use lst_serial_wq for adding buffer,
* see details in srpc_service_add_buffers() */
swi_init_workitem(&scd->scd_buf_wi,
- srpc_add_buffer, lst_sched_test[i]);
+ srpc_add_buffer, lst_test_wq[i]);
if (i != 0 && srpc_serv_is_framework(svc)) {
/* NB: framework service only needs srpc_service_cd for
return rc;
}
-int
+void
srpc_add_buffer(struct swi_workitem *wi)
{
struct srpc_service_cd *scd = container_of(wi, struct srpc_service_cd,
scd->scd_buf_posting--;
}
+ wi->swi_state = SWI_STATE_RUNNING;
spin_unlock(&scd->scd_lock);
- return 0;
}
int
spin_lock(&scd->scd_lock);
/*
* NB: srpc_service_add_buffers() can be called inside
- * thread context of lst_sched_serial, and we don't normally
+ * thread context of lst_serial_wq, and we don't normally
* allow to sleep inside thread context of WI scheduler
* because it will block current scheduler thread from doing
* anything else, even worse, it could deadlock if it's
* waiting on result from another WI of the same scheduler.
* However, it's safe at here because scd_buf_wi is scheduled
- * by thread in a different WI scheduler (lst_sched_test),
+ * by thread in a different WI scheduler (lst_test_wq),
* so we don't have any risk of deadlock, though this could
- * block all WIs pending on lst_sched_serial for a moment
+ * block all WIs pending on lst_serial_wq for a moment
* which is not good but not fatal.
*/
lst_wait_until(scd->scd_buf_err != 0 ||
LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */
cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
- spin_lock(&scd->scd_lock);
- if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
- spin_unlock(&scd->scd_lock);
- return 0;
- }
+ swi_cancel_workitem(&scd->scd_buf_wi);
+ spin_lock(&scd->scd_lock);
if (scd->scd_buf_nposted > 0) {
CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
scd->scd_buf_nposted);
rpc = list_entry(scd->scd_rpc_active.next,
struct srpc_server_rpc, srpc_list);
- CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
+ CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s, ev fired %d type %d status %d lnet %d\n",
rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
swi_state2str(rpc->srpc_wi.swi_state),
- rpc->srpc_wi.swi_workitem.wi_scheduled,
- rpc->srpc_wi.swi_workitem.wi_running,
rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
spin_unlock(&scd->scd_lock);
struct srpc_service *sv = scd->scd_svc;
struct srpc_buffer *buffer;
- LASSERT(status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);
-
rpc->srpc_status = status;
CDEBUG_LIMIT(status == 0 ? D_NET : D_NETERROR,
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(rpc->srpc_ev.ev_fired);
- swi_exit_workitem(&rpc->srpc_wi);
+ rpc->srpc_wi.swi_state = SWI_STATE_DONE;
if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
buffer = list_entry(scd->scd_buf_blocked.next,
}
/* handles an incoming RPC */
-static int srpc_handle_rpc(struct swi_workitem *wi)
+static void srpc_handle_rpc(struct swi_workitem *wi)
{
struct srpc_server_rpc *rpc = container_of(wi, struct srpc_server_rpc,
srpc_wi);
struct srpc_event *ev = &rpc->srpc_ev;
int rc = 0;
- LASSERT(wi == &rpc->srpc_wi);
-
spin_lock(&scd->scd_lock);
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&scd->scd_lock);
+ return;
+ }
if (sv->sv_shuttingdown || rpc->srpc_aborted) {
+ wi->swi_state = SWI_STATE_DONE;
spin_unlock(&scd->scd_lock);
if (rpc->srpc_bulk != NULL)
LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
LNetMDUnlink(rpc->srpc_replymdh);
- if (ev->ev_fired) { /* no more event, OK to finish */
+ if (ev->ev_fired) /* no more event, OK to finish */
srpc_server_rpc_done(rpc, -ESHUTDOWN);
- return 1;
- }
- return 0;
+ return;
}
spin_unlock(&scd->scd_lock);
if (msg->msg_magic == 0) {
/* moaned already in srpc_lnet_ev_handler */
srpc_server_rpc_done(rpc, EBADMSG);
- return 1;
+ return;
}
srpc_unpack_msg_hdr(msg);
LASSERT(reply->status == 0 || !rpc->srpc_bulk);
if (rc != 0) {
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;
}
}
if (rpc->srpc_bulk != NULL) {
rc = srpc_do_bulk(rpc);
if (rc == 0)
- return 0; /* wait for bulk */
+ return; /* wait for bulk */
LASSERT(ev->ev_fired);
ev->ev_status = rc;
if (rc != 0) {
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;
}
}
wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
rc = srpc_send_reply(rpc);
if (rc == 0)
- return 0; /* wait for reply */
+ return; /* wait for reply */
srpc_server_rpc_done(rpc, rc);
- return 1;
+ return;
case SWI_STATE_REPLY_SUBMITTED:
if (!ev->ev_fired) {
wi->swi_state = SWI_STATE_DONE;
srpc_server_rpc_done(rpc, ev->ev_status);
- return 1;
+ return;
}
-
- return 0;
}
static void
{
struct swi_workitem *wi = &rpc->crpc_wi;
- LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE);
-
spin_lock(&rpc->crpc_lock);
rpc->crpc_closed = 1;
* Cancel pending schedules and prevent future schedule attempts:
*/
LASSERT(!srpc_event_pending(rpc));
- swi_exit_workitem(wi);
+ wi->swi_state = SWI_STATE_DONE;
spin_unlock(&rpc->crpc_lock);
}
/* sends an outgoing RPC */
-int
+void
srpc_send_rpc(struct swi_workitem *wi)
{
int rc = 0;
do_bulk = rpc->crpc_bulk.bk_niov > 0;
spin_lock(&rpc->crpc_lock);
+ if (wi->swi_state == SWI_STATE_DONE) {
+ spin_unlock(&rpc->crpc_lock);
+ return;
+ }
if (rpc->crpc_aborted) {
spin_unlock(&rpc->crpc_lock);
rc = srpc_prepare_reply(rpc);
if (rc != 0) {
srpc_client_rpc_done(rpc, rc);
- return 1;
+ return;
}
rc = srpc_prepare_bulk(rpc);
wi->swi_state = SWI_STATE_DONE;
srpc_client_rpc_done(rpc, rc);
- return 1;
+ return;
}
if (rc != 0) {
if (!srpc_event_pending(rpc)) {
srpc_client_rpc_done(rpc, -EINTR);
- return 1;
+ return;
}
}
- return 0;
}
struct srpc_client_rpc *
#define LNET_SELFTEST_GROUP_NODELIST_PROP_MAX (__LNET_SELFTEST_GROUP_NODELIST_PROP_MAX_PLUS_ONE - 1)
-#define SWI_STATE_NEWBORN 0
-#define SWI_STATE_REPLY_SUBMITTED 1
-#define SWI_STATE_REPLY_SENT 2
-#define SWI_STATE_REQUEST_SUBMITTED 3
-#define SWI_STATE_REQUEST_SENT 4
-#define SWI_STATE_REPLY_RECEIVED 5
-#define SWI_STATE_BULK_STARTED 6
-#define SWI_STATE_DONE 10
+enum lsr_swi_state {
+ SWI_STATE_DONE = 0,
+ SWI_STATE_NEWBORN,
+ SWI_STATE_REPLY_SUBMITTED,
+ SWI_STATE_REPLY_SENT,
+ SWI_STATE_REQUEST_SUBMITTED,
+ SWI_STATE_REQUEST_SENT,
+ SWI_STATE_REPLY_RECEIVED,
+ SWI_STATE_BULK_STARTED,
+ SWI_STATE_RUNNING,
+ SWI_STATE_PAUSE,
+};
/* forward refs */
struct srpc_service;
};
struct swi_workitem;
-typedef int (*swi_action_t)(struct swi_workitem *);
+typedef void (*swi_action_t)(struct swi_workitem *);
struct swi_workitem {
- struct cfs_wi_sched *swi_sched;
- struct cfs_workitem swi_workitem;
- swi_action_t swi_action;
- int swi_state;
+ struct workqueue_struct *swi_wq;
+ struct work_struct swi_work;
+ swi_action_t swi_action;
+ enum lsr_swi_state swi_state;
};
/* server-side state of a RPC */
struct srpc_bulk *srpc_alloc_bulk(int cpt, unsigned int off,
unsigned int bulk_npg, unsigned int bulk_len,
int sink);
-int srpc_send_rpc(struct swi_workitem *wi);
+void srpc_send_rpc(struct swi_workitem *wi);
int srpc_send_reply(struct srpc_server_rpc *rpc);
int srpc_add_service(struct srpc_service *sv);
int srpc_remove_service(struct srpc_service *sv);
void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer);
void srpc_get_counters(struct srpc_counters *cnt);
-extern struct cfs_wi_sched *lst_sched_serial;
-extern struct cfs_wi_sched **lst_sched_test;
+extern struct workqueue_struct *lst_serial_wq;
+extern struct workqueue_struct **lst_test_wq;
static inline int
srpc_serv_is_framework(struct srpc_service *svc)
return svc->sv_id < SRPC_FRAMEWORK_SERVICE_MAX_ID;
}
-static inline int
-swi_wi_action(struct cfs_workitem *wi)
+static inline void
+swi_wi_action(struct work_struct *wi)
 {
 	struct swi_workitem *swi;
-	swi = container_of(wi, struct swi_workitem, swi_workitem);
-	return swi->swi_action(swi);
+	swi = container_of(wi, struct swi_workitem, swi_work);
+	swi->swi_action(swi);
 }
static inline void
swi_init_workitem(struct swi_workitem *swi,
- swi_action_t action, struct cfs_wi_sched *sched)
+ swi_action_t action, struct workqueue_struct *wq)
{
- swi->swi_sched = sched;
+ swi->swi_wq = wq;
swi->swi_action = action;
swi->swi_state = SWI_STATE_NEWBORN;
- cfs_wi_init(&swi->swi_workitem, swi_wi_action);
+ INIT_WORK(&swi->swi_work, swi_wi_action);
}
static inline void
swi_schedule_workitem(struct swi_workitem *wi)
{
- cfs_wi_schedule(wi->swi_sched, &wi->swi_workitem);
-}
-
-static inline void
-swi_exit_workitem(struct swi_workitem *swi)
-{
- cfs_wi_exit(swi->swi_sched, &swi->swi_workitem);
+ queue_work(wi->swi_wq, &wi->swi_work);
}
static inline int
-swi_deschedule_workitem(struct swi_workitem *swi)
+swi_cancel_workitem(struct swi_workitem *swi)
{
- return cfs_wi_deschedule(swi->swi_sched, &swi->swi_workitem);
+ swi->swi_state = SWI_STATE_DONE;
+ return cancel_work_sync(&swi->swi_work);
}
int sfw_startup(void);
INIT_LIST_HEAD(&rpc->crpc_list);
swi_init_workitem(&rpc->crpc_wi, srpc_send_rpc,
- lst_sched_test[lnet_cpt_of_nid(peer.nid, NULL)]);
+ lst_test_wq[lnet_cpt_of_nid(peer.nid, NULL)]);
spin_lock_init(&rpc->crpc_lock);
atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */