lnet_init_locks(void)
{
spin_lock_init(&the_lnet.ln_eq_wait_lock);
+ spin_lock_init(&the_lnet.ln_msg_resend_lock);
init_waitqueue_head(&the_lnet.ln_eq_waitq);
init_waitqueue_head(&the_lnet.ln_rc_waitq);
mutex_init(&the_lnet.ln_lnd_mutex);
lnet_shutdown_lndnets(void)
{
struct lnet_net *net;
+ struct list_head resend;
+ struct lnet_msg *msg, *tmp;
+
+ INIT_LIST_HEAD(&resend);
/* NB called holding the global mutex */
lnet_shutdown_lndnet(net);
}
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&the_lnet.ln_msg_resend, &resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+ list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+ list_del_init(&msg->msg_list);
+ lnet_finalize(msg, -ECANCELED);
+ }
+
lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_state = LNET_STATE_SHUTDOWN;
lnet_net_unlock(LNET_LOCK_EX);
INIT_LIST_HEAD(&the_lnet.ln_lnds);
INIT_LIST_HEAD(&the_lnet.ln_net_zombie);
INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
+ INIT_LIST_HEAD(&the_lnet.ln_msg_resend);
INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
/* The hash table size is the number of bits it takes to express the set
if (lp->lp_data)
lnet_ping_buffer_decref(lp->lp_data);
+ /*
+ * if there are messages still on the pending queue, then make
+ * sure to queue them on the ln_msg_resend list so they can be
+ * resent at a later point if the discovery thread is still
+ * running.
+ * If the discovery thread has stopped, then the wakeup will be a
+ * no-op, and it is expected the lnet_shutdown_lndnets() will
+ * eventually be called, which will traverse this list and
+ * finalize the messages on the list.
+ * We can not resend them now because we're holding the cpt lock.
+ * Releasing the lock can cause an inconsistent state
+ */
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+ wake_up(&the_lnet.ln_dc_waitq);
+
LIBCFS_FREE(lp, sizeof(*lp));
}
void
lnet_peer_tables_cleanup(struct lnet_net *net)
{
- int i;
- struct lnet_peer_table *ptable;
+ int i;
+ struct lnet_peer_table *ptable;
LASSERT(the_lnet.ln_state != LNET_STATE_SHUTDOWN || net != NULL);
/* If just deleting the peers for a NI, get rid of any routes these
LASSERT(lpni->lpni_rtr_refcount == 0);
LASSERT(list_empty(&lpni->lpni_txq));
LASSERT(lpni->lpni_txqnob == 0);
+ LASSERT(list_empty(&lpni->lpni_peer_nis));
+ LASSERT(list_empty(&lpni->lpni_on_remote_peer_ni_list));
lpn = lpni->lpni_peer_net;
lpni->lpni_peer_net = NULL;
*/
static void lnet_peer_discovery_complete(struct lnet_peer *lp)
{
- struct lnet_msg *msg = NULL;
+ struct lnet_msg *msg, *tmp;
int rc = 0;
struct list_head pending_msgs;
lnet_net_unlock(LNET_LOCK_EX);
/* iterate through all pending messages and send them again */
- list_for_each_entry(msg, &pending_msgs, msg_list) {
+ list_for_each_entry_safe(msg, tmp, &pending_msgs, msg_list) {
+ list_del_init(&msg->msg_list);
if (lp->lp_dc_error) {
lnet_finalize(msg, lp->lp_dc_error);
continue;
break;
if (!list_empty(&the_lnet.ln_dc_request))
break;
+ if (!list_empty(&the_lnet.ln_msg_resend))
+ break;
if (lnet_peer_dc_timed_out(ktime_get_real_seconds()))
break;
lnet_net_unlock(cpt);
return rc;
}
+/*
+ * Messages that were pending on a destroyed peer will be put on a global
+ * resend list. The message resend list will be checked by
+ * the discovery thread when it wakes up, and will resend messages. These
+ * messages can still be sendable in the case the lpni which was the initial
+ * cause of the message re-queue was transfered to another peer.
+ *
+ * It is possible that LNet could be shutdown while we're iterating
+ * through the list. lnet_shudown_lndnets() will attempt to access the
+ * resend list, but will have to wait until the spinlock is released, by
+ * which time there shouldn't be any more messages on the resend list.
+ * During shutdown lnet_send() will fail and lnet_finalize() will be called
+ * for the messages so they can be released. The other case is that
+ * lnet_shudown_lndnets() can finalize all the messages before this
+ * function can visit the resend list, in which case this function will be
+ * a no-op.
+ */
+static void lnet_resend_msgs(void)
+{
+ struct lnet_msg *msg, *tmp;
+ struct list_head resend;
+ int rc;
+
+ INIT_LIST_HEAD(&resend);
+
+ spin_lock(&the_lnet.ln_msg_resend_lock);
+ list_splice(&the_lnet.ln_msg_resend, &resend);
+ spin_unlock(&the_lnet.ln_msg_resend_lock);
+
+ list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
+ list_del_init(&msg->msg_list);
+ rc = lnet_send(msg->msg_src_nid_param, msg,
+ msg->msg_rtr_nid_param);
+ if (rc < 0) {
+ CNETERR("Error sending %s to %s: %d\n",
+ lnet_msgtyp2str(msg->msg_type),
+ libcfs_id2str(msg->msg_target), rc);
+ lnet_finalize(msg, rc);
+ }
+ }
+}
+
/* The discovery thread. */
static int lnet_peer_discovery(void *arg)
{
if (lnet_peer_discovery_wait_for_work())
break;
+ lnet_resend_msgs();
+
if (lnet_push_target_resize_needed())
lnet_push_target_resize();