LU-9120 lnet: handle remote errors in LNet 67/32767/15
author    Amir Shehata <amir.shehata@intel.com>
          Fri, 22 Jun 2018 17:42:23 +0000 (10:42 -0700)
committer Amir Shehata <ashehata@whamcloud.com>
          Fri, 17 Aug 2018 19:55:19 +0000 (19:55 +0000)
Add a health value to the peer NI structure. Decrement the
value whenever there is an error sending to the peer.
Modify the selection algorithm to consider the peer NI
health value when selecting the best peer NI to send to.
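
In outline, health now dominates the existing tie-breakers
(NI preference, tx credits, round-robin sequence). The
stand-alone toy below only illustrates that ordering; the
struct and helper names are invented for the example and
heavily simplified from the real lnet_select_peer_ni() hunk
in lib-move.c below.

/* Toy model of the new ordering: the healthiest peer NI wins;
 * preference, credits and sequence only break health ties. */
#include <stdbool.h>
#include <stdio.h>

struct toy_lpni {
	const char *nid;
	int healthv;		/* like lpni_healthv, higher is healthier */
	bool is_pref;		/* preferred for the chosen local NI */
	int txcredits;		/* like lpni_txcredits */
	unsigned int seq;	/* like lpni_seq, lower = used less recently */
};

static bool toy_better(const struct toy_lpni *p, const struct toy_lpni *b)
{
	if (p->healthv != b->healthv)
		return p->healthv > b->healthv;		/* health dominates */
	if (p->is_pref != b->is_pref)
		return p->is_pref;			/* tie: prefer preferred */
	if (p->txcredits != b->txcredits)
		return p->txcredits > b->txcredits;	/* tie: most credits */
	return p->seq < b->seq;				/* tie: round robin */
}

int main(void)
{
	struct toy_lpni cand[] = {
		{ "192.168.0.1@tcp",  900, true,  8, 1 },
		{ "192.168.0.2@tcp", 1000, false, 2, 9 },
	};
	const struct toy_lpni *best = &cand[0];
	int i;

	for (i = 1; i < 2; i++)
		if (toy_better(&cand[i], best))
			best = &cand[i];
	/* prints 192.168.0.2@tcp: less preferred and fewer credits,
	 * but strictly healthier */
	printf("selected %s\n", best->nid);
	return 0;
}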

Put the peer NI on the recovery queue whenever there is
an error sending to it. Attempt a resend only on REMOTE
DROPPED, since then we are sure the message was never
received by the peer. For other errors, finalize the
message.
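
The health-check policy in the lib-msg.c hunk below reduces
to a small table. A hedged sketch of that table follows: the
helper name is invented for illustration, and the status
names are the lnet_msg_hstatus values defined in lib-types.h
as of this series.

/* Illustrative summary only (helper name invented): does this
 * health status lead to a resend attempt in lnet_health_check()? */
static bool should_resend(enum lnet_msg_hstatus hstatus)
{
	switch (hstatus) {
	/* local failures the patch treats as retryable */
	case LNET_MSG_STATUS_LOCAL_INTERRUPT:
	case LNET_MSG_STATUS_LOCAL_DROPPED:
	case LNET_MSG_STATUS_LOCAL_ABORTED:
	case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
	case LNET_MSG_STATUS_LOCAL_TIMEOUT:
	/* the peer explicitly dropped it, so it was never received */
	case LNET_MSG_STATUS_REMOTE_DROPPED:
		return true;
	/* delivery succeeded or is ambiguous: finalize instead */
	case LNET_MSG_STATUS_OK:
	case LNET_MSG_STATUS_LOCAL_ERROR:
	case LNET_MSG_STATUS_REMOTE_ERROR:
	case LNET_MSG_STATUS_REMOTE_TIMEOUT:
	case LNET_MSG_STATUS_NETWORK_TIMEOUT:
	default:
		return false;
	}
}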

Test-Parameters: forbuildonly
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: Ibcb41b3fb538e76b973bcb10fcd07638c118acb9
Reviewed-on: https://review.whamcloud.com/32767
Tested-by: Jenkins
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
Reviewed-by: Sonia Sharma <sharmaso@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/peer.c

diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h
index 9536840..2d84c19 100644
@@ -1007,6 +1007,12 @@ lnet_peer_needs_push(struct lnet_peer *lp)
        return false;
 }
 
+static inline void
+lnet_inc_healthv(atomic_t *healthv)
+{
+       atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
+}
+
 void lnet_incr_stats(struct lnet_element_stats *stats,
                     enum lnet_msg_type msg_type,
                     enum lnet_stats_type stats_type);
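
Moving lnet_inc_healthv() into lib-lnet.h makes it usable from
both lib-msg.c (where it is removed below) and lib-move.c.
atomic_add_unless(v, 1, LNET_MAX_HEALTH_VALUE) only increments
while the value differs from the ceiling, so concurrent
increments cannot push health past the maximum. The cmpxchg
loop below is only a sketch of those semantics, not the kernel
helper itself:

/* Sketch of atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE):
 * increment unless the value is already at the ceiling. */
static inline void
lnet_inc_healthv_sketch(atomic_t *healthv)
{
	int cur = atomic_read(healthv);

	while (cur != LNET_MAX_HEALTH_VALUE) {
		int old = atomic_cmpxchg(healthv, cur, cur + 1);

		if (old == cur)
			break;		/* won the race: value incremented */
		cur = old;		/* lost the race: retry from new value */
	}
}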
diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h
index dee0a46..3b06af2 100644
@@ -495,6 +495,8 @@ struct lnet_peer_ni {
        struct list_head        lpni_peer_nis;
        /* chain on remote peer list */
        struct list_head        lpni_on_remote_peer_ni_list;
+       /* chain on recovery queue */
+       struct list_head        lpni_recovery;
        /* chain on peer hash */
        struct list_head        lpni_hashlist;
        /* messages blocking for tx credits */
@@ -547,6 +549,10 @@ struct lnet_peer_ni {
        lnet_nid_t              lpni_nid;
        /* # refs */
        atomic_t                lpni_refcount;
+       /* health value for the peer */
+       atomic_t                lpni_healthv;
+       /* recovery ping mdh */
+       struct lnet_handle_md   lpni_recovery_ping_mdh;
        /* CPT this peer attached on */
        int                     lpni_cpt;
        /* state flags -- protected by lpni_lock */
@@ -576,6 +582,10 @@ struct lnet_peer_ni {
 
 /* Preferred path added due to traffic on non-MR peer_ni */
 #define LNET_PEER_NI_NON_MR_PREF       (1 << 0)
+/* peer is being recovered. */
+#define LNET_PEER_NI_RECOVERY_PENDING  (1 << 1)
+/* peer is being deleted */
+#define LNET_PEER_NI_DELETING          (1 << 2)
 
 struct lnet_peer {
        /* chain on pt_peer_list */
@@ -1071,6 +1081,8 @@ struct lnet {
        struct list_head                **ln_mt_resendqs;
        /* local NIs to recover */
        struct list_head                ln_mt_localNIRecovq;
+       /* peer NIs to recover */
+       struct list_head                ln_mt_peerNIRecovq;
        /* recovery eq handler */
        struct lnet_handle_eq           ln_mt_eqh;
 
diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c
index c1718ce..a36cf3f 100644
@@ -838,6 +838,7 @@ lnet_prepare(lnet_pid_t requested_pid)
        INIT_LIST_HEAD(&the_lnet.ln_dc_working);
        INIT_LIST_HEAD(&the_lnet.ln_dc_expired);
        INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
+       INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
        init_waitqueue_head(&the_lnet.ln_dc_waitq);
 
        rc = lnet_descriptor_setup();
diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c
index 969cb07..12ac396 100644
@@ -1212,15 +1212,6 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
        }
 
        if (txpeer != NULL) {
-               /*
-                * TODO:
-                * Once the patch for the health comes in we need to set
-                * the health of the peer ni to bad when we fail to send
-                * a message.
-                * int status = msg->msg_ev.status;
-                * if (status != 0)
-                *      lnet_set_peer_ni_health_locked(txpeer, false)
-                */
                msg->msg_txpeer = NULL;
                lnet_peer_ni_decref_locked(txpeer);
        }
@@ -1732,6 +1723,8 @@ lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
        int best_lpni_credits = INT_MIN;
        bool preferred = false;
        bool ni_is_pref;
+       int best_lpni_healthv = 0;
+       int lpni_healthv;
 
        while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
                /*
@@ -1741,6 +1734,8 @@ lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
                ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
                                                          best_ni->ni_nid);
 
+               lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
                CDEBUG(D_NET, "%s ni_is_pref = %d\n",
                       libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
 
@@ -1750,8 +1745,13 @@ lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
                                lpni->lpni_txcredits, best_lpni_credits,
                                lpni->lpni_seq, best_lpni->lpni_seq);
 
+               /* pick the healthiest peer ni */
+               if (lpni_healthv < best_lpni_healthv) {
+                       continue;
+               } else if (lpni_healthv > best_lpni_healthv) {
+                       best_lpni_healthv = lpni_healthv;
                /* if this is a preferred peer use it */
-               if (!preferred && ni_is_pref) {
+               } else if (!preferred && ni_is_pref) {
                        preferred = true;
                } else if (preferred && !ni_is_pref) {
                        /*
@@ -2643,6 +2643,16 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
        return 0;
 }
 
+enum lnet_mt_event_type {
+       MT_TYPE_LOCAL_NI = 0,
+       MT_TYPE_PEER_NI
+};
+
+struct lnet_mt_event_info {
+       enum lnet_mt_event_type mt_type;
+       lnet_nid_t mt_nid;
+};
+
 static void
 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
 {
@@ -2740,6 +2750,7 @@ lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt)
 static void
 lnet_recover_local_nis(void)
 {
+       struct lnet_mt_event_info *ev_info;
        struct list_head processed_list;
        struct list_head local_queue;
        struct lnet_handle_md mdh;
@@ -2789,16 +2800,25 @@ lnet_recover_local_nis(void)
                lnet_ni_unlock(ni);
                lnet_net_unlock(0);
 
-               /*
-                * protect the ni->ni_state field. Once we call the
-                * lnet_send_ping function it's possible we receive
-                * a response before we check the rc. The lock ensures
-                * a stable value for the ni_state RECOVERY_PENDING bit
-                */
+
+               CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+                      libcfs_nid2str(ni->ni_nid));
+
                lnet_ni_lock(ni);
                if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) {
                        ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING;
                        lnet_ni_unlock(ni);
+
+                       LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+                       if (!ev_info) {
+                               CERROR("out of memory. Can't recover %s\n",
+                                      libcfs_nid2str(ni->ni_nid));
+                               lnet_ni_lock(ni);
+                               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+                               lnet_ni_unlock(ni);
+                               continue;
+                       }
+
                        mdh = ni->ni_ping_mdh;
                        /*
                         * Invalidate the ni mdh in case it's deleted.
@@ -2829,9 +2849,10 @@ lnet_recover_local_nis(void)
                        lnet_ni_decref_locked(ni, 0);
                        lnet_net_unlock(0);
 
-                       rc = lnet_send_ping(nid, &mdh,
-                                           LNET_INTERFACES_MIN, (void *)nid,
-                                           the_lnet.ln_mt_eqh, true);
+                       ev_info->mt_type = MT_TYPE_LOCAL_NI;
+                       ev_info->mt_nid = nid;
+                       rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+                                           ev_info, the_lnet.ln_mt_eqh, true);
                        /* lookup the nid again */
                        lnet_net_lock(0);
                        ni = lnet_nid2ni_locked(nid, 0);
@@ -2938,6 +2959,44 @@ lnet_clean_local_ni_recoveryq(void)
 }
 
 static void
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt)
+{
+       struct lnet_handle_md recovery_mdh;
+
+       LNetInvalidateMDHandle(&recovery_mdh);
+
+       if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) {
+               recovery_mdh = lpni->lpni_recovery_ping_mdh;
+               LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+       }
+       spin_unlock(&lpni->lpni_lock);
+       lnet_net_unlock(cpt);
+       if (!LNetMDHandleIsInvalid(recovery_mdh))
+               LNetMDUnlink(recovery_mdh);
+       lnet_net_lock(cpt);
+       spin_lock(&lpni->lpni_lock);
+}
+
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+       struct lnet_peer_ni *lpni, *tmp;
+
+       lnet_net_lock(LNET_LOCK_EX);
+
+       list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+                                lpni_recovery) {
+               list_del_init(&lpni->lpni_recovery);
+               spin_lock(&lpni->lpni_lock);
+               lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX);
+               spin_unlock(&lpni->lpni_lock);
+               lnet_peer_ni_decref_locked(lpni);
+       }
+
+       lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static void
 lnet_clean_resendqs(void)
 {
        struct lnet_msg *msg, *tmp;
@@ -2960,6 +3019,130 @@ lnet_clean_resendqs(void)
        cfs_percpt_free(the_lnet.ln_mt_resendqs);
 }
 
+static void
+lnet_recover_peer_nis(void)
+{
+       struct lnet_mt_event_info *ev_info;
+       struct list_head processed_list;
+       struct list_head local_queue;
+       struct lnet_handle_md mdh;
+       struct lnet_peer_ni *lpni;
+       struct lnet_peer_ni *tmp;
+       lnet_nid_t nid;
+       int healthv;
+       int rc;
+
+       INIT_LIST_HEAD(&local_queue);
+       INIT_LIST_HEAD(&processed_list);
+
+       /*
+        * Always use cpt 0 for locking across all interactions with
+        * ln_mt_peerNIRecovq
+        */
+       lnet_net_lock(0);
+       list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+                        &local_queue);
+       lnet_net_unlock(0);
+
+       list_for_each_entry_safe(lpni, tmp, &local_queue,
+                                lpni_recovery) {
+               /*
+                * The same protection strategy is used here as is in the
+                * local recovery case.
+                */
+               lnet_net_lock(0);
+               healthv = atomic_read(&lpni->lpni_healthv);
+               spin_lock(&lpni->lpni_lock);
+               if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+                   healthv == LNET_MAX_HEALTH_VALUE) {
+                       list_del_init(&lpni->lpni_recovery);
+                       lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
+                       spin_unlock(&lpni->lpni_lock);
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+                       continue;
+               }
+               spin_unlock(&lpni->lpni_lock);
+               lnet_net_unlock(0);
+
+               /*
+                * NOTE: we're racing with peer deletion from user space.
+                * It's possible that a peer is deleted after we check its
+                * state. In this case the recovery can create a new peer
+                */
+               spin_lock(&lpni->lpni_lock);
+               if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+                   !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+                       lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+                       spin_unlock(&lpni->lpni_lock);
+
+                       LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+                       if (!ev_info) {
+                               CERROR("out of memory. Can't recover %s\n",
+                                      libcfs_nid2str(lpni->lpni_nid));
+                               spin_lock(&lpni->lpni_lock);
+                               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+                               spin_unlock(&lpni->lpni_lock);
+                               continue;
+                       }
+
+                       /* look at the comments in lnet_recover_local_nis() */
+                       mdh = lpni->lpni_recovery_ping_mdh;
+                       LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+                       nid = lpni->lpni_nid;
+                       lnet_net_lock(0);
+                       list_del_init(&lpni->lpni_recovery);
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+
+                       ev_info->mt_type = MT_TYPE_PEER_NI;
+                       ev_info->mt_nid = nid;
+                       rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+                                           ev_info, the_lnet.ln_mt_eqh, true);
+                       lnet_net_lock(0);
+                       /*
+                        * lnet_find_peer_ni_locked() grabs a refcount for
+                        * us. No need to take it explicitly.
+                        */
+                       lpni = lnet_find_peer_ni_locked(nid);
+                       if (!lpni) {
+                               lnet_net_unlock(0);
+                               LNetMDUnlink(mdh);
+                               continue;
+                       }
+
+                       lpni->lpni_recovery_ping_mdh = mdh;
+                       /*
+                        * While we were unlocked, the lpni could have been
+                        * re-added to the recovery queue. In that case we
+                        * don't need to add it to the local queue, since
+                        * it's already on there and the thread that added
+                        * it would've incremented the refcount on the
+                        * peer, which means we need to decref the refcount
+                        * that was implicitly grabbed by find_peer_ni_locked.
+                        * Otherwise, if the lpni is still not on
+                        * the recovery queue, then we'll add it to the
+                        * processed list.
+                        */
+                       if (list_empty(&lpni->lpni_recovery))
+                               list_add_tail(&lpni->lpni_recovery, &processed_list);
+                       else
+                               lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+
+                       spin_lock(&lpni->lpni_lock);
+                       if (rc)
+                               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+               }
+               spin_unlock(&lpni->lpni_lock);
+       }
+
+       list_splice_init(&processed_list, &local_queue);
+       lnet_net_lock(0);
+       list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+       lnet_net_unlock(0);
+}
+
 static int
 lnet_monitor_thread(void *arg)
 {
@@ -2983,6 +3166,8 @@ lnet_monitor_thread(void *arg)
 
                lnet_recover_local_nis();
 
+               lnet_recover_peer_nis();
+
                /*
                 * TODO do we need to check if we should sleep without
                 * timeout?  Technically, an active system will always
@@ -3073,10 +3258,62 @@ fail_error:
 }
 
 static void
+lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
+                          int status)
+{
+       lnet_nid_t nid = ev_info->mt_nid;
+
+       if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
+               struct lnet_ni *ni;
+
+               lnet_net_lock(0);
+               ni = lnet_nid2ni_locked(nid, 0);
+               if (!ni) {
+                       lnet_net_unlock(0);
+                       return;
+               }
+               lnet_ni_lock(ni);
+               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+               lnet_ni_unlock(ni);
+               lnet_net_unlock(0);
+
+               if (status != 0) {
+                       CERROR("local NI recovery failed with %d\n", status);
+                       return;
+               }
+               /*
+                * need to increment healthv for the ni here, because in
+                * the lnet_finalize() path we don't have access to this
+                * NI. And in order to get access to it, we'll need to
+                * carry forward too much information.
+                * In the peer case, it'll naturally be incremented
+                */
+               lnet_inc_healthv(&ni->ni_healthv);
+       } else {
+               struct lnet_peer_ni *lpni;
+               int cpt;
+
+               cpt = lnet_net_lock_current();
+               lpni = lnet_find_peer_ni_locked(nid);
+               if (!lpni) {
+                       lnet_net_unlock(cpt);
+                       return;
+               }
+               spin_lock(&lpni->lpni_lock);
+               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+               spin_unlock(&lpni->lpni_lock);
+               lnet_peer_ni_decref_locked(lpni);
+               lnet_net_unlock(cpt);
+
+               if (status != 0)
+                       CERROR("peer NI recovery failed with %d\n", status);
+       }
+}
+
+static void
 lnet_mt_event_handler(struct lnet_event *event)
 {
-       lnet_nid_t nid = (lnet_nid_t) event->md.user_ptr;
-       struct lnet_ni *ni;
+       struct lnet_mt_event_info *ev_info = event->md.user_ptr;
        struct lnet_ping_buffer *pbuf;
 
        /* TODO: remove assert */
@@ -3088,38 +3325,24 @@ lnet_mt_event_handler(struct lnet_event *event)
               event->status);
 
        switch (event->type) {
+       case LNET_EVENT_UNLINK:
+               CDEBUG(D_NET, "%s recovery ping unlinked\n",
+                      libcfs_nid2str(ev_info->mt_nid));
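+               /* fallthrough */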
        case LNET_EVENT_REPLY:
-               /*
-                * If the NI has been restored completely then remove from
-                * the recovery queue
-                */
-               lnet_net_lock(0);
-               ni = lnet_nid2ni_locked(nid, 0);
-               if (!ni) {
-                       lnet_net_unlock(0);
-                       break;
-               }
-               lnet_ni_lock(ni);
-               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
-               lnet_ni_unlock(ni);
-               lnet_net_unlock(0);
+               lnet_handle_recovery_reply(ev_info, event->status);
                break;
        case LNET_EVENT_SEND:
                CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
-                              libcfs_nid2str(nid),
+                              libcfs_nid2str(ev_info->mt_nid),
                               (event->status) ? "unsuccessfully" :
                               "successfully", event->status);
                break;
-       case LNET_EVENT_UNLINK:
-               /* nothing to do */
-               CDEBUG(D_NET, "%s recovery ping unlinked\n",
-                      libcfs_nid2str(nid));
-               break;
        default:
                CERROR("Unexpected event: %d\n", event->type);
-               return;
+               break;
        }
        if (event->unlinked) {
+               LIBCFS_FREE(ev_info, sizeof(*ev_info));
                pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
                lnet_ping_buffer_decref(pbuf);
        }
@@ -3171,14 +3394,16 @@ clean_thread:
        lnet_router_cleanup();
 free_mem:
        the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
-       lnet_clean_resendqs();
        lnet_clean_local_ni_recoveryq();
+       lnet_clean_peer_ni_recoveryq();
+       lnet_clean_resendqs();
        LNetEQFree(the_lnet.ln_mt_eqh);
        LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
        return rc;
 clean_queues:
-       lnet_clean_resendqs();
        lnet_clean_local_ni_recoveryq();
+       lnet_clean_peer_ni_recoveryq();
+       lnet_clean_resendqs();
        return rc;
 }
 
@@ -3201,8 +3426,9 @@ void lnet_monitor_thr_stop(void)
 
        /* perform cleanup tasks */
        lnet_router_cleanup();
-       lnet_clean_resendqs();
        lnet_clean_local_ni_recoveryq();
+       lnet_clean_peer_ni_recoveryq();
+       lnet_clean_resendqs();
        rc = LNetEQFree(the_lnet.ln_mt_eqh);
        LASSERT(rc == 0);
        return;
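
One invariant ties the lib-move.c and lib-msg.c changes
together: being linked on ln_mt_peerNIRecovq holds exactly one
reference on the lpni. Condensed from the hunks above and
below (no new logic), the two halves of that discipline are:

/* enqueue (lib-msg.c, lnet_handle_remote_failure): queue
 * membership takes a reference */
if (list_empty(&lpni->lpni_recovery) &&
    atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
	list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
	lnet_peer_ni_addref_locked(lpni);
}

/* dequeue (lib-move.c, recovery and cleanup paths): every
 * list_del_init() is paired with dropping that reference */
list_del_init(&lpni->lpni_recovery);
lnet_peer_ni_decref_locked(lpni);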
diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 12a8c0c..9b3358e 100644
@@ -474,12 +474,6 @@ lnet_dec_healthv_locked(atomic_t *healthv)
        }
 }
 
-static inline void
-lnet_inc_healthv(atomic_t *healthv)
-{
-       atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
-}
-
 static void
 lnet_handle_local_failure(struct lnet_msg *msg)
 {
@@ -518,6 +512,43 @@ lnet_handle_local_failure(struct lnet_msg *msg)
        lnet_net_unlock(0);
 }
 
+static void
+lnet_handle_remote_failure(struct lnet_msg *msg)
+{
+       struct lnet_peer_ni *lpni;
+
+       lpni = msg->msg_txpeer;
+
+       /* lpni could be NULL if we're in the LOLND case */
+       if (!lpni)
+               return;
+
+       lnet_net_lock(0);
+       /* the mt could've shutdown and cleaned up the queues */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(0);
+               return;
+       }
+
+       lnet_dec_healthv_locked(&lpni->lpni_healthv);
+       /*
+        * add the peer NI to the recovery queue if it's not already there
+        * and its health value is actually below the maximum. It's
+        * possible that the sensitivity might be set to 0, and the health
+        * value will not be reduced. In this case, there is no reason to
+        * invoke recovery
+        */
+       if (list_empty(&lpni->lpni_recovery) &&
+           atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
+               CERROR("lpni %s added to recovery queue. Health = %d\n",
+                       libcfs_nid2str(lpni->lpni_nid),
+                       atomic_read(&lpni->lpni_healthv));
+               list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
+               lnet_peer_ni_addref_locked(lpni);
+       }
+       lnet_net_unlock(0);
+}
+
 /*
  * Do a health check on the message:
  * return -1 if we're not going to handle the error
@@ -528,11 +559,21 @@ static int
 lnet_health_check(struct lnet_msg *msg)
 {
        enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+       bool lo = false;
 
        /* TODO: lnet_incr_hstats(hstatus); */
 
        LASSERT(msg->msg_txni);
 
+       /*
+        * if we're sending to the LOLND then the msg_txpeer will not be
+        * set. So no need to sanity check it.
+        */
+       if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+               LASSERT(msg->msg_txpeer);
+       else
+               lo = true;
+
        if (hstatus != LNET_MSG_STATUS_OK &&
            ktime_compare(ktime_get(), msg->msg_deadline) >= 0)
                return -1;
@@ -541,9 +582,22 @@ lnet_health_check(struct lnet_msg *msg)
        if (the_lnet.ln_state != LNET_STATE_RUNNING)
                return -1;
 
+       CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+              libcfs_nid2str(msg->msg_txni->ni_nid),
+              (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(hstatus));
+
        switch (hstatus) {
        case LNET_MSG_STATUS_OK:
                lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+               /*
+                * It's possible msg_txpeer is NULL in the LOLND
+                * case.
+                */
+               if (msg->msg_txpeer)
+                       lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+
                /* we can finalize this message */
                return -1;
        case LNET_MSG_STATUS_LOCAL_INTERRUPT:
@@ -556,23 +610,28 @@ lnet_health_check(struct lnet_msg *msg)
                goto resend;
 
        /*
-               * TODO: since the remote dropped the message we can
-               * attempt a resend safely.
-               */
-       case LNET_MSG_STATUS_REMOTE_DROPPED:
-       break;
-
-       /*
-               * These errors will not trigger a resend so simply
-               * finalize the message
-               */
+        * These errors will not trigger a resend so simply
+        * finalize the message
+        */
        case LNET_MSG_STATUS_LOCAL_ERROR:
                lnet_handle_local_failure(msg);
                return -1;
+
+       /*
+        * TODO: since the remote dropped the message we can
+        * attempt a resend safely.
+        */
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               lnet_handle_remote_failure(msg);
+               goto resend;
+
        case LNET_MSG_STATUS_REMOTE_ERROR:
        case LNET_MSG_STATUS_REMOTE_TIMEOUT:
        case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               lnet_handle_remote_failure(msg);
                return -1;
+       default:
+               LBUG();
        }
 
 resend:
diff --git a/lnet/lnet/peer.c b/lnet/lnet/peer.c
index 80dd177..b20f8ed 100644
@@ -166,6 +166,7 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
        INIT_LIST_HEAD(&lpni->lpni_routes);
        INIT_LIST_HEAD(&lpni->lpni_hashlist);
        INIT_LIST_HEAD(&lpni->lpni_peer_nis);
+       INIT_LIST_HEAD(&lpni->lpni_recovery);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
 
        spin_lock_init(&lpni->lpni_lock);
@@ -175,6 +176,7 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
        lpni->lpni_nid = nid;
        lpni->lpni_cpt = cpt;
+       atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
        lnet_set_peer_ni_health_locked(lpni, true);
 
        net = lnet_get_net_locked(LNET_NIDNET(nid));
@@ -374,6 +376,14 @@ lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
        /* remove peer ni from the hash list. */
        list_del_init(&lpni->lpni_hashlist);
 
+       /*
+        * indicate the peer is being deleted so the monitor thread can
+        * remove it from the recovery queue.
+        */
+       spin_lock(&lpni->lpni_lock);
+       lpni->lpni_state |= LNET_PEER_NI_DELETING;
+       spin_unlock(&lpni->lpni_lock);
+
        /* decrement the ref count on the peer table */
        ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
        LASSERT(ptable->pt_number > 0);
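
For reference, the shutdown race that LNET_PEER_NI_DELETING
closes, condensed from the peer.c hunk above and the
lnet_recover_peer_nis() hunk in lib-move.c: deletion only
marks the lpni; the monitor thread, as sole owner of the
recovery queue, unhooks it on its next pass.

/* deletion path (peer.c): mark the lpni under lpni_lock */
spin_lock(&lpni->lpni_lock);
lpni->lpni_state |= LNET_PEER_NI_DELETING;
spin_unlock(&lpni->lpni_lock);

/* monitor thread (lib-move.c): drop marked or fully healed peers */
if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
    healthv == LNET_MAX_HEALTH_VALUE) {
	list_del_init(&lpni->lpni_recovery);
	lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
	spin_unlock(&lpni->lpni_lock);
	lnet_peer_ni_decref_locked(lpni);
	lnet_net_unlock(0);
	continue;
}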