From d8bb34d0c1e8c3f768161a1281ee52f8361380bc Mon Sep 17 00:00:00 2001 From: isaac Date: Mon, 1 Jun 2009 21:25:03 +0000 Subject: [PATCH] i=liangzhen,b=18075: - an assortment of LNet selftest fixes and enhancements. --- lnet/ChangeLog | 4 + lnet/selftest/conrpc.h | 3 + lnet/selftest/console.c | 13 +- lnet/selftest/framework.c | 12 +- lnet/selftest/rpc.c | 360 ++++++++++------------------------------------ lnet/selftest/selftest.h | 33 ++--- lnet/selftest/workitem.c | 9 +- 7 files changed, 109 insertions(+), 325 deletions(-) diff --git a/lnet/ChangeLog b/lnet/ChangeLog index e4ee186..af2618c 100644 --- a/lnet/ChangeLog +++ b/lnet/ChangeLog @@ -17,6 +17,10 @@ Bugzilla : Description: Details : +Severity : normal +Bugzilla : 18075 +Description: LNet selftest fixes and enhancements + Severity : enhancement Bugzilla : 19156 Description: allow a test node to be a member of multiple test groups diff --git a/lnet/selftest/conrpc.h b/lnet/selftest/conrpc.h index ba2a72c..1bb4963 100644 --- a/lnet/selftest/conrpc.h +++ b/lnet/selftest/conrpc.h @@ -54,6 +54,9 @@ /* Console rpc and rpc transaction */ #define LST_TRANS_TIMEOUT 30 #define LST_TRANS_MIN_TIMEOUT 3 + +#define LST_VALIDATE_TIMEOUT(t) MIN(MAX(t, LST_TRANS_MIN_TIMEOUT), LST_TRANS_TIMEOUT) + #define LST_PING_INTERVAL 8 struct lstcon_rpc_trans; diff --git a/lnet/selftest/console.c b/lnet/selftest/console.c index d8cbabd..12a9f05 100644 --- a/lnet/selftest/console.c +++ b/lnet/selftest/console.c @@ -1475,9 +1475,7 @@ lstcon_ndlist_stat(struct list_head *ndlist, return rc; } - timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout : - LST_TRANS_MIN_TIMEOUT; - lstcon_rpc_trans_postwait(trans, timeout); + lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout)); rc = lstcon_rpc_trans_interpreter(trans, result_up, lstcon_statrpc_readent); @@ -1564,10 +1562,7 @@ lstcon_debug_ndlist(struct list_head *ndlist, return rc; } - timeout = (timeout > LST_TRANS_MIN_TIMEOUT) ? timeout : - LST_TRANS_MIN_TIMEOUT; - - lstcon_rpc_trans_postwait(trans, timeout); + lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout)); rc = lstcon_rpc_trans_interpreter(trans, result_up, lstcon_sesrpc_readent); @@ -2000,10 +1995,10 @@ lstcon_console_fini(void) { int i; - mutex_down(&console_session.ses_mutex); - libcfs_deregister_ioctl(&lstcon_ioctl_handler); + mutex_down(&console_session.ses_mutex); + srpc_shutdown_service(&lstcon_acceptor_service); srpc_remove_service(&lstcon_acceptor_service); diff --git a/lnet/selftest/framework.c b/lnet/selftest/framework.c index 73734a08..63e5089 100644 --- a/lnet/selftest/framework.c +++ b/lnet/selftest/framework.c @@ -53,7 +53,7 @@ static int session_timeout = 100; CFS_MODULE_PARM(session_timeout, "i", int, 0444, "test session timeout in seconds (100 by default, 0 == never)"); -#define SFW_TEST_CONCURRENCY 128 +#define SFW_TEST_CONCURRENCY 1792 #define SFW_TEST_RPC_TIMEOUT 64 #define SFW_CLIENT_RPC_TIMEOUT 64 /* in seconds */ #define SFW_EXTRA_TEST_BUFFERS 8 /* tolerate buggy peers with extra buffers */ @@ -214,6 +214,7 @@ sfw_deactivate_session (void) sfw_session_t *sn = sfw_data.fw_session; int nactive = 0; sfw_batch_t *tsb; + sfw_test_case_t *tsc; if (sn == NULL) return; @@ -223,6 +224,15 @@ sfw_deactivate_session (void) atomic_inc(&sfw_data.fw_nzombies); list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions); + spin_unlock(&sfw_data.fw_lock); + + cfs_list_for_each_entry_typed (tsc, &sfw_data.fw_tests, + sfw_test_case_t, tsc_list) { + srpc_abort_service(tsc->tsc_srv_service); + } + + spin_lock(&sfw_data.fw_lock); + cfs_list_for_each_entry_typed (tsb, &sn->sn_batches, sfw_batch_t, bat_list) { if (sfw_batch_active(tsb)) { diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index 942ff26..49f483f 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -43,8 +43,6 @@ #include "selftest.h" -#define SRPC_PEER_HASH_SIZE 101 /* # peer lists */ - typedef enum { SRPC_STATE_NONE, SRPC_STATE_NI_INIT, @@ -57,17 +55,12 @@ typedef enum { struct smoketest_rpc { spinlock_t rpc_glock; /* global lock */ srpc_service_t *rpc_services[SRPC_SERVICE_MAX_ID + 1]; - struct list_head *rpc_peers; /* hash table of known peers */ lnet_handle_eq_t rpc_lnet_eq; /* _the_ LNet event queue */ srpc_state_t rpc_state; srpc_counters_t rpc_counters; __u64 rpc_matchbits; /* matchbits counter */ } srpc_data; -static int srpc_peer_credits = 16; -CFS_MODULE_PARM(srpc_peer_credits, "i", int, 0444, - "# in-flight RPCs per peer (16 by default)"); - /* forward ref's */ int srpc_handle_rpc (swi_workitem_t *wi); @@ -181,92 +174,6 @@ srpc_alloc_bulk (int npages, int sink) return bk; } - -static inline struct list_head * -srpc_nid2peerlist (lnet_nid_t nid) -{ - unsigned int hash = ((unsigned int)nid) % SRPC_PEER_HASH_SIZE; - - return &srpc_data.rpc_peers[hash]; -} - -static inline srpc_peer_t * -srpc_create_peer (lnet_nid_t nid) -{ - srpc_peer_t *peer; - - LASSERT (nid != LNET_NID_ANY); - - LIBCFS_ALLOC(peer, sizeof(srpc_peer_t)); - if (peer == NULL) { - CERROR ("Failed to allocate peer structure for %s\n", - libcfs_nid2str(nid)); - return NULL; - } - - memset(peer, 0, sizeof(srpc_peer_t)); - peer->stp_nid = nid; - peer->stp_credits = srpc_peer_credits; - - spin_lock_init(&peer->stp_lock); - CFS_INIT_LIST_HEAD(&peer->stp_rpcq); - CFS_INIT_LIST_HEAD(&peer->stp_ctl_rpcq); - return peer; -} - -srpc_peer_t * -srpc_find_peer_locked (lnet_nid_t nid) -{ - struct list_head *peer_list = srpc_nid2peerlist(nid); - srpc_peer_t *peer; - - LASSERT (nid != LNET_NID_ANY); - - cfs_list_for_each_entry_typed (peer, peer_list, - srpc_peer_t, stp_list) { - if (peer->stp_nid == nid) - return peer; - } - - return NULL; -} - -static srpc_peer_t * -srpc_nid2peer (lnet_nid_t nid) -{ - srpc_peer_t *peer; - srpc_peer_t *new_peer; - - spin_lock(&srpc_data.rpc_glock); - peer = srpc_find_peer_locked(nid); - spin_unlock(&srpc_data.rpc_glock); - - if (peer != NULL) - return peer; - - new_peer = srpc_create_peer(nid); - - spin_lock(&srpc_data.rpc_glock); - - peer = srpc_find_peer_locked(nid); - if (peer != NULL) { - spin_unlock(&srpc_data.rpc_glock); - if (new_peer != NULL) - LIBCFS_FREE(new_peer, sizeof(srpc_peer_t)); - - return peer; - } - - if (new_peer == NULL) { - spin_unlock(&srpc_data.rpc_glock); - return NULL; - } - - list_add_tail(&new_peer->stp_list, srpc_nid2peerlist(nid)); - spin_unlock(&srpc_data.rpc_glock); - return new_peer; -} - static inline __u64 srpc_next_id (void) { @@ -609,10 +516,10 @@ srpc_finish_service (srpc_service_t *sv) rpc = list_entry(sv->sv_active_rpcq.next, srpc_server_rpc_t, srpc_list); CDEBUG (D_NETERROR, - "Active RPC on shutdown: sv %s, peer %s, " + "Active RPC %p on shutdown: sv %s, peer %s, " "wi %s scheduled %d running %d, " "ev fired %d type %d status %d lnet %d\n", - sv->sv_name, libcfs_id2str(rpc->srpc_peer), + rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), swi_state2str(rpc->srpc_wi.wi_state), rpc->srpc_wi.wi_scheduled, rpc->srpc_wi.wi_running, @@ -673,6 +580,42 @@ free: spin_lock(&sv->sv_lock); } +/* called with srpc_service_t::sv_lock held */ +inline void +srpc_schedule_server_rpc (srpc_server_rpc_t *rpc) +{ + srpc_service_t *sv = rpc->srpc_service; + + if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) + swi_schedule_workitem(&rpc->srpc_wi); + else /* framework RPCs are handled one by one */ + swi_schedule_serial_workitem(&rpc->srpc_wi); + + return; +} + +void +srpc_abort_service (srpc_service_t *sv) +{ + srpc_server_rpc_t *rpc; + + spin_lock(&sv->sv_lock); + + CDEBUG(D_NET, "Aborting service: id %d, name %s\n", + sv->sv_id, sv->sv_name); + + /* schedule in-flight RPCs to notice the abort, NB: + * racing with incoming RPCs; complete fix should make test + * RPCs carry session ID in its headers */ + list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) { + rpc->srpc_aborted = 1; + srpc_schedule_server_rpc(rpc); + } + + spin_unlock(&sv->sv_lock); + return; +} + void srpc_shutdown_service (srpc_service_t *sv) { @@ -681,15 +624,15 @@ srpc_shutdown_service (srpc_service_t *sv) spin_lock(&sv->sv_lock); - CDEBUG (D_NET, "Shutting down service: id %d, name %s\n", - sv->sv_id, sv->sv_name); + CDEBUG(D_NET, "Shutting down service: id %d, name %s\n", + sv->sv_id, sv->sv_name); sv->sv_shuttingdown = 1; /* i.e. no new active RPC */ /* schedule in-flight RPCs to notice the shutdown */ cfs_list_for_each_entry_typed (rpc, &sv->sv_active_rpcq, srpc_server_rpc_t, srpc_list) { - swi_schedule_workitem(&rpc->srpc_wi); + srpc_schedule_server_rpc(rpc); } spin_unlock(&sv->sv_lock); @@ -814,20 +757,6 @@ srpc_do_bulk (srpc_server_rpc_t *rpc) return rc; } -/* called with srpc_service_t::sv_lock held */ -inline void -srpc_schedule_server_rpc (srpc_server_rpc_t *rpc) -{ - srpc_service_t *sv = rpc->srpc_service; - - if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) - swi_schedule_workitem(&rpc->srpc_wi); - else /* framework RPCs are handled one by one */ - swi_schedule_serial_workitem(&rpc->srpc_wi); - - return; -} - /* only called from srpc_handle_rpc */ void srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) @@ -840,8 +769,8 @@ srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status) rpc->srpc_status = status; CDEBUG (status == 0 ? D_NET : D_NETERROR, - "Server RPC done: service %s, peer %s, status %s:%d\n", - sv->sv_name, libcfs_id2str(rpc->srpc_peer), + "Server RPC %p done: service %s, peer %s, status %s:%d\n", + rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), swi_state2str(rpc->srpc_wi.wi_state), status); if (status != 0) { @@ -903,7 +832,7 @@ srpc_handle_rpc (swi_workitem_t *wi) spin_lock(&sv->sv_lock); - if (sv->sv_shuttingdown) { + if (sv->sv_shuttingdown || rpc->srpc_aborted) { spin_unlock(&sv->sv_lock); if (rpc->srpc_bulk != NULL) @@ -961,8 +890,7 @@ srpc_handle_rpc (swi_workitem_t *wi) } } case SWI_STATE_BULK_STARTED: - /* we cannot LASSERT ev_fired right here because it - * may be set only upon an event with unlinked==1 */ + LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); if (rpc->srpc_bulk != NULL) { rc = ev->ev_status; @@ -971,20 +899,11 @@ srpc_handle_rpc (swi_workitem_t *wi) rc = (*sv->sv_bulk_ready) (rpc, rc); if (rc != 0) { - if (ev->ev_fired) { - srpc_server_rpc_done(rpc, rc); - return 1; - } - - rpc->srpc_status = rc; - wi->wi_state = SWI_STATE_BULK_ERRORED; - LNetMDUnlink(rpc->srpc_bulk->bk_mdh); - return 0; /* wait for UNLINK event */ + srpc_server_rpc_done(rpc, rc); + return 1; } } - LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired); - wi->wi_state = SWI_STATE_REPLY_SUBMITTED; rc = srpc_send_reply(rpc); if (rc == 0) @@ -993,18 +912,17 @@ srpc_handle_rpc (swi_workitem_t *wi) return 1; case SWI_STATE_REPLY_SUBMITTED: - LASSERT (ev->ev_fired); + if (!ev->ev_fired) { + CERROR("RPC %p: bulk %p, service %d\n", + rpc, rpc->srpc_bulk, rpc->srpc_service->sv_id); + CERROR("Event: status %d, type %d, lnet %d\n", + ev->ev_status, ev->ev_type, ev->ev_lnet); + LASSERT (ev->ev_fired); + } wi->wi_state = SWI_STATE_DONE; srpc_server_rpc_done(rpc, ev->ev_status); return 1; - - case SWI_STATE_BULK_ERRORED: - LASSERT (rpc->srpc_bulk != NULL && ev->ev_fired); - LASSERT (rpc->srpc_status != 0); - - srpc_server_rpc_done(rpc, rpc->srpc_status); - return 1; } return 0; @@ -1078,43 +996,9 @@ srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc) } void -srpc_check_sends (srpc_peer_t *peer, int credits) -{ - struct list_head *q; - srpc_client_rpc_t *rpc; - - LASSERT (credits >= 0); - LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING); - - spin_lock(&peer->stp_lock); - peer->stp_credits += credits; - - while (peer->stp_credits) { - if (!list_empty(&peer->stp_ctl_rpcq)) - q = &peer->stp_ctl_rpcq; - else if (!list_empty(&peer->stp_rpcq)) - q = &peer->stp_rpcq; - else - break; - - peer->stp_credits--; - - rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl); - list_del_init(&rpc->crpc_privl); - srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */ - - swi_schedule_workitem(&rpc->crpc_wi); - } - - spin_unlock(&peer->stp_lock); - return; -} - -void srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) { swi_workitem_t *wi = &rpc->crpc_wi; - srpc_peer_t *peer = rpc->crpc_peer; LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE); @@ -1145,9 +1029,6 @@ srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status) spin_unlock(&rpc->crpc_lock); (*rpc->crpc_done) (rpc); - - if (peer != NULL) - srpc_check_sends(peer, 1); return; } @@ -1290,38 +1171,9 @@ srpc_create_client_rpc (lnet_process_id_t peer, int service, } /* called with rpc->crpc_lock held */ -static inline void -srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc) -{ - int service = rpc->crpc_service; - - LASSERT (peer->stp_nid == rpc->crpc_dest.nid); - LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING); - - rpc->crpc_peer = peer; - - spin_lock(&peer->stp_lock); - - /* Framework RPCs that alter session state shall take precedence - * over test RPCs and framework query RPCs */ - if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID && - service != SRPC_SERVICE_DEBUG && - service != SRPC_SERVICE_QUERY_STAT) - list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq); - else - list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq); - - srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */ - spin_unlock(&peer->stp_lock); - return; -} - -/* called with rpc->crpc_lock held */ void srpc_abort_rpc (srpc_client_rpc_t *rpc, int why) { - srpc_peer_t *peer = rpc->crpc_peer; - LASSERT (why != 0); if (rpc->crpc_aborted || /* already aborted */ @@ -1335,19 +1187,6 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why) rpc->crpc_aborted = 1; rpc->crpc_status = why; - - if (peer != NULL) { - spin_lock(&peer->stp_lock); - - if (!list_empty(&rpc->crpc_privl)) { /* still queued */ - list_del_init(&rpc->crpc_privl); - srpc_client_rpc_decref(rpc); /* --ref for peer->*rpcq */ - rpc->crpc_peer = NULL; /* no credit taken */ - } - - spin_unlock(&peer->stp_lock); - } - swi_schedule_workitem(&rpc->crpc_wi); return; } @@ -1356,10 +1195,7 @@ srpc_abort_rpc (srpc_client_rpc_t *rpc, int why) void srpc_post_rpc (srpc_client_rpc_t *rpc) { - srpc_peer_t *peer; - LASSERT (!rpc->crpc_aborted); - LASSERT (rpc->crpc_peer == NULL); LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING); LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0); @@ -1368,18 +1204,7 @@ srpc_post_rpc (srpc_client_rpc_t *rpc) rpc->crpc_timeout); srpc_add_client_rpc_timer(rpc); - - peer = srpc_nid2peer(rpc->crpc_dest.nid); - if (peer == NULL) { - srpc_abort_rpc(rpc, -ENOMEM); - return; - } - - srpc_queue_rpc(peer, rpc); - - spin_unlock(&rpc->crpc_lock); - srpc_check_sends(peer, 0); - spin_lock(&rpc->crpc_lock); + swi_schedule_workitem(&rpc->crpc_wi); return; } @@ -1438,7 +1263,6 @@ srpc_lnet_ev_handler (lnet_event_t *ev) srpc_service_t *sv; srpc_msg_t *msg; srpc_msg_type_t type; - int fired_flag = 1; LASSERT (!in_interrupt()); @@ -1452,6 +1276,8 @@ srpc_lnet_ev_handler (lnet_event_t *ev) switch (rpcev->ev_type) { default: + CERROR("Unknown event: status %d, type %d, lnet %d\n", + rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet); LBUG (); case SRPC_REQUEST_SENT: if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { @@ -1463,9 +1289,16 @@ srpc_lnet_ev_handler (lnet_event_t *ev) case SRPC_BULK_REQ_RCVD: crpc = rpcev->ev_data; - LASSERT (rpcev == &crpc->crpc_reqstev || - rpcev == &crpc->crpc_replyev || - rpcev == &crpc->crpc_bulkev); + if (rpcev != &crpc->crpc_reqstev && + rpcev != &crpc->crpc_replyev && + rpcev != &crpc->crpc_bulkev) { + CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n", + rpcev, crpc, &crpc->crpc_reqstev, + &crpc->crpc_replyev, &crpc->crpc_bulkev); + CERROR("Bad event: status %d, type %d, lnet %d\n", + rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet); + LBUG (); + } spin_lock(&crpc->crpc_lock); @@ -1550,13 +1383,9 @@ srpc_lnet_ev_handler (lnet_event_t *ev) ev->type == LNET_EVENT_REPLY || ev->type == LNET_EVENT_UNLINK); - if (ev->type == LNET_EVENT_SEND && !ev->unlinked) { - if (ev->status == 0) - break; /* wait for the final LNET_EVENT_REPLY */ - else - fired_flag = 0; /* LNET_EVENT_REPLY may arrive - (optimized GET case) */ - } + if (!ev->unlinked) + break; /* wait for final event */ + case SRPC_BULK_PUT_SENT: if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { spin_lock(&srpc_data.rpc_glock); @@ -1575,13 +1404,12 @@ srpc_lnet_ev_handler (lnet_event_t *ev) LASSERT (rpcev == &srpc->srpc_ev); spin_lock(&sv->sv_lock); - if (fired_flag) - rpcev->ev_fired = 1; + rpcev->ev_fired = 1; rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ? -EINTR : ev->status; - srpc_schedule_server_rpc(srpc); + spin_unlock(&sv->sv_lock); break; } @@ -1619,21 +1447,8 @@ srpc_check_event (int timeout) int srpc_startup (void) { - int i; int rc; -#ifndef __KERNEL__ - char *s; - - s = getenv("SRPC_PEER_CREDITS"); - srpc_peer_credits = (s != NULL) ? atoi(s) : srpc_peer_credits; -#endif - - if (srpc_peer_credits <= 0) { - CERROR("Peer credits must be positive: %d\n", srpc_peer_credits); - return -EINVAL; - } - memset(&srpc_data, 0, sizeof(struct smoketest_rpc)); spin_lock_init(&srpc_data.rpc_glock); @@ -1643,16 +1458,6 @@ srpc_startup (void) srpc_data.rpc_state = SRPC_STATE_NONE; - LIBCFS_ALLOC(srpc_data.rpc_peers, - sizeof(struct list_head) * SRPC_PEER_HASH_SIZE); - if (srpc_data.rpc_peers == NULL) { - CERROR ("Failed to alloc peer hash.\n"); - return -ENOMEM; - } - - for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) - CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]); - #ifdef __KERNEL__ rc = LNetNIInit(LUSTRE_SRV_LNET_PID); #else @@ -1663,8 +1468,6 @@ srpc_startup (void) #endif if (rc < 0) { CERROR ("LNetNIInit() has failed: %d\n", rc); - LIBCFS_FREE(srpc_data.rpc_peers, - sizeof(struct list_head) * SRPC_PEER_HASH_SIZE); return rc; } @@ -1745,24 +1548,5 @@ srpc_shutdown (void) break; } - /* srpc_peer_t's are kept in hash until shutdown */ - for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) { - srpc_peer_t *peer; - - while (!list_empty(&srpc_data.rpc_peers[i])) { - peer = list_entry(srpc_data.rpc_peers[i].next, - srpc_peer_t, stp_list); - list_del(&peer->stp_list); - - LASSERT (list_empty(&peer->stp_rpcq)); - LASSERT (list_empty(&peer->stp_ctl_rpcq)); - LASSERT (peer->stp_credits == srpc_peer_credits); - - LIBCFS_FREE(peer, sizeof(srpc_peer_t)); - } - } - - LIBCFS_FREE(srpc_data.rpc_peers, - sizeof(struct list_head) * SRPC_PEER_HASH_SIZE); return; } diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index edd67b1..7312eac 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -72,7 +72,6 @@ #define SWI_STATE_REQUEST_SENT 4 #define SWI_STATE_REPLY_RECEIVED 5 #define SWI_STATE_BULK_STARTED 6 -#define SWI_STATE_BULK_ERRORED 7 #define SWI_STATE_DONE 10 /* forward refs */ @@ -192,13 +191,13 @@ srpc_service2reply (int service) } typedef enum { - SRPC_BULK_REQ_RCVD = 0, /* passive bulk request(PUT sink/GET source) received */ - SRPC_BULK_PUT_SENT = 1, /* active bulk PUT sent (source) */ - SRPC_BULK_GET_RPLD = 2, /* active bulk GET replied (sink) */ - SRPC_REPLY_RCVD = 3, /* incoming reply received */ - SRPC_REPLY_SENT = 4, /* outgoing reply sent */ - SRPC_REQUEST_RCVD = 5, /* incoming request received */ - SRPC_REQUEST_SENT = 6, /* outgoing request sent */ + SRPC_BULK_REQ_RCVD = 1, /* passive bulk request(PUT sink/GET source) received */ + SRPC_BULK_PUT_SENT = 2, /* active bulk PUT sent (source) */ + SRPC_BULK_GET_RPLD = 3, /* active bulk GET replied (sink) */ + SRPC_REPLY_RCVD = 4, /* incoming reply received */ + SRPC_REPLY_SENT = 5, /* outgoing reply sent */ + SRPC_REQUEST_RCVD = 6, /* incoming request received */ + SRPC_REQUEST_SENT = 7, /* outgoing request sent */ } srpc_event_type_t; /* RPC event */ @@ -223,15 +222,6 @@ typedef struct { #endif } srpc_bulk_t; /* bulk descriptor */ -typedef struct srpc_peer { - struct list_head stp_list; /* chain on peer hash */ - struct list_head stp_rpcq; /* q of non-control RPCs */ - struct list_head stp_ctl_rpcq; /* q of control RPCs */ - spinlock_t stp_lock; /* serialize */ - lnet_nid_t stp_nid; - int stp_credits; /* available credits */ -} srpc_peer_t; - /* message buffer descriptor */ typedef struct { struct list_head buf_list; /* chain on srpc_service::*_msgq */ @@ -254,6 +244,7 @@ typedef struct srpc_server_rpc { srpc_buffer_t *srpc_reqstbuf; srpc_bulk_t *srpc_bulk; + unsigned int srpc_aborted; /* being given up */ int srpc_status; void (*srpc_done)(struct srpc_server_rpc *); } srpc_server_rpc_t; @@ -261,7 +252,6 @@ typedef struct srpc_server_rpc { /* client-side state of a RPC */ typedef struct srpc_client_rpc { struct list_head crpc_list; /* chain on user's lists */ - struct list_head crpc_privl; /* chain on srpc_peer_t::*rpcq */ spinlock_t crpc_lock; /* serialize */ int crpc_service; atomic_t crpc_refcount; @@ -269,7 +259,6 @@ typedef struct srpc_client_rpc { stt_timer_t crpc_timer; swi_workitem_t crpc_wi; lnet_process_id_t crpc_dest; - srpc_peer_t *crpc_peer; void (*crpc_done)(struct srpc_client_rpc *); void (*crpc_fini)(struct srpc_client_rpc *); @@ -342,7 +331,7 @@ typedef struct srpc_service { int (*sv_bulk_ready) (srpc_server_rpc_t *, int); } srpc_service_t; -#define SFW_POST_BUFFERS 8 +#define SFW_POST_BUFFERS 256 #define SFW_SERVICE_CONCURRENCY (SFW_POST_BUFFERS/2) typedef struct { @@ -456,6 +445,7 @@ int srpc_send_reply(srpc_server_rpc_t *rpc); int srpc_add_service(srpc_service_t *sv); int srpc_remove_service(srpc_service_t *sv); void srpc_shutdown_service(srpc_service_t *sv); +void srpc_abort_service(srpc_service_t *sv); int srpc_finish_service(srpc_service_t *sv); int srpc_service_add_buffers(srpc_service_t *sv, int nbuffer); void srpc_service_remove_buffers(srpc_service_t *sv, int nbuffer); @@ -477,7 +467,6 @@ srpc_destroy_client_rpc (srpc_client_rpc_t *rpc) { LASSERT (rpc != NULL); LASSERT (!srpc_event_pending(rpc)); - LASSERT (list_empty(&rpc->crpc_privl)); LASSERT (atomic_read(&rpc->crpc_refcount) == 0); #ifndef __KERNEL__ LASSERT (rpc->crpc_bulk.bk_pages == NULL); @@ -504,7 +493,6 @@ srpc_init_client_rpc (srpc_client_rpc_t *rpc, lnet_process_id_t peer, crpc_bulk.bk_iovs[nbulkiov])); CFS_INIT_LIST_HEAD(&rpc->crpc_list); - CFS_INIT_LIST_HEAD(&rpc->crpc_privl); swi_init_workitem(&rpc->crpc_wi, rpc, srpc_send_rpc); spin_lock_init(&rpc->crpc_lock); atomic_set(&rpc->crpc_refcount, 1); /* 1 ref for caller */ @@ -545,7 +533,6 @@ swi_state2str (int state) STATE2STR(SWI_STATE_REQUEST_SENT); STATE2STR(SWI_STATE_REPLY_RECEIVED); STATE2STR(SWI_STATE_BULK_STARTED); - STATE2STR(SWI_STATE_BULK_ERRORED); STATE2STR(SWI_STATE_DONE); } #undef STATE2STR diff --git a/lnet/selftest/workitem.c b/lnet/selftest/workitem.c index 960d00f..c638188 100644 --- a/lnet/selftest/workitem.c +++ b/lnet/selftest/workitem.c @@ -218,14 +218,15 @@ swi_serial_scheduler_main (void *arg) int rc; swi_workitem_t *wi; - while (!list_empty(&swi_data.wi_serial_runq) && + while (!list_empty(&swi_data.wi_serial_runq) && nloops < SWI_RESCHED) { wi = list_entry(swi_data.wi_serial_runq.next, swi_workitem_t, wi_list); list_del_init(&wi->wi_list); - LASSERT (!wi->wi_running); - LASSERT (wi->wi_scheduled); + LASSERTF (!wi->wi_running && wi->wi_scheduled, + "wi %p running %d scheduled %d\n", + wi, wi->wi_running, wi->wi_scheduled); nloops++; wi->wi_running = 1; @@ -243,7 +244,7 @@ swi_serial_scheduler_main (void *arg) if (nloops < SWI_RESCHED) cfs_wait_event_interruptible_exclusive( - swi_data.wi_serial_waitq, + swi_data.wi_serial_waitq, !swi_sched_cansleep(&swi_data.wi_serial_runq), rc); else our_cond_resched(); -- 1.8.3.1