Whamcloud - gitweb
LU-9120 lnet: health error simulation
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index ca5df09..8fed27f 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <lnet/lib-lnet.h>
 
 void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev)
 {
-        ENTRY;
+       ENTRY;
 
-        memset(ev, 0, sizeof(*ev));
+       memset(ev, 0, sizeof(*ev));
 
-        ev->status   = 0;
-        ev->unlinked = 1;
-        ev->type     = LNET_EVENT_UNLINK;
-        lnet_md_deconstruct(md, &ev->md);
-        lnet_md2handle(&ev->md_handle, md);
-        EXIT;
+       ev->status   = 0;
+       ev->unlinked = 1;
+       ev->type     = LNET_EVENT_UNLINK;
+       lnet_md_deconstruct(md, &ev->md);
+       lnet_md2handle(&ev->md_handle, md);
+       EXIT;
 }
 
 /*
  * Don't need any lock, must be called after lnet_commit_md
  */
 void
-lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
+lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type)
 {
-       lnet_hdr_t      *hdr = &msg->msg_hdr;
-       lnet_event_t    *ev  = &msg->msg_ev;
+       struct lnet_hdr *hdr = &msg->msg_hdr;
+       struct lnet_event *ev = &msg->msg_ev;
 
        LASSERT(!msg->msg_routing);
 
        ev->type = ev_type;
+       ev->msg_type = msg->msg_type;
 
        if (ev_type == LNET_EVENT_SEND) {
                /* event for active message */
-               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
-               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
+               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
+               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
+               ev->source.nid    = LNET_NID_ANY;
+               ev->source.pid    = the_lnet.ln_pid;
                ev->sender        = LNET_NID_ANY;
-
        } else {
                /* event for passive message */
-               ev->target.pid    = hdr->dest_pid;
-               ev->target.nid    = hdr->dest_nid;
+               ev->target.pid    = hdr->dest_pid;
+               ev->target.nid    = hdr->dest_nid;
                ev->initiator.pid = hdr->src_pid;
-               ev->initiator.nid = hdr->src_nid;
+               /* Multi-Rail: resolve src_nid to "primary" peer NID */
+               ev->initiator.nid = msg->msg_initiator;
+               /* Multi-Rail: track source NID. */
+               ev->source.pid    = hdr->src_pid;
+               ev->source.nid    = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
                ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
@@ -137,17 +139,21 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 }
 
 void
-lnet_msg_commit(lnet_msg_t *msg, int cpt)
+lnet_msg_commit(struct lnet_msg *msg, int cpt)
 {
        struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
-       lnet_counters_t           *counters  = the_lnet.ln_counters[cpt];
+       struct lnet_counters *counters = the_lnet.ln_counters[cpt];
+       s64 timeout_ns;
+
+       /* set the message deadline */
+       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+       msg->msg_deadline = ktime_add_ns(ktime_get(), timeout_ns);
 
        /* routed message can be committed for both receiving and sending */
        LASSERT(!msg->msg_tx_committed);
 
        if (msg->msg_sending) {
                LASSERT(!msg->msg_receiving);
-
                msg->msg_tx_cpt = cpt;
                msg->msg_tx_committed = 1;
                if (msg->msg_rx_committed) { /* routed message REPLY */
@@ -161,8 +167,9 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
        }
 
        LASSERT(!msg->msg_onactivelist);
+
        msg->msg_onactivelist = 1;
-       list_add(&msg->msg_activelist, &container->msc_active);
+       list_add_tail(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
@@ -170,10 +177,10 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
 }
 
 static void
-lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
+lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
 {
-       lnet_counters_t *counters;
-       lnet_event_t    *ev = &msg->msg_ev;
+       struct lnet_counters *counters;
+       struct lnet_event *ev = &msg->msg_ev;
 
        LASSERT(msg->msg_tx_committed);
        if (status != 0)
@@ -188,7 +195,7 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 
                counters->route_length += msg->msg_len;
                counters->route_count++;
-               goto out;
+               goto incr_stats;
 
        case LNET_EVENT_PUT:
                /* should have been decommitted */
@@ -214,16 +221,26 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
        }
 
        counters->send_count++;
+
+incr_stats:
+       if (msg->msg_txpeer)
+               lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
+                               msg->msg_type,
+                               LNET_STATS_TYPE_SEND);
+       if (msg->msg_txni)
+               lnet_incr_stats(&msg->msg_txni->ni_stats,
+                               msg->msg_type,
+                               LNET_STATS_TYPE_SEND);
  out:
        lnet_return_tx_credits_locked(msg);
        msg->msg_tx_committed = 0;
 }
 
 static void
-lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
+lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
 {
-       lnet_counters_t *counters;
-       lnet_event_t    *ev = &msg->msg_ev;
+       struct lnet_counters *counters;
+       struct lnet_event *ev = &msg->msg_ev;
 
        LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
        LASSERT(msg->msg_rx_committed);
@@ -236,7 +253,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        default:
                LASSERT(ev->type == 0);
                LASSERT(msg->msg_routing);
-               goto out;
+               goto incr_stats;
 
        case LNET_EVENT_ACK:
                LASSERT(msg->msg_type == LNET_MSG_ACK);
@@ -265,6 +282,16 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        }
 
        counters->recv_count++;
+
+incr_stats:
+       if (msg->msg_rxpeer)
+               lnet_incr_stats(&msg->msg_rxpeer->lpni_stats,
+                               msg->msg_type,
+                               LNET_STATS_TYPE_RECV);
+       if (msg->msg_rxni)
+               lnet_incr_stats(&msg->msg_rxni->ni_stats,
+                               msg->msg_type,
+                               LNET_STATS_TYPE_RECV);
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
 
@@ -274,7 +301,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 }
 
 void
-lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
+lnet_msg_decommit(struct lnet_msg *msg, int cpt, int status)
 {
        int     cpt2 = cpt;
 
@@ -308,7 +335,7 @@ lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
 }
 
 void
-lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
+lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
                   unsigned int offset, unsigned int mlen)
 {
        /* NB: @offset and @len are only useful for receiving */
@@ -336,10 +363,10 @@ lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
 }
 
 void
-lnet_msg_detach_md(lnet_msg_t *msg, int status)
+lnet_msg_detach_md(struct lnet_msg *msg, int status)
 {
-       lnet_libmd_t    *md = msg->msg_md;
-       int             unlink;
+       struct lnet_libmd *md = msg->msg_md;
+       int unlink;
 
        /* Now it's safe to drop my caller's ref */
        md->md_refcount--;
@@ -359,32 +386,32 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
 }
 
 static int
-lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
+lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 {
-        lnet_handle_wire_t ack_wmd;
-        int                rc;
-        int                status = msg->msg_ev.status;
+       struct lnet_handle_wire ack_wmd;
+       int                rc;
+       int                status = msg->msg_ev.status;
 
-        LASSERT (msg->msg_onactivelist);
+       LASSERT(msg->msg_onactivelist);
 
-        if (status == 0 && msg->msg_ack) {
-                /* Only send an ACK if the PUT completed successfully */
+       if (status == 0 && msg->msg_ack) {
+               /* Only send an ACK if the PUT completed successfully */
 
                lnet_msg_decommit(msg, cpt, 0);
 
                msg->msg_ack = 0;
                lnet_net_unlock(cpt);
 
-                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
-                LASSERT(!msg->msg_routing);
+               LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+               LASSERT(!msg->msg_routing);
 
-                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+               ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
 
-                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+               lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0);
 
-                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
-                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
-                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+               msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+               msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+               msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
                /* NB: we probably want to use NID of msg::msg_from as 3rd
                 * parameter (router NID) if it's routed message */
@@ -430,51 +457,439 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
        }
 
        lnet_msg_decommit(msg, cpt, status);
-       lnet_msg_free_locked(msg);
+       lnet_msg_free(msg);
        return 0;
 }
 
+static void
+lnet_dec_healthv_locked(atomic_t *healthv)
+{
+       int h = atomic_read(healthv);
+
+       if (h < lnet_health_sensitivity) {
+               atomic_set(healthv, 0);
+       } else {
+               h -= lnet_health_sensitivity;
+               atomic_set(healthv, h);
+       }
+}
+
+static void
+lnet_handle_local_failure(struct lnet_msg *msg)
+{
+       struct lnet_ni *local_ni;
+
+       local_ni = msg->msg_txni;
+
+       /*
+        * the lnet_net_lock(0) is used to protect the addref on the ni
+        * and the recovery queue.
+        */
+       lnet_net_lock(0);
+       /* the mt could've shutdown and cleaned up the queues */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(0);
+               return;
+       }
+
+       lnet_dec_healthv_locked(&local_ni->ni_healthv);
+       /*
+        * add the NI to the recovery queue if it's not already there
+        * and it's health value is actually below the maximum. It's
+        * possible that the sensitivity might be set to 0, and the health
+        * value will not be reduced. In this case, there is no reason to
+        * invoke recovery
+        */
+       if (list_empty(&local_ni->ni_recovery) &&
+           atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) {
+               CERROR("ni %s added to recovery queue. Health = %d\n",
+                       libcfs_nid2str(local_ni->ni_nid),
+                       atomic_read(&local_ni->ni_healthv));
+               list_add_tail(&local_ni->ni_recovery,
+                             &the_lnet.ln_mt_localNIRecovq);
+               lnet_ni_addref_locked(local_ni, 0);
+       }
+       lnet_net_unlock(0);
+}
+
+static void
+lnet_handle_remote_failure(struct lnet_msg *msg)
+{
+       struct lnet_peer_ni *lpni;
+
+       lpni = msg->msg_txpeer;
+
+       /* lpni could be NULL if we're in the LOLND case */
+       if (!lpni)
+               return;
+
+       lnet_net_lock(0);
+       lnet_dec_healthv_locked(&lpni->lpni_healthv);
+       /*
+        * add the peer NI to the recovery queue if it's not already there
+        * and it's health value is actually below the maximum. It's
+        * possible that the sensitivity might be set to 0, and the health
+        * value will not be reduced. In this case, there is no reason to
+        * invoke recovery
+        */
+       lnet_peer_ni_add_to_recoveryq_locked(lpni);
+       lnet_net_unlock(0);
+}
+
+static void
+lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
+{
+       struct lnet_ni *ni = msg->msg_txni;
+       struct lnet_peer_ni *lpni = msg->msg_txpeer;
+       struct lnet_counters *counters = the_lnet.ln_counters[0];
+
+       switch (hstatus) {
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+               atomic_inc(&ni->ni_hstats.hlt_local_interrupt);
+               counters->local_interrupt_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+               atomic_inc(&ni->ni_hstats.hlt_local_dropped);
+               counters->local_dropped_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+               atomic_inc(&ni->ni_hstats.hlt_local_aborted);
+               counters->local_aborted_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+               atomic_inc(&ni->ni_hstats.hlt_local_no_route);
+               counters->local_no_route_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               atomic_inc(&ni->ni_hstats.hlt_local_timeout);
+               counters->local_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               atomic_inc(&ni->ni_hstats.hlt_local_error);
+               counters->local_error_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_dropped);
+               counters->remote_dropped_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_error);
+               counters->remote_error_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_timeout);
+               counters->remote_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_network_timeout);
+               counters->network_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_OK:
+               break;
+       default:
+               LBUG();
+       }
+}
+
+/*
+ * Do a health check on the message:
+ * return -1 if we're not going to handle the error or
+ *   if we've reached the maximum number of retries.
+ *   success case will return -1 as well
+ * return 0 if it the message is requeued for send
+ */
+static int
+lnet_health_check(struct lnet_msg *msg)
+{
+       enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+       bool lo = false;
+
+       /* if we're shutting down no point in handling health. */
+       if (the_lnet.ln_state != LNET_STATE_RUNNING)
+               return -1;
+
+       LASSERT(msg->msg_txni);
+
+       /*
+        * if we're sending to the LOLND then the msg_txpeer will not be
+        * set. So no need to sanity check it.
+        */
+       if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+               LASSERT(msg->msg_txpeer);
+       else
+               lo = true;
+
+       if (hstatus != LNET_MSG_STATUS_OK &&
+           ktime_compare(ktime_get(), msg->msg_deadline) >= 0)
+               return -1;
+
+       /*
+        * stats are only incremented for errors so avoid wasting time
+        * incrementing statistics if there is no error.
+        */
+       if (hstatus != LNET_MSG_STATUS_OK) {
+               lnet_net_lock(0);
+               lnet_incr_hstats(msg, hstatus);
+               lnet_net_unlock(0);
+       }
+
+       CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+              libcfs_nid2str(msg->msg_txni->ni_nid),
+              (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(hstatus));
+
+       switch (hstatus) {
+       case LNET_MSG_STATUS_OK:
+               lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+               /*
+                * It's possible msg_txpeer is NULL in the LOLND
+                * case.
+                */
+               if (msg->msg_txpeer)
+                       lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+
+               /* we can finalize this message */
+               return -1;
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               lnet_handle_local_failure(msg);
+               /* add to the re-send queue */
+               goto resend;
+
+       /*
+        * These errors will not trigger a resend so simply
+        * finalize the message
+        */
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               lnet_handle_local_failure(msg);
+               return -1;
+
+       /*
+        * TODO: since the remote dropped the message we can
+        * attempt a resend safely.
+        */
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               lnet_handle_remote_failure(msg);
+               goto resend;
+
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               lnet_handle_remote_failure(msg);
+               return -1;
+       default:
+               LBUG();
+       }
+
+resend:
+       /* don't resend recovery messages */
+       if (msg->msg_recovery)
+               return -1;
+
+       /*
+        * if we explicitly indicated we don't want to resend then just
+        * return
+        */
+       if (msg->msg_no_resend)
+               return -1;
+
+       /* check if the message has exceeded the number of retries */
+       if (msg->msg_retry_count >= lnet_retry_count)
+               return -1;
+       msg->msg_retry_count++;
+
+       lnet_net_lock(msg->msg_tx_cpt);
+
+       /*
+        * remove message from the active list and reset it in preparation
+        * for a resend. Two exception to this
+        *
+        * 1. the router case, whe a message is committed for rx when
+        * received, then tx when it is sent. When committed to both tx and
+        * rx we don't want to remove it from the active list.
+        *
+        * 2. The REPLY case since it uses the same msg block for the GET
+        * that was received.
+        */
+       if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
+               list_del_init(&msg->msg_activelist);
+               msg->msg_onactivelist = 0;
+       }
+       /*
+        * The msg_target.nid which was originally set
+        * when calling LNetGet() or LNetPut() might've
+        * been overwritten if we're routing this message.
+        * Call lnet_return_tx_credits_locked() to return
+        * the credit this message consumed. The message will
+        * consume another credit when it gets resent.
+        */
+       msg->msg_target.nid = msg->msg_hdr.dest_nid;
+       lnet_msg_decommit_tx(msg, -EAGAIN);
+       msg->msg_sending = 0;
+       msg->msg_receiving = 0;
+       msg->msg_target_is_router = 0;
+
+       CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n",
+              libcfs_nid2str(msg->msg_hdr.src_nid),
+              libcfs_nid2str(msg->msg_hdr.dest_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(hstatus));
+
+       list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
+       lnet_net_unlock(msg->msg_tx_cpt);
+
+       wake_up(&the_lnet.ln_mt_waitq);
+       return 0;
+}
+
+static void
+lnet_detach_md(struct lnet_msg *msg, int status)
+{
+       int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+
+       lnet_res_lock(cpt);
+       lnet_msg_detach_md(msg, status);
+       lnet_res_unlock(cpt);
+}
+
+static bool
+lnet_is_health_check(struct lnet_msg *msg)
+{
+       bool hc;
+       int status = msg->msg_ev.status;
+
+       /*
+        * perform a health check for any message committed for transmit
+        */
+       hc = msg->msg_tx_committed;
+
+       /* Check for status inconsistencies */
+       if (hc &&
+           ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
+            (status && msg->msg_health_status == LNET_MSG_STATUS_OK))) {
+               CERROR("Msg is in inconsistent state, don't perform health "
+                      "checking (%d, %d)\n", status, msg->msg_health_status);
+               hc = false;
+       }
+
+       CDEBUG(D_NET, "health check = %d, status = %d, hstatus = %d\n",
+              hc, status, msg->msg_health_status);
+
+       return hc;
+}
+
+char *
+lnet_health_error2str(enum lnet_msg_hstatus hstatus)
+{
+       switch (hstatus) {
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+               return "LOCAL_INTERRUPT";
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+               return "LOCAL_DROPPED";
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+               return "LOCAL_ABORTED";
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+               return "LOCAL_NO_ROUTE";
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               return "LOCAL_TIMEOUT";
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               return "LOCAL_ERROR";
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               return "REMOTE_DROPPED";
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+               return "REMOTE_ERROR";
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+               return "REMOTE_TIMEOUT";
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               return "NETWORK_TIMEOUT";
+       case LNET_MSG_STATUS_OK:
+               return "OK";
+       default:
+               return "<UNKNOWN>";
+       }
+}
+
+bool
+lnet_send_error_simulation(struct lnet_msg *msg,
+                          enum lnet_msg_hstatus *hstatus)
+{
+       if (!msg)
+               return false;
+
+       if (list_empty(&the_lnet.ln_drop_rules))
+           return false;
+
+       /* match only health rules */
+       if (!lnet_drop_rule_match(&msg->msg_hdr, hstatus))
+               return false;
+
+       CDEBUG(D_NET, "src %s, dst %s: %s simulate health error: %s\n",
+               libcfs_nid2str(msg->msg_hdr.src_nid),
+               libcfs_nid2str(msg->msg_hdr.dest_nid),
+               lnet_msgtyp2str(msg->msg_type),
+               lnet_health_error2str(*hstatus));
+
+       return true;
+}
+EXPORT_SYMBOL(lnet_send_error_simulation);
+
 void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(struct lnet_msg *msg, int status)
 {
-       struct lnet_msg_container       *container;
-       int                             my_slot;
-       int                             cpt;
-       int                             rc;
-       int                             i;
+       struct lnet_msg_container *container;
+       int my_slot;
+       int cpt;
+       int rc;
+       int i;
+       bool hc;
 
-       LASSERT (!in_interrupt ());
+       LASSERT(!in_interrupt());
 
        if (msg == NULL)
                return;
-#if 0
-        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
-               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
-               msg->msg_target_is_router ? "t" : "",
-               msg->msg_routing ? "X" : "",
-               msg->msg_ack ? "A" : "",
-               msg->msg_sending ? "S" : "",
-               msg->msg_receiving ? "R" : "",
-               msg->msg_delayed ? "d" : "",
-               msg->msg_txcredit ? "C" : "",
-               msg->msg_peertxcredit ? "c" : "",
-               msg->msg_rtrcredit ? "F" : "",
-               msg->msg_peerrtrcredit ? "f" : "",
-               msg->msg_onactivelist ? "!" : "",
-               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
-               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
-#endif
-        msg->msg_ev.status = status;
-
-       if (msg->msg_md != NULL) {
-               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
 
-               lnet_res_lock(cpt);
-               lnet_msg_detach_md(msg, status);
-               lnet_res_unlock(cpt);
+       msg->msg_ev.status = status;
+
+       /*
+        * if this is an ACK or a REPLY then make sure to remove the
+        * response tracker.
+        */
+       if (msg->msg_ev.type == LNET_EVENT_REPLY ||
+           msg->msg_ev.type == LNET_EVENT_ACK) {
+               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
        }
 
- again:
+       /* if the message is successfully sent, no need to keep the MD around */
+       if (msg->msg_md != NULL && !status)
+               lnet_detach_md(msg, status);
+
+again:
+       hc = lnet_is_health_check(msg);
+
+       /*
+        * the MD would've been detached from the message if it was
+        * successfully sent. However, if it wasn't successfully sent the
+        * MD would be around. And since we recalculate whether to
+        * health check or not, it's possible that we change our minds and
+        * we don't want to health check this message. In this case also
+        * free the MD.
+        *
+        * If the message is successful we're going to
+        * go through the lnet_health_check() function, but that'll just
+        * increment the appropriate health value and return.
+        */
+       if (msg->msg_md != NULL && !hc)
+               lnet_detach_md(msg, status);
+
        rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
                /* not committed to network yet */
@@ -483,6 +898,30 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                return;
        }
 
+       if (hc) {
+               /*
+                * Check the health status of the message. If it has one
+                * of the errors that we're supposed to handle, and it has
+                * not timed out, then
+                *      1. Decrement the appropriate health_value
+                *      2. queue the message on the resend queue
+
+                * if the message send is success, timed out or failed in the
+                * health check for any reason then we'll just finalize the
+                * message. Otherwise just return since the message has been
+                * put on the resend queue.
+                */
+               if (!lnet_health_check(msg))
+                       return;
+
+               /*
+                * if we get here then we need to clean up the md because we're
+                * finalizing the message.
+               */
+               if (msg->msg_md != NULL)
+                       lnet_detach_md(msg, status);
+       }
+
        /*
         * NB: routed message can be committed for both receiving and sending,
         * we should finalize in LIFO order and keep counters correct.
@@ -497,7 +936,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
 
-#ifdef __KERNEL__
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
                if (container->msc_finalizers[i] == current)
@@ -513,22 +951,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        }
 
        container->msc_finalizers[my_slot] = current;
-#else
-       LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL) {
-               lnet_net_unlock(cpt);
-               return;
-       }
-
-       my_slot = i = 0;
-       container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
 
        while (!list_empty(&container->msc_finalizing)) {
                msg = list_entry(container->msc_finalizing.next,
-                                lnet_msg_t, msg_list);
+                                struct lnet_msg, msg_list);
 
-               list_del(&msg->msg_list);
+               list_del_init(&msg->msg_list);
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
@@ -554,18 +982,19 @@ EXPORT_SYMBOL(lnet_finalize);
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
 {
-       int     count = 0;
+       int     count = 0;
 
        if (container->msc_init == 0)
                return;
 
        while (!list_empty(&container->msc_active)) {
-               lnet_msg_t *msg = list_entry(container->msc_active.next,
-                                            lnet_msg_t, msg_activelist);
+               struct lnet_msg *msg;
 
+               msg  = list_entry(container->msc_active.next,
+                                 struct lnet_msg, msg_activelist);
                LASSERT(msg->msg_onactivelist);
                msg->msg_onactivelist = 0;
-               list_del(&msg->msg_activelist);
+               list_del_init(&msg->msg_activelist);
                lnet_msg_free(msg);
                count++;
        }
@@ -579,37 +1008,23 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                            sizeof(*container->msc_finalizers));
                container->msc_finalizers = NULL;
        }
-#ifdef LNET_USE_LIB_FREELIST
-       lnet_freelist_fini(&container->msc_freelist);
-#endif
        container->msc_init = 0;
 }
 
 int
 lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 {
-       int     rc;
+       int rc = 0;
 
        container->msc_init = 1;
 
        INIT_LIST_HEAD(&container->msc_active);
        INIT_LIST_HEAD(&container->msc_finalizing);
 
-#ifdef LNET_USE_LIB_FREELIST
-       memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
-
-       rc = lnet_freelist_init(&container->msc_freelist,
-                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
-       if (rc != 0) {
-               CERROR("Failed to init freelist for message container\n");
-               lnet_msg_container_cleanup(container);
-               return rc;
-       }
-#else
-       rc = 0;
-#endif
        /* number of CPUs */
        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
+       if (container->msc_nfinalizers == 0)
+               container->msc_nfinalizers = 1;
 
        LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
                         container->msc_nfinalizers *
@@ -628,7 +1043,7 @@ void
 lnet_msg_containers_destroy(void)
 {
        struct lnet_msg_container *container;
-       int     i;
+       int     i;
 
        if (the_lnet.ln_msg_containers == NULL)
                return;