From ba74663795a8664967eb868b8f64c3e1bc5ae332 Mon Sep 17 00:00:00 2001 From: liangzhen Date: Sat, 22 Sep 2007 03:57:56 +0000 Subject: [PATCH] Branch HEAD Remove useless orphan console RPC list, module unloading can just wait on allocating counter. --- lnet/selftest/conrpc.c | 77 ++++++++++++++++++++++--------------------------- lnet/selftest/conrpc.h | 41 ++++++++++++++------------ lnet/selftest/console.h | 3 +- 3 files changed, 57 insertions(+), 64 deletions(-) diff --git a/lnet/selftest/conrpc.c b/lnet/selftest/conrpc.c index a9ae420..45c8031 100644 --- a/lnet/selftest/conrpc.c +++ b/lnet/selftest/conrpc.c @@ -23,22 +23,14 @@ lstcon_rpc_done(srpc_client_rpc_t *rpc) { lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv; - LASSERT (!list_empty(&crpc->crp_link)); LASSERT (crpc != NULL && rpc == crpc->crp_rpc); LASSERT (crpc->crp_posted && !crpc->crp_finished); spin_lock(&rpc->crpc_lock); if (crpc->crp_trans == NULL) { - /* orphan RPC */ - spin_lock(&console_session.ses_rpc_lock); - - /* delete from orphan rpcs list */ - console_session.ses_rpc_pending --; - list_del_init(&crpc->crp_link); - - spin_unlock(&console_session.ses_rpc_lock); - + /* Orphan RPC is not in any transaction, + * I'm just a poor body and nobody loves me */ spin_unlock(&rpc->crpc_lock); /* release it */ @@ -57,8 +49,7 @@ lstcon_rpc_done(srpc_client_rpc_t *rpc) crpc->crp_status = rpc->crpc_status; } - /* wakeup thread waiting on the group if - * it's the last rpc in the group */ + /* wakeup (transaction)thread if I'm the last RPC in the transaction */ if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining)) cfs_waitq_signal(&crpc->crp_trans->tas_waitq); @@ -86,6 +77,8 @@ lstcon_rpc_init(lstcon_node_t *nd, int service, crpc->crp_static = !cached; CFS_INIT_LIST_HEAD(&crpc->crp_link); + atomic_inc(&console_session.ses_rpc_counter); + return 0; } @@ -101,7 +94,7 @@ lstcon_rpc_prep(lstcon_node_t *nd, int service, if (!list_empty(&console_session.ses_rpc_freelist)) { crpc = list_entry(console_session.ses_rpc_freelist.next, lstcon_rpc_t, crp_link); - list_del(&crpc->crp_link); + list_del_init(&crpc->crp_link); } spin_unlock(&console_session.ses_rpc_lock); @@ -141,16 +134,20 @@ lstcon_rpc_put(lstcon_rpc_t *crpc) srpc_client_rpc_decref(crpc->crp_rpc); if (crpc->crp_static) { + /* Static RPC, not allocated */ memset(crpc, 0, sizeof(*crpc)); crpc->crp_static = 1; - return; - } - spin_lock(&console_session.ses_rpc_lock); + } else { + spin_lock(&console_session.ses_rpc_lock); - list_add(&crpc->crp_link, &console_session.ses_rpc_freelist); + list_add(&crpc->crp_link, &console_session.ses_rpc_freelist); - spin_unlock(&console_session.ses_rpc_lock); + spin_unlock(&console_session.ses_rpc_lock); + } + + /* RPC is not alive now */ + atomic_dec(&console_session.ses_rpc_counter); } void @@ -530,22 +527,15 @@ lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans) /* rpcs can be still not callbacked (even LNetMDUnlink is called) * because huge timeout for inaccessible network, don't make - * user wait for them, just put rpcs in orphan list */ + * user wait for them, just abandon them, they will be recycled + * in callback */ LASSERT (crpc->crp_status != 0); crpc->crp_node = NULL; crpc->crp_trans = NULL; - list_del(&crpc->crp_link); - - spin_lock(&console_session.ses_rpc_lock); - + list_del_init(&crpc->crp_link); count ++; - /* add to orphan list */ - console_session.ses_rpc_pending ++; - list_add_tail(&crpc->crp_link, &console_session.ses_rpc_list); - - spin_unlock(&console_session.ses_rpc_lock); spin_unlock(&rpc->crpc_lock); @@ -979,8 +969,9 @@ lstcon_rpc_stat_reply(int transop, srpc_msg_t *msg, int lstcon_rpc_trans_ndlist(struct list_head *ndlist, - struct list_head *translist, int transop, void *arg, - lstcon_rpc_cond_func_t condition, lstcon_rpc_trans_t **transpp) + struct list_head *translist, int transop, + void *arg, lstcon_rpc_cond_func_t condition, + lstcon_rpc_trans_t **transpp) { lstcon_rpc_trans_t *trans; lstcon_ndlink_t *ndl; @@ -1181,9 +1172,8 @@ lstcon_rpc_pinger_start(void) stt_timer_t *ptimer; int rc; - LASSERT (console_session.ses_rpc_pending == 0); - LASSERT (list_empty(&console_session.ses_rpc_list)); LASSERT (list_empty(&console_session.ses_rpc_freelist)); + LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0); rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING, &console_session.ses_ping); @@ -1224,20 +1214,24 @@ lstcon_rpc_cleanup_wait(void) struct list_head *pacer; struct list_head zlist; + /* Called with hold of global mutex */ + LASSERT (console_session.ses_shutdown); while (!list_empty(&console_session.ses_trans_list)) { list_for_each(pacer, &console_session.ses_trans_list) { trans = list_entry(pacer, lstcon_rpc_trans_t, tas_link); - cfs_waitq_signal(&trans->tas_waitq); CDEBUG(D_NET, "Session closed, wakeup transaction %s\n", lstcon_rpc_trans_name(trans->tas_opc)); + + cfs_waitq_signal(&trans->tas_waitq); } mutex_up(&console_session.ses_mutex); - CWARN("Session is shutting down, close all transactions\n"); + CWARN("Session is shutting down, " + "waiting for termination of transactions\n"); cfs_pause(cfs_time_seconds(1)); mutex_down(&console_session.ses_mutex); @@ -1245,19 +1239,17 @@ lstcon_rpc_cleanup_wait(void) spin_lock(&console_session.ses_rpc_lock); - lst_wait_until(list_empty(&console_session.ses_rpc_list), + lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0), console_session.ses_rpc_lock, "Network is not accessable or target is down, " - "waiting for %d console rpcs to die\n", - console_session.ses_rpc_pending); + "waiting for %d console RPCs to being recycled\n", + atomic_read(&console_session.ses_rpc_counter)); list_add(&zlist, &console_session.ses_rpc_freelist); list_del_init(&console_session.ses_rpc_freelist); spin_unlock(&console_session.ses_rpc_lock); - LASSERT (console_session.ses_rpc_pending == 0); - while (!list_empty(&zlist)) { crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link); @@ -1274,9 +1266,9 @@ lstcon_rpc_module_init(void) console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer; console_session.ses_ping = NULL; - console_session.ses_rpc_pending = 0; + spin_lock_init(&console_session.ses_rpc_lock); - CFS_INIT_LIST_HEAD(&console_session.ses_rpc_list); + atomic_set(&console_session.ses_rpc_counter, 0); CFS_INIT_LIST_HEAD(&console_session.ses_rpc_freelist); return 0; @@ -1285,9 +1277,8 @@ lstcon_rpc_module_init(void) void lstcon_rpc_module_fini(void) { - LASSERT (console_session.ses_rpc_pending == 0); - LASSERT (list_empty(&console_session.ses_rpc_list)); LASSERT (list_empty(&console_session.ses_rpc_freelist)); + LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0); } #endif diff --git a/lnet/selftest/conrpc.h b/lnet/selftest/conrpc.h index ba6a21b..cd57511 100644 --- a/lnet/selftest/conrpc.h +++ b/lnet/selftest/conrpc.h @@ -71,31 +71,34 @@ typedef struct lstcon_rpc_trans { typedef int (* lstcon_rpc_cond_func_t)(int, struct lstcon_node *, void *); typedef int (* lstcon_rpc_readent_func_t)(int, srpc_msg_t *, lstcon_rpc_ent_t *); -int lstcon_sesrpc_prep(struct lstcon_node *nd, - int transop, lstcon_rpc_t **crpc); -int lstcon_dbgrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc); -int lstcon_batrpc_prep(struct lstcon_node *nd, - int transop, struct lstcon_tsb_hdr *tsb, lstcon_rpc_t **crpc); -int lstcon_testrpc_prep(struct lstcon_node *nd, - int transop, struct lstcon_test *test, lstcon_rpc_t **crpc); -int lstcon_statrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc); +int lstcon_sesrpc_prep(struct lstcon_node *nd, + int transop, lstcon_rpc_t **crpc); +int lstcon_dbgrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc); +int lstcon_batrpc_prep(struct lstcon_node *nd, int transop, + struct lstcon_tsb_hdr *tsb, lstcon_rpc_t **crpc); +int lstcon_testrpc_prep(struct lstcon_node *nd, int transop, + struct lstcon_test *test, lstcon_rpc_t **crpc); +int lstcon_statrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc); void lstcon_rpc_put(lstcon_rpc_t *crpc); -int lstcon_rpc_trans_prep(struct list_head *translist, - int transop, lstcon_rpc_trans_t **transpp); -int lstcon_rpc_trans_ndlist(struct list_head *ndlist, struct list_head *translist, - int transop, void *arg, lstcon_rpc_cond_func_t condition, - lstcon_rpc_trans_t **transpp); -void lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat); -int lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans, struct list_head *head_up, - lstcon_rpc_readent_func_t readent); +int lstcon_rpc_trans_prep(struct list_head *translist, + int transop, lstcon_rpc_trans_t **transpp); +int lstcon_rpc_trans_ndlist(struct list_head *ndlist, + struct list_head *translist, int transop, + void *arg, lstcon_rpc_cond_func_t condition, + lstcon_rpc_trans_t **transpp); +void lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, + lstcon_trans_stat_t *stat); +int lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans, + struct list_head *head_up, + lstcon_rpc_readent_func_t readent); void lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error); void lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans); void lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *req); -int lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout); -int lstcon_rpc_pinger_start(void); +int lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout); +int lstcon_rpc_pinger_start(void); void lstcon_rpc_pinger_stop(void); void lstcon_rpc_cleanup_wait(void); -int lstcon_rpc_module_init(void); +int lstcon_rpc_module_init(void); void lstcon_rpc_module_fini(void); #endif diff --git a/lnet/selftest/console.h b/lnet/selftest/console.h index deb02e4..222f542 100644 --- a/lnet/selftest/console.h +++ b/lnet/selftest/console.h @@ -124,8 +124,7 @@ typedef struct { struct list_head *ses_ndl_hash; /* hash table of nodes */ spinlock_t ses_rpc_lock; /* serialize */ - int ses_rpc_pending; /* number of pending RPCs */ - struct list_head ses_rpc_list; /* list head of RPCs */ + atomic_t ses_rpc_counter;/* # of initialized RPCs */ struct list_head ses_rpc_freelist; /* idle console rpc */ } lstcon_session_t; /*** session descriptor */ -- 1.8.3.1