/* fall through */
case KQN_INIT_DATA:
- LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
break;
case KQN_INIT_NOTHING:
if (kqswnal_data.kqn_eptx != NULL)
ep_free_xmtr (kqswnal_data.kqn_eptx);
+
+ /* freeing the xmtr completes all txs pdq */
+ LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
if (kqswnal_data.kqn_eprx_small != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
if (kqswnal_data.kqn_eprx_large != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);
+ /* wait for transmits to complete */
+ while (!list_empty(&kqswnal_data.kqn_activetxds)) {
+ CWARN("waiting for active transmits to complete\n");
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ schedule_timeout(HZ);
+ }
+
if (kqswnal_data.kqn_eptx != NULL)
ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
/* fall through */
case KQN_INIT_DATA:
- LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
break;
case KQN_INIT_NOTHING:
if (kqswnal_data.kqn_eptx != NULL)
ep_free_xmtr (kqswnal_data.kqn_eptx);
+
+ /* freeing the xmtr completes all txs pdq */
+ LASSERT(list_empty(&kqswnal_data.kqn_activetxds));
#else
if (kqswnal_data.kqn_eprx_small != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_small);
if (kqswnal_data.kqn_eprx_large != NULL)
ep_remove_large_rcvr (kqswnal_data.kqn_eprx_large);
+ /* wait for transmits to complete */
+ while (!list_empty(&kqswnal_data.kqn_activetxds)) {
+ CWARN("waiting for active transmits to complete\n");
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ schedule_timeout(HZ);
+ }
+
if (kqswnal_data.kqn_eptx != NULL)
ep_free_large_xmtr (kqswnal_data.kqn_eptx);
#endif
void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
{
- PtlEQFree(pni->pni_eq_h);
- kportal_put_ni (pni->pni_number);
+ wait_queue_head_t waitq;
+ struct l_wait_info lwi;
+ int rc;
+ int retries;
+
+ /* Wait for the event queue to become idle since there may still be
+ * messages in flight with pending events (i.e. the fire-and-forget
+ * messages == client requests and "non-difficult" server
+ * replies */
+
+ for (retries = 0;; retries++) {
+ rc = PtlEQFree(pni->pni_eq_h);
+ switch (rc) {
+ default:
+ LBUG();
+
+ case PTL_OK:
+ kportal_put_ni (pni->pni_number);
+ return;
+
+ case PTL_EQ_INUSE:
+ if (retries != 0)
+ CWARN("Event queue for %s still busy\n",
+ pni->pni_name);
+
+ /* Wait for a bit */
+ init_waitqueue_head(&waitq);
+ lwi = LWI_TIMEOUT(2*HZ, NULL, NULL);
+ l_wait_event(waitq, 0, &lwi);
+ break;
+ }
+ }
+ /* notreached */
}
int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)