{
lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;
- LASSERT (!list_empty(&crpc->crp_link));
LASSERT (crpc != NULL && rpc == crpc->crp_rpc);
LASSERT (crpc->crp_posted && !crpc->crp_finished);
spin_lock(&rpc->crpc_lock);
if (crpc->crp_trans == NULL) {
- /* orphan RPC */
- spin_lock(&console_session.ses_rpc_lock);
-
- /* delete from orphan rpcs list */
- console_session.ses_rpc_pending --;
- list_del_init(&crpc->crp_link);
-
- spin_unlock(&console_session.ses_rpc_lock);
-
+ /* Orphan RPC is not in any transaction,
+ * I'm just a poor body and nobody loves me */
spin_unlock(&rpc->crpc_lock);
/* release it */
crpc->crp_status = rpc->crpc_status;
}
- /* wakeup thread waiting on the group if
- * it's the last rpc in the group */
+ /* wakeup (transaction)thread if I'm the last RPC in the transaction */
if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
cfs_waitq_signal(&crpc->crp_trans->tas_waitq);
crpc->crp_static = !cached;
CFS_INIT_LIST_HEAD(&crpc->crp_link);
+ atomic_inc(&console_session.ses_rpc_counter);
+
return 0;
}
if (!list_empty(&console_session.ses_rpc_freelist)) {
crpc = list_entry(console_session.ses_rpc_freelist.next,
lstcon_rpc_t, crp_link);
- list_del(&crpc->crp_link);
+ list_del_init(&crpc->crp_link);
}
spin_unlock(&console_session.ses_rpc_lock);
srpc_client_rpc_decref(crpc->crp_rpc);
if (crpc->crp_static) {
+ /* Static RPC, not allocated */
memset(crpc, 0, sizeof(*crpc));
crpc->crp_static = 1;
- return;
- }
- spin_lock(&console_session.ses_rpc_lock);
+ } else {
+ spin_lock(&console_session.ses_rpc_lock);
- list_add(&crpc->crp_link, &console_session.ses_rpc_freelist);
+ list_add(&crpc->crp_link, &console_session.ses_rpc_freelist);
- spin_unlock(&console_session.ses_rpc_lock);
+ spin_unlock(&console_session.ses_rpc_lock);
+ }
+
+ /* RPC is not alive now */
+ atomic_dec(&console_session.ses_rpc_counter);
}
void
/* rpcs can be still not callbacked (even LNetMDUnlink is called)
* because huge timeout for inaccessible network, don't make
- * user wait for them, just put rpcs in orphan list */
+ * user wait for them, just abandon them, they will be recycled
+ * in callback */
LASSERT (crpc->crp_status != 0);
crpc->crp_node = NULL;
crpc->crp_trans = NULL;
- list_del(&crpc->crp_link);
-
- spin_lock(&console_session.ses_rpc_lock);
-
+ list_del_init(&crpc->crp_link);
count ++;
- /* add to orphan list */
- console_session.ses_rpc_pending ++;
- list_add_tail(&crpc->crp_link, &console_session.ses_rpc_list);
-
- spin_unlock(&console_session.ses_rpc_lock);
spin_unlock(&rpc->crpc_lock);
int
lstcon_rpc_trans_ndlist(struct list_head *ndlist,
- struct list_head *translist, int transop, void *arg,
- lstcon_rpc_cond_func_t condition, lstcon_rpc_trans_t **transpp)
+ struct list_head *translist, int transop,
+ void *arg, lstcon_rpc_cond_func_t condition,
+ lstcon_rpc_trans_t **transpp)
{
lstcon_rpc_trans_t *trans;
lstcon_ndlink_t *ndl;
stt_timer_t *ptimer;
int rc;
- LASSERT (console_session.ses_rpc_pending == 0);
- LASSERT (list_empty(&console_session.ses_rpc_list));
LASSERT (list_empty(&console_session.ses_rpc_freelist));
+ LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
&console_session.ses_ping);
struct list_head *pacer;
struct list_head zlist;
+ /* Called with hold of global mutex */
+
LASSERT (console_session.ses_shutdown);
while (!list_empty(&console_session.ses_trans_list)) {
list_for_each(pacer, &console_session.ses_trans_list) {
trans = list_entry(pacer, lstcon_rpc_trans_t, tas_link);
- cfs_waitq_signal(&trans->tas_waitq);
CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
lstcon_rpc_trans_name(trans->tas_opc));
+
+ cfs_waitq_signal(&trans->tas_waitq);
}
mutex_up(&console_session.ses_mutex);
- CWARN("Session is shutting down, close all transactions\n");
+ CWARN("Session is shutting down, "
+ "waiting for termination of transactions\n");
cfs_pause(cfs_time_seconds(1));
mutex_down(&console_session.ses_mutex);
spin_lock(&console_session.ses_rpc_lock);
- lst_wait_until(list_empty(&console_session.ses_rpc_list),
+ lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0),
console_session.ses_rpc_lock,
"Network is not accessable or target is down, "
- "waiting for %d console rpcs to die\n",
- console_session.ses_rpc_pending);
+ "waiting for %d console RPCs to being recycled\n",
+ atomic_read(&console_session.ses_rpc_counter));
list_add(&zlist, &console_session.ses_rpc_freelist);
list_del_init(&console_session.ses_rpc_freelist);
spin_unlock(&console_session.ses_rpc_lock);
- LASSERT (console_session.ses_rpc_pending == 0);
-
while (!list_empty(&zlist)) {
crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link);
console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
console_session.ses_ping = NULL;
- console_session.ses_rpc_pending = 0;
+
spin_lock_init(&console_session.ses_rpc_lock);
- CFS_INIT_LIST_HEAD(&console_session.ses_rpc_list);
+ atomic_set(&console_session.ses_rpc_counter, 0);
CFS_INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
return 0;
void
lstcon_rpc_module_fini(void)
{
- LASSERT (console_session.ses_rpc_pending == 0);
- LASSERT (list_empty(&console_session.ses_rpc_list));
LASSERT (list_empty(&console_session.ses_rpc_freelist));
+ LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
}
#endif
typedef int (* lstcon_rpc_cond_func_t)(int, struct lstcon_node *, void *);
typedef int (* lstcon_rpc_readent_func_t)(int, srpc_msg_t *, lstcon_rpc_ent_t *);
-int lstcon_sesrpc_prep(struct lstcon_node *nd,
- int transop, lstcon_rpc_t **crpc);
-int lstcon_dbgrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc);
-int lstcon_batrpc_prep(struct lstcon_node *nd,
- int transop, struct lstcon_tsb_hdr *tsb, lstcon_rpc_t **crpc);
-int lstcon_testrpc_prep(struct lstcon_node *nd,
- int transop, struct lstcon_test *test, lstcon_rpc_t **crpc);
-int lstcon_statrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc);
+int lstcon_sesrpc_prep(struct lstcon_node *nd,
+ int transop, lstcon_rpc_t **crpc);
+int lstcon_dbgrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc);
+int lstcon_batrpc_prep(struct lstcon_node *nd, int transop,
+ struct lstcon_tsb_hdr *tsb, lstcon_rpc_t **crpc);
+int lstcon_testrpc_prep(struct lstcon_node *nd, int transop,
+ struct lstcon_test *test, lstcon_rpc_t **crpc);
+int lstcon_statrpc_prep(struct lstcon_node *nd, lstcon_rpc_t **crpc);
void lstcon_rpc_put(lstcon_rpc_t *crpc);
-int lstcon_rpc_trans_prep(struct list_head *translist,
- int transop, lstcon_rpc_trans_t **transpp);
-int lstcon_rpc_trans_ndlist(struct list_head *ndlist, struct list_head *translist,
- int transop, void *arg, lstcon_rpc_cond_func_t condition,
- lstcon_rpc_trans_t **transpp);
-void lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat);
-int lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans, struct list_head *head_up,
- lstcon_rpc_readent_func_t readent);
+int lstcon_rpc_trans_prep(struct list_head *translist,
+ int transop, lstcon_rpc_trans_t **transpp);
+int lstcon_rpc_trans_ndlist(struct list_head *ndlist,
+ struct list_head *translist, int transop,
+ void *arg, lstcon_rpc_cond_func_t condition,
+ lstcon_rpc_trans_t **transpp);
+void lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans,
+ lstcon_trans_stat_t *stat);
+int lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans,
+ struct list_head *head_up,
+ lstcon_rpc_readent_func_t readent);
void lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error);
void lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans);
void lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *req);
-int lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout);
-int lstcon_rpc_pinger_start(void);
+int lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout);
+int lstcon_rpc_pinger_start(void);
void lstcon_rpc_pinger_stop(void);
void lstcon_rpc_cleanup_wait(void);
-int lstcon_rpc_module_init(void);
+int lstcon_rpc_module_init(void);
void lstcon_rpc_module_fini(void);
#endif