Whamcloud - gitweb
LU-7734 lnet: proper cpt locking
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index e9a63eb..ce62ff9 100644 (file)
@@ -627,12 +627,11 @@ lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
        if (len != 0)
                lnet_setpayloadbuffer(msg);
 
-       memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
-       msg->msg_hdr.type           = cpu_to_le32(type);
-       msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
-       msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
+       memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
+       msg->msg_hdr.type           = cpu_to_le32(type);
+       msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
-       msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
+       msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
        msg->msg_hdr.payload_length = cpu_to_le32(len);
 }
 
@@ -667,7 +666,7 @@ lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
        if (rc != 0) {
                CERROR("recv from %s / send to %s aborted: "
                       "eager_recv failed %d\n",
-                      libcfs_nid2str(msg->msg_rxpeer->lp_nid),
+                      libcfs_nid2str(msg->msg_rxpeer->lpni_nid),
                       libcfs_id2str(msg->msg_target), rc);
                LASSERT(rc < 0); /* required by my callers */
        }
@@ -675,28 +674,36 @@ lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
        return rc;
 }
 
-/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
+/*
+ * This function can be called from two paths:
+ *     1. when sending a message
+ *     2. when decommitting a message (lnet_msg_decommit_tx())
+ * In both these cases the peer_ni should have its reference count
+ * acquired by the caller and therefore it is safe to drop the spin
+ * lock before calling lnd_query()
+ */
 static void
-lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
+lnet_ni_query_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
 {
        cfs_time_t last_alive = 0;
+       int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
 
        LASSERT(lnet_peer_aliveness_enabled(lp));
        LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
 
-       lnet_net_unlock(lp->lp_cpt);
-       (ni->ni_net->net_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
-       lnet_net_lock(lp->lp_cpt);
+       lnet_net_unlock(cpt);
+       (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
+       lnet_net_lock(cpt);
 
-       lp->lp_last_query = cfs_time_current();
+       lp->lpni_last_query = cfs_time_current();
 
        if (last_alive != 0) /* NI has updated timestamp */
-               lp->lp_last_alive = last_alive;
+               lp->lpni_last_alive = last_alive;
 }
 
 /* NB: always called with lnet_net_lock held */
 static inline int
-lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
+lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now)
 {
        int        alive;
        cfs_time_t deadline;
@@ -707,24 +714,31 @@ lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
         * Trust lnet_notify() if it has more recent aliveness news, but
         * ignore the initial assumed death (see lnet_peers_start_down()).
         */
-       if (!lp->lp_alive && lp->lp_alive_count > 0 &&
-           cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
+       spin_lock(&lp->lpni_lock);
+       if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
+           cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive)) {
+               spin_unlock(&lp->lpni_lock);
                return 0;
+       }
 
        deadline =
-         cfs_time_add(lp->lp_last_alive,
-                      cfs_time_seconds(lp->lp_net->net_tunables.
+         cfs_time_add(lp->lpni_last_alive,
+                      cfs_time_seconds(lp->lpni_net->net_tunables.
                                        lct_peer_timeout));
        alive = cfs_time_after(deadline, now);
 
        /*
         * Update obsolete lp_alive except for routers assumed to be dead
         * initially, because router checker would update aliveness in this
-        * case, and moreover lp_last_alive at peer creation is assumed.
+        * case, and moreover lpni_last_alive at peer creation is assumed.
         */
-       if (alive && !lp->lp_alive &&
-           !(lnet_isrouter(lp) && lp->lp_alive_count == 0))
-               lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
+       if (alive && !lp->lpni_alive &&
+           !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
+               spin_unlock(&lp->lpni_lock);
+               lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
+       } else {
+               spin_unlock(&lp->lpni_lock);
+       }
 
        return alive;
 }
@@ -733,7 +747,7 @@ lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
 /* NB: returns 1 when alive, 0 when dead, negative when error;
  *     may drop the lnet_net_lock */
 static int
-lnet_peer_alive_locked (struct lnet_ni *ni, lnet_peer_t *lp)
+lnet_peer_alive_locked (struct lnet_ni *ni, struct lnet_peer_ni *lp)
 {
        cfs_time_t now = cfs_time_current();
 
@@ -747,21 +761,21 @@ lnet_peer_alive_locked (struct lnet_ni *ni, lnet_peer_t *lp)
         * Peer appears dead, but we should avoid frequent NI queries (at
         * most once per lnet_queryinterval seconds).
         */
-       if (lp->lp_last_query != 0) {
+       if (lp->lpni_last_query != 0) {
                static const int lnet_queryinterval = 1;
 
                cfs_time_t next_query =
-                          cfs_time_add(lp->lp_last_query,
+                          cfs_time_add(lp->lpni_last_query,
                                        cfs_time_seconds(lnet_queryinterval));
 
                if (cfs_time_before(now, next_query)) {
-                       if (lp->lp_alive)
+                       if (lp->lpni_alive)
                                CWARN("Unexpected aliveness of peer %s: "
                                      "%d < %d (%d/%d)\n",
-                                     libcfs_nid2str(lp->lp_nid),
+                                     libcfs_nid2str(lp->lpni_nid),
                                      (int)now, (int)next_query,
                                      lnet_queryinterval,
-                                     lp->lp_net->net_tunables.lct_peer_timeout);
+                                     lp->lpni_net->net_tunables.lct_peer_timeout);
                        return 0;
                }
        }
@@ -772,7 +786,7 @@ lnet_peer_alive_locked (struct lnet_ni *ni, lnet_peer_t *lp)
        if (lnet_peer_is_alive(lp, now))
                return 1;
 
-       lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
+       lnet_notify_locked(lp, 0, 0, lp->lpni_last_alive);
        return 0;
 }
 
@@ -790,8 +804,8 @@ lnet_peer_alive_locked (struct lnet_ni *ni, lnet_peer_t *lp)
 static int
 lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 {
-       lnet_peer_t             *lp = msg->msg_txpeer;
-       lnet_ni_t               *ni = msg->msg_txni;
+       struct lnet_peer_ni     *lp = msg->msg_txpeer;
+       struct lnet_ni          *ni = msg->msg_txni;
        int                     cpt = msg->msg_tx_cpt;
        struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];
 
@@ -806,6 +820,10 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
                the_lnet.ln_counters[cpt]->drop_count++;
                the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
                lnet_net_unlock(cpt);
+               if (msg->msg_txpeer)
+                       atomic_inc(&msg->msg_txpeer->lpni_stats.drop_count);
+               if (msg->msg_txni)
+                       atomic_inc(&msg->msg_txni->ni_stats.drop_count);
 
                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
@@ -831,19 +849,19 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
        }
 
        if (!msg->msg_peertxcredit) {
-               LASSERT((lp->lp_txcredits < 0) ==
-                       !list_empty(&lp->lp_txq));
+               LASSERT((lp->lpni_txcredits < 0) ==
+                       !list_empty(&lp->lpni_txq));
 
                msg->msg_peertxcredit = 1;
-               lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
-               lp->lp_txcredits--;
+               lp->lpni_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
+               lp->lpni_txcredits--;
 
-               if (lp->lp_txcredits < lp->lp_mintxcredits)
-                       lp->lp_mintxcredits = lp->lp_txcredits;
+               if (lp->lpni_txcredits < lp->lpni_mintxcredits)
+                       lp->lpni_mintxcredits = lp->lpni_txcredits;
 
-               if (lp->lp_txcredits < 0) {
+               if (lp->lpni_txcredits < 0) {
                        msg->msg_tx_delayed = 1;
-                       list_add_tail(&msg->msg_list, &lp->lp_txq);
+                       list_add_tail(&msg->msg_list, &lp->lpni_txq);
                        return LNET_CREDIT_WAIT;
                }
        }
@@ -854,6 +872,7 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 
                msg->msg_txcredit = 1;
                tq->tq_credits--;
+               atomic_dec(&ni->ni_tx_credits);
 
                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;
@@ -901,34 +920,34 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
         * sets do_recv FALSE and I don't do the unlock/send/lock bit.
         * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
         * received or OK to receive */
-       lnet_peer_t         *lp = msg->msg_rxpeer;
+       struct lnet_peer_ni *lp = msg->msg_rxpeer;
        lnet_rtrbufpool_t   *rbp;
-       lnet_rtrbuf_t       *rb;
+       lnet_rtrbuf_t       *rb;
 
-       LASSERT(msg->msg_iov == NULL);
-       LASSERT(msg->msg_kiov == NULL);
-       LASSERT(msg->msg_niov == 0);
-       LASSERT(msg->msg_routing);
-       LASSERT(msg->msg_receiving);
-       LASSERT(!msg->msg_sending);
+       LASSERT (msg->msg_iov == NULL);
+       LASSERT (msg->msg_kiov == NULL);
+       LASSERT (msg->msg_niov == 0);
+       LASSERT (msg->msg_routing);
+       LASSERT (msg->msg_receiving);
+       LASSERT (!msg->msg_sending);
 
        /* non-lnet_parse callers only receive delayed messages */
        LASSERT(!do_recv || msg->msg_rx_delayed);
 
        if (!msg->msg_peerrtrcredit) {
-               LASSERT((lp->lp_rtrcredits < 0) ==
-                       !list_empty(&lp->lp_rtrq));
+               LASSERT((lp->lpni_rtrcredits < 0) ==
+                       !list_empty(&lp->lpni_rtrq));
 
                msg->msg_peerrtrcredit = 1;
-               lp->lp_rtrcredits--;
-               if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
-                       lp->lp_minrtrcredits = lp->lp_rtrcredits;
+               lp->lpni_rtrcredits--;
+               if (lp->lpni_rtrcredits < lp->lpni_minrtrcredits)
+                       lp->lpni_minrtrcredits = lp->lpni_rtrcredits;
 
-               if (lp->lp_rtrcredits < 0) {
+               if (lp->lpni_rtrcredits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
-                       list_add_tail(&msg->msg_list, &lp->lp_rtrq);
+                       list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
                        return LNET_CREDIT_WAIT;
                }
        }
@@ -971,9 +990,9 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 void
 lnet_return_tx_credits_locked(lnet_msg_t *msg)
 {
-       lnet_peer_t     *txpeer = msg->msg_txpeer;
-       lnet_msg_t      *msg2;
-       struct lnet_ni  *txni = msg->msg_txni;
+       struct lnet_peer_ni     *txpeer = msg->msg_txpeer;
+       struct lnet_ni          *txni = msg->msg_txni;
+       lnet_msg_t              *msg2;
 
        if (msg->msg_txcredit) {
                struct lnet_ni       *ni = msg->msg_txni;
@@ -986,6 +1005,7 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                        !list_empty(&tq->tq_delayed));
 
                tq->tq_credits++;
+               atomic_inc(&ni->ni_tx_credits);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          lnet_msg_t, msg_list);
@@ -1002,16 +1022,16 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                /* give back peer txcredits */
                msg->msg_peertxcredit = 0;
 
-               LASSERT((txpeer->lp_txcredits < 0) ==
-                       !list_empty(&txpeer->lp_txq));
+               LASSERT((txpeer->lpni_txcredits < 0) ==
+                       !list_empty(&txpeer->lpni_txq));
 
-               txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
-               LASSERT(txpeer->lp_txqnob >= 0);
+               txpeer->lpni_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
+               LASSERT (txpeer->lpni_txqnob >= 0);
 
-               txpeer->lp_txcredits++;
-               if (txpeer->lp_txcredits <= 0) {
-                       msg2 = list_entry(txpeer->lp_txq.next,
-                                             lnet_msg_t, msg_list);
+               txpeer->lpni_txcredits++;
+               if (txpeer->lpni_txcredits <= 0) {
+                       msg2 = list_entry(txpeer->lpni_txq.next,
+                                              lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
 
                        LASSERT(msg2->msg_txpeer == txpeer);
@@ -1027,8 +1047,17 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
        }
 
        if (txpeer != NULL) {
+               /*
+                * TODO:
+                * Once the patch for the health comes in we need to set
+                * the health of the peer ni to bad when we fail to send
+                * a message.
+                * int status = msg->msg_ev.status;
+                * if (status != 0)
+                *      lnet_set_peer_ni_health_locked(txpeer, false)
+                */
                msg->msg_txpeer = NULL;
-               lnet_peer_decref_locked(txpeer);
+               lnet_peer_ni_decref_locked(txpeer);
        }
 }
 
@@ -1072,9 +1101,9 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
 void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
-       lnet_peer_t     *rxpeer = msg->msg_rxpeer;
-       struct lnet_ni  *rxni = msg->msg_rxni;
-       lnet_msg_t      *msg2;
+       struct lnet_peer_ni     *rxpeer = msg->msg_rxpeer;
+       struct lnet_ni          *rxni = msg->msg_rxni;
+       lnet_msg_t              *msg2;
 
        if (msg->msg_rtrcredit) {
                /* give back global router credits */
@@ -1125,18 +1154,18 @@ routing_off:
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
 
-               LASSERT((rxpeer->lp_rtrcredits < 0) ==
-                       !list_empty(&rxpeer->lp_rtrq));
+               LASSERT((rxpeer->lpni_rtrcredits < 0) ==
+                       !list_empty(&rxpeer->lpni_rtrq));
 
-               rxpeer->lp_rtrcredits++;
+               rxpeer->lpni_rtrcredits++;
 
                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
-                       lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+                       lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq,
                                                     msg->msg_rx_cpt);
-               } else if (rxpeer->lp_rtrcredits <= 0) {
-                       msg2 = list_entry(rxpeer->lp_rtrq.next,
+               } else if (rxpeer->lpni_rtrcredits <= 0) {
+                       msg2 = list_entry(rxpeer->lpni_rtrq.next,
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
 
@@ -1149,49 +1178,60 @@ routing_off:
        }
        if (rxpeer != NULL) {
                msg->msg_rxpeer = NULL;
-               lnet_peer_decref_locked(rxpeer);
+               lnet_peer_ni_decref_locked(rxpeer);
        }
 }
 
 static int
+lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
+{
+       if (p1->lpni_txqnob < p2->lpni_txqnob)
+               return 1;
+
+       if (p1->lpni_txqnob > p2->lpni_txqnob)
+               return -1;
+
+       if (p1->lpni_txcredits > p2->lpni_txcredits)
+               return 1;
+
+       if (p1->lpni_txcredits < p2->lpni_txcredits)
+               return -1;
+
+       return 0;
+}
+
+static int
 lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
 {
-       lnet_peer_t *p1 = r1->lr_gateway;
-       lnet_peer_t *p2 = r2->lr_gateway;
+       struct lnet_peer_ni *p1 = r1->lr_gateway;
+       struct lnet_peer_ni *p2 = r2->lr_gateway;
        int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
        int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
+       int rc;
 
        if (r1->lr_priority < r2->lr_priority)
                return 1;
 
        if (r1->lr_priority > r2->lr_priority)
-               return -ERANGE;
+               return -1;
 
        if (r1_hops < r2_hops)
                return 1;
 
        if (r1_hops > r2_hops)
-               return -ERANGE;
-
-       if (p1->lp_txqnob < p2->lp_txqnob)
-               return 1;
-
-       if (p1->lp_txqnob > p2->lp_txqnob)
-               return -ERANGE;
-
-       if (p1->lp_txcredits > p2->lp_txcredits)
-               return 1;
+               return -1;
 
-       if (p1->lp_txcredits < p2->lp_txcredits)
-               return -ERANGE;
+       rc = lnet_compare_peers(p1, p2);
+       if (rc)
+               return rc;
 
        if (r1->lr_seq - r2->lr_seq <= 0)
                return 1;
 
-       return -ERANGE;
+       return -1;
 }
 
-static lnet_peer_t *
+static struct lnet_peer_ni *
 lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
                       lnet_nid_t rtr_nid)
 {
@@ -1199,8 +1239,8 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
        lnet_route_t            *route;
        lnet_route_t            *best_route;
        lnet_route_t            *last_route;
-       struct lnet_peer        *lp_best;
-       struct lnet_peer        *lp;
+       struct lnet_peer_ni     *lpni_best;
+       struct lnet_peer_ni     *lp;
        int                     rc;
 
        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
@@ -1210,7 +1250,7 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
        if (rnet == NULL)
                return NULL;
 
-       lp_best = NULL;
+       lpni_best = NULL;
        best_route = last_route = NULL;
        list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
                lp = route->lr_gateway;
@@ -1218,15 +1258,15 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
                if (!lnet_is_route_alive(route))
                        continue;
 
-               if (net != NULL && lp->lp_net != net)
+               if (net != NULL && lp->lpni_net != net)
                        continue;
 
-               if (lp->lp_nid == rtr_nid) /* it's pre-determined router */
+               if (lp->lpni_nid == rtr_nid) /* it's pre-determined router */
                        return lp;
 
-               if (lp_best == NULL) {
+               if (lpni_best == NULL) {
                        best_route = last_route = route;
-                       lp_best = lp;
+                       lpni_best = lp;
                        continue;
                }
 
@@ -1239,7 +1279,7 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
                        continue;
 
                best_route = route;
-               lp_best = lp;
+               lpni_best = lp;
        }
 
        /* set sequence number on the best router to the latest sequence + 1
@@ -1247,169 +1287,562 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
         * harmless and functional  */
        if (best_route != NULL)
                best_route->lr_seq = last_route->lr_seq + 1;
-       return lp_best;
+       return lpni_best;
 }
 
-int
-lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
+static int
+lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
+                   struct lnet_msg *msg, lnet_nid_t rtr_nid, bool *lo_sent)
 {
-       lnet_nid_t              dst_nid = msg->msg_target.nid;
-       struct lnet_ni          *src_ni;
-       struct lnet_ni          *local_ni;
-       struct lnet_peer        *lp;
-       int                     cpt;
-       int                     cpt2;
-       int                     rc;
-
-       /* NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
-        * but we might want to use pre-determined router for ACK/REPLY
-        * in the future */
-       /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
-       LASSERT(msg->msg_txpeer == NULL);
-       LASSERT(!msg->msg_sending);
-       LASSERT(!msg->msg_target_is_router);
-       LASSERT(!msg->msg_receiving);
+       struct lnet_ni          *best_ni = NULL;
+       struct lnet_peer_ni     *best_lpni = NULL;
+       struct lnet_peer_ni     *net_gw = NULL;
+       struct lnet_peer_ni     *best_gw = NULL;
+       struct lnet_peer_ni     *lpni;
+       struct lnet_peer        *peer = NULL;
+       struct lnet_peer_net    *peer_net;
+       struct lnet_net         *local_net;
+       struct lnet_ni          *ni = NULL;
+       int                     cpt, cpt2, rc;
+       bool                    routing = false;
+       bool                    ni_is_pref = false;
+       bool                    preferred = false;
+       int                     best_credits = 0;
+       __u32                   seq, seq2;
+       int                     best_lpni_credits = INT_MIN;
+       int                     md_cpt = 0;
+       int                     shortest_distance = INT_MAX;
+       int                     distance = 0;
+       bool                    found_ir = false;
+
+again:
+       /*
+        * get an initial CPT to use for locking. The idea here is not to
+        * serialize the calls to select_pathway, so that as many
+        * operations can run concurrently as possible. To do that we use
+        * the CPT where this call is being executed. Later on when we
+        * determine the CPT to use in lnet_message_commit, we switch the
+        * lock and check if there were any configuration changes.  If none,
+        * then we proceed, if there is, then we'll need to update the cpt
+        * and redo the operation.
+        */
+       cpt = lnet_net_lock_current();
 
-       msg->msg_sending = 1;
+       best_gw = NULL;
+       routing = false;
+       local_net = NULL;
+       best_ni = NULL;
+       shortest_distance = INT_MAX;
+       found_ir = false;
 
-       LASSERT(!msg->msg_tx_committed);
-       local_ni = lnet_net2ni(LNET_NIDNET(dst_nid));
-       cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid,
-                             local_ni);
- again:
-       if (the_lnet.ln_shutdown)
+       if (the_lnet.ln_shutdown) {
+               lnet_net_unlock(cpt);
                return -ESHUTDOWN;
-       lnet_net_lock(cpt);
+       }
 
-       if (src_nid == LNET_NID_ANY) {
-               src_ni = NULL;
-       } else {
-               src_ni = lnet_nid2ni_locked(src_nid, cpt);
-               if (src_ni == NULL) {
+       if (msg->msg_md != NULL)
+               /* get the cpt of the MD, used during NUMA based selection */
+               md_cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+       else
+               md_cpt = CFS_CPT_ANY;
+
+       /*
+        * initialize the variables which could be reused if we jump back
+        * to the 'again' label
+        */
+       lpni = NULL;
+       seq = lnet_get_dlc_seq_locked();
+
+       peer = lnet_find_or_create_peer_locked(dst_nid, cpt);
+       if (IS_ERR(peer)) {
+               lnet_net_unlock(cpt);
+               return PTR_ERR(peer);
+       }
+
+       /* If peer is not healthy then can not send anything to it */
+       if (!lnet_is_peer_healthy_locked(peer)) {
+               lnet_net_unlock(cpt);
+               return -EHOSTUNREACH;
+       }
+
+       if (!peer->lp_multi_rail && lnet_get_num_peer_nis(peer) > 1) {
+               CERROR("peer %s is declared to be non MR capable, "
+                      "yet configured with more than one NID\n",
+                      libcfs_nid2str(dst_nid));
+               return -EINVAL;
+       }
+
+       /*
+        * STEP 1: first jab at determining best_ni
+        * if src_nid is explicitly specified, then best_ni is already
+        * pre-determined for us. Otherwise we need to select the best
+        * one to use later on
+        */
+       if (src_nid != LNET_NID_ANY) {
+               best_ni = lnet_nid2ni_locked(src_nid, cpt);
+               if (!best_ni) {
                        lnet_net_unlock(cpt);
                        LCONSOLE_WARN("Can't send to %s: src %s is not a "
                                      "local nid\n", libcfs_nid2str(dst_nid),
                                      libcfs_nid2str(src_nid));
                        return -EINVAL;
                }
-               LASSERT(!msg->msg_routing);
-       }
-
-       /* Is this for someone on a local network? */
-       local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
 
-       if (local_ni != NULL) {
-               if (src_ni == NULL) {
-                       src_ni = local_ni;
-                       src_nid = src_ni->ni_nid;
-               } else if (src_ni != local_ni) {
+               if (best_ni->ni_net->net_id != LNET_NIDNET(dst_nid)) {
                        lnet_net_unlock(cpt);
                        LCONSOLE_WARN("No route to %s via from %s\n",
                                      libcfs_nid2str(dst_nid),
                                      libcfs_nid2str(src_nid));
                        return -EINVAL;
                }
+       }
 
-               LASSERT(src_nid != LNET_NID_ANY);
-               lnet_msg_commit(msg, cpt);
+       if (best_ni)
+               goto pick_peer;
 
-               if (!msg->msg_routing)
-                       msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+       /*
+        * Decide whether we need to route to peer_ni.
+        * Get the local net that I need to be on to be able to directly
+        * send to that peer.
+        *
+        * a. Find the peer which the dst_nid belongs to.
+        * b. Iterate through each of the peer_nets/nis to decide
+        * the best peer/local_ni pair to use
+        */
+       list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_on_peer_list) {
+               if (!lnet_is_peer_net_healthy_locked(peer_net))
+                       continue;
 
-               if (src_ni == the_lnet.ln_loni) {
-                       /* No send credit hassles with LOLND */
-                       lnet_net_unlock(cpt);
-                       lnet_ni_send(src_ni, msg);
-                       return 0;
-               }
+               local_net = lnet_get_net_locked(peer_net->lpn_net_id);
+               if (!local_net) {
+                       /*
+                        * go through each peer_ni on that peer_net and
+                        * determine the best possible gw to go through
+                        */
+                       list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
+                                           lpni_on_peer_net_list) {
+                               net_gw = lnet_find_route_locked(NULL,
+                                                               lpni->lpni_nid,
+                                                               rtr_nid);
+
+                               /*
+                                * if no route is found for that network then
+                                * move onto the next peer_ni in the peer
+                                */
+                               if (!net_gw)
+                                       continue;
+
+                               if (!best_gw) {
+                                       best_gw = net_gw;
+                                       best_lpni = lpni;
+                               } else  {
+                                       rc = lnet_compare_peers(net_gw,
+                                                               best_gw);
+                                       if (rc > 0) {
+                                               best_gw = net_gw;
+                                               best_lpni = lpni;
+                                       }
+                               }
+                       }
 
-               rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
-               if (rc != 0) {
-                       lnet_net_unlock(cpt);
-                       LCONSOLE_WARN("Error %d finding peer %s\n", rc,
-                                     libcfs_nid2str(dst_nid));
-                       /* ENOMEM or shutting down */
-                       return rc;
-               }
-               LASSERT (lp->lp_net == src_ni->ni_net);
-       } else {
-               /* sending to a remote network */
-               lp = lnet_find_route_locked(src_ni != NULL ?
-                                           src_ni->ni_net : NULL,
-                                           dst_nid, rtr_nid);
-               if (lp == NULL) {
-                       lnet_net_unlock(cpt);
+                       if (!best_gw)
+                               continue;
 
-                       LCONSOLE_WARN("No route to %s via %s "
-                                     "(all routers down)\n",
-                                     libcfs_id2str(msg->msg_target),
-                                     libcfs_nid2str(src_nid));
-                       return -EHOSTUNREACH;
+                       local_net = lnet_get_net_locked
+                                       (LNET_NIDNET(best_gw->lpni_nid));
+                       routing = true;
+               } else {
+                       routing = false;
+                       best_gw = NULL;
                }
 
-               /* rtr_nid is LNET_NID_ANY or NID of pre-determined router,
-                * it's possible that rtr_nid isn't LNET_NID_ANY and lp isn't
-                * pre-determined router, this can happen if router table
-                * was changed when we release the lock */
-               if (rtr_nid != lp->lp_nid) {
-                       cpt2 = lp->lp_cpt;
-                       if (cpt2 != cpt) {
-                               lnet_net_unlock(cpt);
-
-                               rtr_nid = lp->lp_nid;
-                               cpt = cpt2;
-                               goto again;
+               /* no routable net found go on to a different net */
+               if (!local_net)
+                       continue;
+
+               /*
+                * Iterate through the NIs in this local Net and select
+                * the NI to send from. The selection is determined by
+                * these 3 criterion in the following priority:
+                *      1. NUMA
+                *      2. NI available credits
+                *      3. Round Robin
+                */
+               while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
+                       int ni_credits;
+
+                       if (!lnet_is_ni_healthy_locked(ni))
+                               continue;
+
+                       ni_credits = atomic_read(&ni->ni_tx_credits);
+
+                       /*
+                        * calculate the distance from the cpt on which
+                        * the message memory is allocated to the CPT of
+                        * the NI's physical device
+                        */
+                       distance = cfs_cpt_distance(lnet_cpt_table(),
+                                                   md_cpt,
+                                                   ni->dev_cpt);
+
+                       /*
+                        * If we already have a closer NI within the NUMA
+                        * range provided, then there is no need to
+                        * consider the current NI. Move on to the next
+                        * one.
+                        */
+                       if (distance > shortest_distance &&
+                           distance > lnet_get_numa_range())
+                               continue;
+
+                       if (distance < shortest_distance &&
+                           distance > lnet_get_numa_range()) {
+                               /*
+                                * The current NI is the closest one that we
+                                * have found, even though it's not in the
+                                * NUMA range specified. This occurs if
+                                * the NUMA range is less than the least
+                                * of the distances in the system.
+                                * In effect NUMA range consideration is
+                                * turned off.
+                                */
+                               shortest_distance = distance;
+                       } else if ((distance <= shortest_distance &&
+                                   distance < lnet_get_numa_range()) ||
+                                  distance == shortest_distance) {
+                               /*
+                                * This NI is either within range or it's
+                                * equidistant. In both of these cases we
+                                * would want to select the NI based on
+                                * its available credits first, and then
+                                * via Round Robin.
+                                */
+                               if (distance <= shortest_distance &&
+                                   distance < lnet_get_numa_range()) {
+                                       /*
+                                        * If this is the first NI that's
+                                        * within range, then set the
+                                        * shortest distance to the range
+                                        * specified by the user. In
+                                        * effect we're saying that all
+                                        * NIs that fall within this NUMA
+                                        * range shall be dealt with as
+                                        * having equal NUMA weight. Which
+                                        * will mean that we should select
+                                        * through that set by their
+                                        * available credits first
+                                        * followed by Round Robin.
+                                        *
+                                        * And since this is the first NI
+                                        * in the range, let's just set it
+                                        * as our best_ni for now. The
+                                        * following NIs found in the
+                                        * range will be dealt with as
+                                        * mentioned previously.
+                                        */
+                                       shortest_distance = lnet_get_numa_range();
+                                       if (!found_ir) {
+                                               found_ir = true;
+                                               goto set_ni;
+                                       }
+                               }
+                               /*
+                                * This NI is NUMA equidistant; let's
+                                * select using credits followed by Round
+                                * Robin.
+                                */
+                               if (ni_credits < best_credits) {
+                                       continue;
+                               } else if (ni_credits == best_credits) {
+                                       if (best_ni) {
+                                               if (best_ni->ni_seq <= ni->ni_seq)
+                                                       continue;
+                                       }
+                               }
                        }
+set_ni:
+                       best_ni = ni;
+                       best_credits = ni_credits;
                }
+       }
+       /*
+        * if the peer is not MR capable, then we should always send to it
+        * using the first NI in the NET we determined.
+        */
+       if (!peer->lp_multi_rail && local_net != NULL)
+               best_ni = lnet_net2ni_locked(local_net->net_id, cpt);
 
-               CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
-                      libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
-                      lnet_msgtyp2str(msg->msg_type), msg->msg_len);
+       if (!best_ni) {
+               lnet_net_unlock(cpt);
+               LCONSOLE_WARN("No local ni found to send from to %s\n",
+                       libcfs_nid2str(dst_nid));
+               return -EINVAL;
+       }
+
+       /*
+        * Now that we selected the NI to use increment its sequence
+        * number so the Round Robin algorithm will detect that it has
+        * been used and pick the next NI.
+        */
+       best_ni->ni_seq++;
+
+       if (routing)
+               goto send;
+
+pick_peer:
+       if (best_ni == the_lnet.ln_loni) {
+               /* No send credit hassles with LOLND */
+               lnet_ni_addref_locked(best_ni, cpt);
+               msg->msg_hdr.dest_nid = cpu_to_le64(best_ni->ni_nid);
+               if (!msg->msg_routing)
+                       msg->msg_hdr.src_nid = cpu_to_le64(best_ni->ni_nid);
+               msg->msg_target.nid = best_ni->ni_nid;
+               lnet_msg_commit(msg, cpt);
+
+               lnet_net_unlock(cpt);
+               msg->msg_txni = best_ni;
+               lnet_ni_send(best_ni, msg);
 
-               if (src_ni == NULL) {
-                       src_ni = lnet_get_next_ni_locked(lp->lp_net, NULL);
-                       LASSERT(src_ni != NULL);
-                       src_nid = src_ni->ni_nid;
+               *lo_sent = true;
+               return 0;
+       }
+
+       lpni = NULL;
+
+       if (msg->msg_type == LNET_MSG_REPLY ||
+           msg->msg_type == LNET_MSG_ACK) {
+               /*
+                * for replies we want to respond on the same peer_ni we
+                * received the message on if possible. If not, then pick
+                * a peer_ni to send to
+                */
+               best_lpni = lnet_find_peer_ni_locked(dst_nid);
+               if (best_lpni) {
+                       lnet_peer_ni_decref_locked(best_lpni);
+                       goto send;
                } else {
-                       LASSERT (src_ni->ni_net == lp->lp_net);
+                       CDEBUG(D_NET, "unable to send msg_type %d to "
+                             "originating %s\n", msg->msg_type,
+                             libcfs_nid2str(dst_nid));
+               }
+       }
+
+       peer_net = lnet_peer_get_net_locked(peer,
+                                           best_ni->ni_net->net_id);
+       /*
+        * peer_net is not available or the src_nid is explicitly defined
+        * and the peer_net for that src_nid is unhealthy. find a route to
+        * the destination nid.
+        */
+       if (!peer_net ||
+           (src_nid != LNET_NID_ANY &&
+            !lnet_is_peer_net_healthy_locked(peer_net))) {
+               best_gw = lnet_find_route_locked(best_ni->ni_net,
+                                                dst_nid,
+                                                rtr_nid);
+               /*
+                * if no route is found for that network then
+                * move onto the next peer_ni in the peer
+                */
+               if (!best_gw) {
+                       lnet_net_unlock(cpt);
+                       LCONSOLE_WARN("No route to peer from %s\n",
+                               libcfs_nid2str(best_ni->ni_nid));
+                       return -EHOSTUNREACH;
                }
 
-               lnet_peer_addref_locked(lp);
+               CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+                       libcfs_nid2str(dst_nid),
+                       libcfs_nid2str(best_gw->lpni_nid),
+                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
 
-               LASSERT(src_nid != LNET_NID_ANY);
-               lnet_msg_commit(msg, cpt);
+               best_lpni = lnet_find_peer_ni_locked(dst_nid);
+               LASSERT(best_lpni != NULL);
+               lnet_peer_ni_decref_locked(best_lpni);
 
-               if (!msg->msg_routing) {
-                       /* I'm the source and now I know which NI to send on */
-                       msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+               routing = true;
+
+               goto send;
+       } else if (!lnet_is_peer_net_healthy_locked(peer_net)) {
+               /*
+                * this peer_net is unhealthy but we still have an opportunity
+                * to find another peer_net that we can use
+                */
+               __u32 net_id = peer_net->lpn_net_id;
+               lnet_net_unlock(cpt);
+               if (!best_lpni)
+                       LCONSOLE_WARN("peer net %s unhealthy\n",
+                                     libcfs_net2str(net_id));
+               goto again;
+       }
+
+       best_lpni = NULL;
+       while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
+               /*
+                * if this peer ni is not healthy just skip it, no point in
+                * examining it further
+                */
+               if (!lnet_is_peer_ni_healthy_locked(lpni))
+                       continue;
+               ni_is_pref = lnet_peer_is_ni_pref_locked(lpni, best_ni);
+
+               /* if this is a preferred peer use it */
+               if (!preferred && ni_is_pref) {
+                       preferred = true;
+               } else if (preferred && !ni_is_pref) {
+                       /*
+                        * this is not the preferred peer so let's ignore
+                        * it.
+                        */
+                       continue;
+               } if (lpni->lpni_txcredits < best_lpni_credits)
+                       /*
+                        * We already have a peer that has more credits
+                        * available than this one. No need to consider
+                        * this peer further.
+                        */
+                       continue;
+               else if (lpni->lpni_txcredits == best_lpni_credits) {
+                       /*
+                        * The best peer found so far and the current peer
+                        * have the same number of available credits let's
+                        * make sure to select between them using Round
+                        * Robin
+                        */
+                       if (best_lpni) {
+                               if (best_lpni->lpni_seq <= lpni->lpni_seq)
+                                       continue;
+                       }
                }
 
-               msg->msg_target_is_router = 1;
-               msg->msg_target.nid = lp->lp_nid;
-               msg->msg_target.pid = LNET_PID_LUSTRE;
+               best_lpni = lpni;
+               best_lpni_credits = lpni->lpni_txcredits;
        }
 
-       /* 'lp' is our best choice of peer */
+       /*
+        * Increment sequence number of the peer selected so that we can
+        * pick the next one in Round Robin.
+        */
+       best_lpni->lpni_seq++;
+
+       /* if we still can't find a peer ni then we can't reach it.
+        * NOTE(review): best_lpni->lpni_seq is incremented above, before
+        * this NULL check — the increment should be moved below the check
+        * to avoid a NULL dereference when no peer_ni was selected. */
+       if (!best_lpni) {
+               __u32 net_id = peer_net->lpn_net_id;
+               lnet_net_unlock(cpt);
+               LCONSOLE_WARN("no peer_ni found on peer net %s\n",
+                               libcfs_net2str(net_id));
+               return -EHOSTUNREACH;
+       }
 
-       LASSERT(!msg->msg_peertxcredit);
-       LASSERT(!msg->msg_txcredit);
-       LASSERT(msg->msg_txpeer == NULL);
+send:
+       /*
+        * Use lnet_cpt_of_nid() to determine the CPT used to commit the
+        * message. This ensures that we get a CPT that is correct for
+        * the NI when the NI has been restricted to a subset of all CPTs.
+        * If the selected CPT differs from the one currently locked, we
+        * must unlock and relock the lnet_net_lock(), and then check whether
+        * the configuration has changed. We don't have a hold on the best_ni
+        * or best_peer_ni yet, and they may have vanished.
+        */
+       cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
+       if (cpt != cpt2) {
+               lnet_net_unlock(cpt);
+               cpt = cpt2;
+               lnet_net_lock(cpt);
+               seq2 = lnet_get_dlc_seq_locked();
+               if (seq2 != seq) {
+                       lnet_net_unlock(cpt);
+                       goto again;
+               }
+       }
 
-       msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
-       /* set the NI for this message */
-       msg->msg_txni = src_ni;
+       /*
+        * store the best_lpni in the message right away to avoid having
+        * to do the same operation under different conditions
+        */
+       msg->msg_txpeer = (routing) ? best_gw : best_lpni;
+       msg->msg_txni = best_ni;
+       /*
+        * grab a reference for the best_ni since now it's in use in this
+        * send. the reference will need to be dropped when the message is
+        * finished in lnet_finalize()
+        */
        lnet_ni_addref_locked(msg->msg_txni, cpt);
+       lnet_peer_ni_addref_locked(msg->msg_txpeer);
+
+       /*
+        * set the destination nid in the message here because it's
+        * possible that we'd be sending to a different nid than the one
+        * originally given.
+        */
+       msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
+
+       /*
+        * Always set the target.nid to the best peer picked. Either the
+        * nid will be one of the preconfigured NIDs, or the same NID as
+        * what was originally set in the target or it will be the NID of
+        * a router if this message should be routed
+        */
+       msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
+
+       /*
+        * lnet_msg_commit assigns the correct cpt to the message, which
+        * is used to decrement the correct refcount on the ni when it's
+        * time to return the credits
+        */
+       lnet_msg_commit(msg, cpt);
+
+       /*
+        * If we are routing the message then we don't need to overwrite
+        * the src_nid since it would've been set at the origin. Otherwise
+        * we are the originator so we need to set it.
+        */
+       if (!msg->msg_routing)
+               msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
+
+       if (routing) {
+               msg->msg_target_is_router = 1;
+               msg->msg_target.pid = LNET_PID_LUSTRE;
+       }
 
        rc = lnet_post_send_locked(msg, 0);
+
        lnet_net_unlock(cpt);
 
-       if (rc < 0)
+       return rc;
+}
+
+int
+lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
+{
+       lnet_nid_t              dst_nid = msg->msg_target.nid;
+       int                     rc;
+       bool                    lo_sent = false;
+
+       /*
+        * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
+        * but we might want to use pre-determined router for ACK/REPLY
+        * in the future
+        */
+       /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
+       LASSERT (msg->msg_txpeer == NULL);
+       LASSERT (!msg->msg_sending);
+       LASSERT (!msg->msg_target_is_router);
+       LASSERT (!msg->msg_receiving);
+
+       msg->msg_sending = 1;
+
+       LASSERT(!msg->msg_tx_committed);
+
+       rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid, &lo_sent);
+       if (rc < 0 || lo_sent)
                return rc;
 
        if (rc == LNET_CREDIT_OK)
-               lnet_ni_send(src_ni, msg);
+               lnet_ni_send(msg->msg_txni, msg);
 
-       return 0; /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
+       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */
+       return 0;
 }
 
 void
@@ -1455,14 +1888,15 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
        hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
        hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
 
-       info.mi_id.nid  = hdr->src_nid;
+       /* Primary peer NID. */
+       info.mi_id.nid  = msg->msg_initiator;
        info.mi_id.pid  = hdr->src_pid;
        info.mi_opc     = LNET_MD_OP_PUT;
        info.mi_portal  = hdr->msg.put.ptl_index;
        info.mi_rlength = hdr->payload_length;
        info.mi_roffset = hdr->msg.put.offset;
        info.mi_mbits   = hdr->msg.put.match_bits;
-       info.mi_cpt     = msg->msg_rxpeer->lp_cpt;
+       info.mi_cpt     = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
 
        msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
        ready_delay = msg->msg_rx_ready_delay;
@@ -1505,6 +1939,7 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 {
        struct lnet_match_info  info;
        lnet_hdr_t              *hdr = &msg->msg_hdr;
+       lnet_process_id_t       source_id;
        struct lnet_handle_wire reply_wmd;
        int                     rc;
 
@@ -1514,7 +1949,10 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
        hdr->msg.get.sink_length  = le32_to_cpu(hdr->msg.get.sink_length);
        hdr->msg.get.src_offset   = le32_to_cpu(hdr->msg.get.src_offset);
 
-       info.mi_id.nid  = hdr->src_nid;
+       source_id.nid = hdr->src_nid;
+       source_id.pid = hdr->src_pid;
+       /* Primary peer NID */
+       info.mi_id.nid  = msg->msg_initiator;
        info.mi_id.pid  = hdr->src_pid;
        info.mi_opc     = LNET_MD_OP_GET;
        info.mi_portal  = hdr->msg.get.ptl_index;
@@ -1537,7 +1975,7 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 
        reply_wmd = hdr->msg.get.return_wmd;
 
-       lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
+       lnet_prep_send(msg, LNET_MSG_REPLY, source_id,
                       msg->msg_offset, msg->msg_wanted);
 
        msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
@@ -1695,7 +2133,7 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
        if (!the_lnet.ln_routing)
                return -ECANCELED;
 
-       if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
+       if (msg->msg_rxpeer->lpni_rtrcredits <= 0 ||
            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
                if (ni->ni_net->net_lnd->lnd_eager_recv == NULL) {
                        msg->msg_rx_ready_delay = 1;
@@ -1830,8 +2268,9 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
        lnet_pid_t     dest_pid;
        lnet_nid_t     dest_nid;
        lnet_nid_t     src_nid;
-       __u32          payload_length;
-       __u32          type;
+       struct lnet_peer_ni *lpni;
+       __u32          payload_length;
+       __u32          type;
 
        LASSERT (!in_interrupt ());
 
@@ -1989,21 +2428,24 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                msg->msg_hdr.dest_pid   = dest_pid;
                msg->msg_hdr.payload_length = payload_length;
        }
+       /* Multi-Rail: Primary NID of source. */
+       msg->msg_initiator = lnet_peer_primary_nid(src_nid);
 
        lnet_net_lock(cpt);
-       rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
-       if (rc != 0) {
+       lpni = lnet_nid2peerni_locked(from_nid, cpt);
+       if (IS_ERR(lpni)) {
                lnet_net_unlock(cpt);
                CERROR("%s, src %s: Dropping %s "
-                      "(error %d looking up sender)\n",
+                      "(error %ld looking up sender)\n",
                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
-                      lnet_msgtyp2str(type), rc);
+                      lnet_msgtyp2str(type), PTR_ERR(lpni));
                lnet_msg_free(msg);
                if (rc == -ESHUTDOWN)
                        /* We are shutting down.  Don't do anything more.
                         * NOTE(review): rc is stale at this point — the
                         * lookup above now returns lpni via ERR_PTR()
                         * instead of setting rc; this check likely needs
                         * PTR_ERR(lpni) == -ESHUTDOWN. Verify. */
                        return 0;
                goto drop;
        }
+       msg->msg_rxpeer = lpni;
        msg->msg_rxni = ni;
        lnet_ni_addref_locked(ni, cpt);
 
@@ -2090,8 +2532,7 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * called lnet_drop_message(), so I just hang onto msg as well
                 * until that's done */
 
-               lnet_drop_message(msg->msg_rxni,
-                                 msg->msg_rxpeer->lp_cpt,
+               lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
                                  msg->msg_private, msg->msg_len);
                /*
                 * NB: message will not generate event because w/o attached MD,
@@ -2307,6 +2748,8 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
               libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
 
        /* setup information for lnet_build_msg_event */
+       msg->msg_initiator = lnet_peer_primary_nid(peer_id.nid);
+       /* Cheaper: msg->msg_initiator = getmsg->msg_txpeer->lp_nid; */
        msg->msg_from = peer_id.nid;
        msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
        msg->msg_hdr.src_nid = peer_id.nid;
@@ -2553,7 +2996,7 @@ LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                        hops = shortest_hops;
                        if (srcnidp != NULL) {
                                ni = lnet_get_next_ni_locked(
-                                       shortest->lr_gateway->lp_net,
+                                       shortest->lr_gateway->lpni_net,
                                        NULL);
                                *srcnidp = ni->ni_nid;
                        }