}
if (txpeer != NULL) {
- /*
- * TODO:
- * Once the patch for the health comes in we need to set
- * the health of the peer ni to bad when we fail to send
- * a message.
- * int status = msg->msg_ev.status;
- * if (status != 0)
- * lnet_set_peer_ni_health_locked(txpeer, false)
- */
msg->msg_txpeer = NULL;
lnet_peer_ni_decref_locked(txpeer);
}
int best_lpni_credits = INT_MIN;
bool preferred = false;
bool ni_is_pref;
+ int best_lpni_healthv = 0;
+ int lpni_healthv;
while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
/*
ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
best_ni->ni_nid);
+ lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
CDEBUG(D_NET, "%s ni_is_pref = %d\n",
libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
lpni->lpni_txcredits, best_lpni_credits,
lpni->lpni_seq, best_lpni->lpni_seq);
+ /* pick the healthiest peer ni */
+ if (lpni_healthv < best_lpni_healthv) {
+ continue;
+ } else if (lpni_healthv > best_lpni_healthv) {
+ best_lpni_healthv = lpni_healthv;
/* if this is a preferred peer use it */
- if (!preferred && ni_is_pref) {
+ } else if (!preferred && ni_is_pref) {
preferred = true;
} else if (preferred && !ni_is_pref) {
/*
return 0;
}
+/* Which kind of NI a monitor-thread recovery ping was sent for */
+enum lnet_mt_event_type {
+	MT_TYPE_LOCAL_NI = 0,
+	MT_TYPE_PEER_NI
+};
+
+/*
+ * Context attached to a recovery ping through the MD user pointer so
+ * that the monitor-thread event handler can tell which NID (local NI
+ * or peer NI) an event refers to.  Allocated just before
+ * lnet_send_ping() and freed when the MD is unlinked.
+ */
+struct lnet_mt_event_info {
+	enum lnet_mt_event_type mt_type;
+	lnet_nid_t mt_nid;
+};
+
static void
lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
{
static void
lnet_recover_local_nis(void)
{
+ struct lnet_mt_event_info *ev_info;
struct list_head processed_list;
struct list_head local_queue;
struct lnet_handle_md mdh;
lnet_ni_unlock(ni);
lnet_net_unlock(0);
- /*
- * protect the ni->ni_state field. Once we call the
- * lnet_send_ping function it's possible we receive
- * a response before we check the rc. The lock ensures
- * a stable value for the ni_state RECOVERY_PENDING bit
- */
+
+ CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+ libcfs_nid2str(ni->ni_nid));
+
lnet_ni_lock(ni);
if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) {
ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING;
lnet_ni_unlock(ni);
+
+ LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(ni->ni_nid));
+ lnet_ni_lock(ni);
+ ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+ continue;
+ }
+
mdh = ni->ni_ping_mdh;
/*
* Invalidate the ni mdh in case it's deleted.
lnet_ni_decref_locked(ni, 0);
lnet_net_unlock(0);
- rc = lnet_send_ping(nid, &mdh,
- LNET_INTERFACES_MIN, (void *)nid,
- the_lnet.ln_mt_eqh, true);
+ ev_info->mt_type = MT_TYPE_LOCAL_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
/* lookup the nid again */
lnet_net_lock(0);
ni = lnet_nid2ni_locked(nid, 0);
}
static void
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt)
+{
+	/*
+	 * Unlink the lpni's in-flight recovery ping MD, if one is pending.
+	 * Called with lnet_net_lock(cpt) and lpni->lpni_lock held; both
+	 * locks are dropped around LNetMDUnlink() and re-acquired before
+	 * returning, so callers must revalidate any state sampled before
+	 * the call.
+	 */
+	struct lnet_handle_md recovery_mdh;
+
+	/* stays invalid unless a recovery ping is actually in flight */
+	LNetInvalidateMDHandle(&recovery_mdh);
+
+	if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) {
+		recovery_mdh = lpni->lpni_recovery_ping_mdh;
+		LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+	}
+	spin_unlock(&lpni->lpni_lock);
+	lnet_net_unlock(cpt);
+	/* perform the unlink with both locks dropped */
+	if (!LNetMDHandleIsInvalid(recovery_mdh))
+		LNetMDUnlink(recovery_mdh);
+	lnet_net_lock(cpt);
+	spin_lock(&lpni->lpni_lock);
+}
+
+/*
+ * Shutdown-time cleanup of ln_mt_peerNIRecovq: remove every queued
+ * peer NI, unlink its in-flight recovery ping MD (if any), and drop
+ * the reference that was taken when the lpni was added to the queue.
+ */
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+	struct lnet_peer_ni *lpni, *tmp;
+
+	lnet_net_lock(LNET_LOCK_EX);
+
+	list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+				 lpni_recovery) {
+		list_del_init(&lpni->lpni_recovery);
+		spin_lock(&lpni->lpni_lock);
+		lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX);
+		spin_unlock(&lpni->lpni_lock);
+		lnet_peer_ni_decref_locked(lpni);
+	}
+
+	lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static void
lnet_clean_resendqs(void)
{
struct lnet_msg *msg, *tmp;
cfs_percpt_free(the_lnet.ln_mt_resendqs);
}
+/*
+ * Walk ln_mt_peerNIRecovq and send a recovery ping to every queued
+ * peer NI whose health is still below LNET_MAX_HEALTH_VALUE.  Fully
+ * recovered or deleting peer NIs are dropped from the queue; the rest
+ * are put back so the monitor thread retries on its next pass.
+ */
+static void
+lnet_recover_peer_nis(void)
+{
+	struct lnet_mt_event_info *ev_info;
+	struct list_head processed_list;
+	struct list_head local_queue;
+	struct lnet_handle_md mdh;
+	struct lnet_peer_ni *lpni;
+	struct lnet_peer_ni *tmp;
+	lnet_nid_t nid;
+	int healthv;
+	int rc;
+
+	INIT_LIST_HEAD(&local_queue);
+	INIT_LIST_HEAD(&processed_list);
+
+	/*
+	 * Always use cpt 0 for locking across all interactions with
+	 * ln_mt_peerNIRecovq
+	 */
+	lnet_net_lock(0);
+	list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+			 &local_queue);
+	lnet_net_unlock(0);
+
+	list_for_each_entry_safe(lpni, tmp, &local_queue,
+				 lpni_recovery) {
+		/*
+		 * The same protection strategy is used here as is in the
+		 * local recovery case.
+		 */
+		lnet_net_lock(0);
+		healthv = atomic_read(&lpni->lpni_healthv);
+		spin_lock(&lpni->lpni_lock);
+		if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+		    healthv == LNET_MAX_HEALTH_VALUE) {
+			/* recovered or going away: drop it from the queue
+			 * along with the queue's reference */
+			list_del_init(&lpni->lpni_recovery);
+			lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
+			spin_unlock(&lpni->lpni_lock);
+			lnet_peer_ni_decref_locked(lpni);
+			lnet_net_unlock(0);
+			continue;
+		}
+		spin_unlock(&lpni->lpni_lock);
+		lnet_net_unlock(0);
+
+		/*
+		 * NOTE: we're racing with peer deletion from user space.
+		 * It's possible that a peer is deleted after we check its
+		 * state. In this case the recovery can create a new peer
+		 */
+		spin_lock(&lpni->lpni_lock);
+		if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+		    !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+			lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+			spin_unlock(&lpni->lpni_lock);
+
+			LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+			if (!ev_info) {
+				/* can't ping without the event context;
+				 * clear the pending bit and retry later */
+				CERROR("out of memory. Can't recover %s\n",
+				       libcfs_nid2str(lpni->lpni_nid));
+				spin_lock(&lpni->lpni_lock);
+				lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+				spin_unlock(&lpni->lpni_lock);
+				continue;
+			}
+
+			/* look at the comments in lnet_recover_local_nis() */
+			mdh = lpni->lpni_recovery_ping_mdh;
+			LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+			nid = lpni->lpni_nid;
+			/* drop this pass's queue reference; the lpni is
+			 * re-looked-up by NID after the ping is sent */
+			lnet_net_lock(0);
+			list_del_init(&lpni->lpni_recovery);
+			lnet_peer_ni_decref_locked(lpni);
+			lnet_net_unlock(0);
+
+			ev_info->mt_type = MT_TYPE_PEER_NI;
+			ev_info->mt_nid = nid;
+			rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+					    ev_info, the_lnet.ln_mt_eqh, true);
+			lnet_net_lock(0);
+			/*
+			 * lnet_find_peer_ni_locked() grabs a refcount for
+			 * us. No need to take it explicitly.
+			 */
+			lpni = lnet_find_peer_ni_locked(nid);
+			if (!lpni) {
+				/* peer NI vanished while we were unlocked */
+				lnet_net_unlock(0);
+				LNetMDUnlink(mdh);
+				continue;
+			}
+
+			lpni->lpni_recovery_ping_mdh = mdh;
+			/*
+			 * While we're unlocked the lpni could've been
+			 * readded on the recovery queue. In this case we
+			 * don't need to add it to the local queue, since
+			 * it's already on there and the thread that added
+			 * it would've incremented the refcount on the
+			 * peer, which means we need to decref the refcount
+			 * that was implicitly grabbed by find_peer_ni_locked.
+			 * Otherwise, if the lpni is still not on
+			 * the recovery queue, then we'll add it to the
+			 * processed list.
+			 */
+			if (list_empty(&lpni->lpni_recovery))
+				list_add_tail(&lpni->lpni_recovery, &processed_list);
+			else
+				lnet_peer_ni_decref_locked(lpni);
+			lnet_net_unlock(0);
+
+			spin_lock(&lpni->lpni_lock);
+			if (rc)
+				lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+		}
+		spin_unlock(&lpni->lpni_lock);
+	}
+
+	/* everything still recovering goes back on the global queue */
+	list_splice_init(&processed_list, &local_queue);
+	lnet_net_lock(0);
+	list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+	lnet_net_unlock(0);
+}
+
static int
lnet_monitor_thread(void *arg)
{
lnet_recover_local_nis();
+ lnet_recover_peer_nis();
+
/*
* TODO do we need to check if we should sleep without
* timeout? Technically, an active system will always
}
static void
+lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
+			   int status)
+{
+	/*
+	 * Completion handling for a recovery ping: clear the
+	 * RECOVERY_PENDING bit on the NI identified by ev_info and, for a
+	 * successful local-NI ping, bump the NI's health value.
+	 */
+	lnet_nid_t nid = ev_info->mt_nid;
+
+	if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
+		struct lnet_ni *ni;
+
+		lnet_net_lock(0);
+		ni = lnet_nid2ni_locked(nid, 0);
+		if (!ni) {
+			/* NI removed while the ping was in flight */
+			lnet_net_unlock(0);
+			return;
+		}
+		lnet_ni_lock(ni);
+		ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+		lnet_ni_unlock(ni);
+		lnet_net_unlock(0);
+
+		if (status != 0) {
+			CERROR("local NI recovery failed with %d\n", status);
+			return;
+		}
+		/*
+		 * need to increment healthv for the ni here, because in
+		 * the lnet_finalize() path we don't have access to this
+		 * NI. And in order to get access to it, we'll need to
+		 * carry forward too much information.
+		 * In the peer case, it'll naturally be incremented
+		 */
+		lnet_inc_healthv(&ni->ni_healthv);
+	} else {
+		struct lnet_peer_ni *lpni;
+		int cpt;
+
+		cpt = lnet_net_lock_current();
+		lpni = lnet_find_peer_ni_locked(nid);
+		if (!lpni) {
+			/* peer NI removed while the ping was in flight */
+			lnet_net_unlock(cpt);
+			return;
+		}
+		spin_lock(&lpni->lpni_lock);
+		lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+		spin_unlock(&lpni->lpni_lock);
+		/* drop the ref taken by lnet_find_peer_ni_locked() */
+		lnet_peer_ni_decref_locked(lpni);
+		lnet_net_unlock(cpt);
+
+		if (status != 0)
+			CERROR("peer NI recovery failed with %d\n", status);
+	}
+}
+
+static void
lnet_mt_event_handler(struct lnet_event *event)
{
- lnet_nid_t nid = (lnet_nid_t) event->md.user_ptr;
- struct lnet_ni *ni;
+ struct lnet_mt_event_info *ev_info = event->md.user_ptr;
struct lnet_ping_buffer *pbuf;
/* TODO: remove assert */
event->status);
switch (event->type) {
+ case LNET_EVENT_UNLINK:
+ CDEBUG(D_NET, "%s recovery ping unlinked\n",
+ libcfs_nid2str(ev_info->mt_nid));
case LNET_EVENT_REPLY:
- /*
- * If the NI has been restored completely then remove from
- * the recovery queue
- */
- lnet_net_lock(0);
- ni = lnet_nid2ni_locked(nid, 0);
- if (!ni) {
- lnet_net_unlock(0);
- break;
- }
- lnet_ni_lock(ni);
- ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
- lnet_ni_unlock(ni);
- lnet_net_unlock(0);
+ lnet_handle_recovery_reply(ev_info, event->status);
break;
case LNET_EVENT_SEND:
CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
- libcfs_nid2str(nid),
+ libcfs_nid2str(ev_info->mt_nid),
(event->status) ? "unsuccessfully" :
"successfully", event->status);
break;
- case LNET_EVENT_UNLINK:
- /* nothing to do */
- CDEBUG(D_NET, "%s recovery ping unlinked\n",
- libcfs_nid2str(nid));
- break;
default:
CERROR("Unexpected event: %d\n", event->type);
- return;
+ break;
}
if (event->unlinked) {
+ LIBCFS_FREE(ev_info, sizeof(*ev_info));
pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
lnet_ping_buffer_decref(pbuf);
}
lnet_router_cleanup();
free_mem:
the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
LNetEQFree(the_lnet.ln_mt_eqh);
LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
return rc;
clean_queues:
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
return rc;
}
/* perform cleanup tasks */
lnet_router_cleanup();
- lnet_clean_resendqs();
lnet_clean_local_ni_recoveryq();
+ lnet_clean_peer_ni_recoveryq();
+ lnet_clean_resendqs();
rc = LNetEQFree(the_lnet.ln_mt_eqh);
LASSERT(rc == 0);
return;
}
}
-static inline void
-lnet_inc_healthv(atomic_t *healthv)
-{
- atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
-}
-
static void
lnet_handle_local_failure(struct lnet_msg *msg)
{
lnet_net_unlock(0);
}
+/*
+ * Called when a message completed with a remote-side health failure:
+ * decrement the tx peer NI's health value and, if health actually
+ * dropped below the maximum, queue the peer NI on ln_mt_peerNIRecovq
+ * for the monitor thread to recover (taking a reference owned by the
+ * queue and released when the lpni is dequeued).
+ */
+static void
+lnet_handle_remote_failure(struct lnet_msg *msg)
+{
+	struct lnet_peer_ni *lpni;
+
+	lpni = msg->msg_txpeer;
+
+	/* lpni could be NULL if we're in the LOLND case */
+	if (!lpni)
+		return;
+
+	lnet_net_lock(0);
+	/* the mt could've shutdown and cleaned up the queues */
+	if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+		lnet_net_unlock(0);
+		return;
+	}
+
+	lnet_dec_healthv_locked(&lpni->lpni_healthv);
+	/*
+	 * add the peer NI to the recovery queue if it's not already there
+	 * and its health value is actually below the maximum. It's
+	 * possible that the sensitivity might be set to 0, and the health
+	 * value will not be reduced. In this case, there is no reason to
+	 * invoke recovery
+	 */
+	if (list_empty(&lpni->lpni_recovery) &&
+	    atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
+		CERROR("lpni %s added to recovery queue. Health = %d\n",
+		       libcfs_nid2str(lpni->lpni_nid),
+		       atomic_read(&lpni->lpni_healthv));
+		list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
+		lnet_peer_ni_addref_locked(lpni);
+	}
+	lnet_net_unlock(0);
+}
+
/*
* Do a health check on the message:
* return -1 if we're not going to handle the error
lnet_health_check(struct lnet_msg *msg)
{
enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+ bool lo = false;
/* TODO: lnet_incr_hstats(hstatus); */
LASSERT(msg->msg_txni);
+ /*
+ * if we're sending to the LOLND then the msg_txpeer will not be
+ * set. So no need to sanity check it.
+ */
+ if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+ LASSERT(msg->msg_txpeer);
+ else
+ lo = true;
+
if (hstatus != LNET_MSG_STATUS_OK &&
ktime_compare(ktime_get(), msg->msg_deadline) >= 0)
return -1;
if (the_lnet.ln_state != LNET_STATE_RUNNING)
return -1;
+ CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+ libcfs_nid2str(msg->msg_txni->ni_nid),
+ (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+ lnet_msgtyp2str(msg->msg_type),
+ lnet_health_error2str(hstatus));
+
switch (hstatus) {
case LNET_MSG_STATUS_OK:
lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+ /*
+ * It's possible msg_txpeer is NULL in the LOLND
+ * case.
+ */
+ if (msg->msg_txpeer)
+ lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+
/* we can finalize this message */
return -1;
case LNET_MSG_STATUS_LOCAL_INTERRUPT:
goto resend;
/*
- * TODO: since the remote dropped the message we can
- * attempt a resend safely.
- */
- case LNET_MSG_STATUS_REMOTE_DROPPED:
- break;
-
- /*
- * These errors will not trigger a resend so simply
- * finalize the message
- */
+ * These errors will not trigger a resend so simply
+ * finalize the message
+ */
case LNET_MSG_STATUS_LOCAL_ERROR:
lnet_handle_local_failure(msg);
return -1;
+
+ /*
+ * TODO: since the remote dropped the message we can
+ * attempt a resend safely.
+ */
+ case LNET_MSG_STATUS_REMOTE_DROPPED:
+ lnet_handle_remote_failure(msg);
+ goto resend;
+
case LNET_MSG_STATUS_REMOTE_ERROR:
case LNET_MSG_STATUS_REMOTE_TIMEOUT:
case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+ lnet_handle_remote_failure(msg);
return -1;
+ default:
+ LBUG();
}
resend: