Whamcloud - gitweb
LU-7734 lnet: proper cpt locking
author: Amir Shehata <amir.shehata@intel.com>
Thu, 9 Jun 2016 06:08:06 +0000 (23:08 -0700)
committer: Amir Shehata <amir.shehata@intel.com>
Wed, 25 Jan 2017 03:10:15 +0000 (19:10 -0800)
1. add a per NI credits, which is just the total credits
   assigned on NI creation
2. Whenever percpt credits are added or decremented, we
   mirror that in the NI credits
3. We use the NI credits to determine best NI
4. After we have completed the peer_ni/ni selection we
   determine the cpt to use for locking:
cpt_of_nid(lpni->nid, ni)

The lpni_cpt is not enough to protect all the fields in the
lnet_peer_ni structure. This is due to the fact that multiple
NIs can talk to the same peer, and functions can be called with
different cpts locked. To properly protect the fields in the
lnet_peer_ni structure, a spin lock is introduced for the
purpose.

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: Ief7868c3c8ff7e00ea9e908dd50d8cef77d9f9a4
Reviewed-on: http://review.whamcloud.com/20701

lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/peer.c
lnet/lnet/router.c

index 2e4820d..76f5d31 100644 (file)
@@ -363,6 +363,9 @@ typedef struct lnet_ni {
        /* instance-specific data */
        void                    *ni_data;
 
        /* instance-specific data */
        void                    *ni_data;
 
+       /* per ni credits */
+       atomic_t                ni_tx_credits;
+
        /* percpt TX queues */
        struct lnet_tx_queue    **ni_tx_queues;
 
        /* percpt TX queues */
        struct lnet_tx_queue    **ni_tx_queues;
 
@@ -452,6 +455,8 @@ struct lnet_peer_ni {
        struct lnet_peer_net    *lpni_peer_net;
        /* statistics kept on each peer NI */
        struct lnet_element_stats lpni_stats;
        struct lnet_peer_net    *lpni_peer_net;
        /* statistics kept on each peer NI */
        struct lnet_element_stats lpni_stats;
+       /* spin lock protecting credits and lpni_txq / lpni_rtrq */
+       spinlock_t              lpni_lock;
        /* # tx credits available */
        int                     lpni_txcredits;
        /* low water mark */
        /* # tx credits available */
        int                     lpni_txcredits;
        /* low water mark */
@@ -461,16 +466,16 @@ struct lnet_peer_ni {
        /* low water mark */
        int                     lpni_minrtrcredits;
        /* alive/dead? */
        /* low water mark */
        int                     lpni_minrtrcredits;
        /* alive/dead? */
-       unsigned int            lpni_alive:1;
+       bool                    lpni_alive;
        /* notification outstanding? */
        /* notification outstanding? */
-       unsigned int            lpni_notify:1;
+       bool                    lpni_notify;
        /* outstanding notification for LND? */
        /* outstanding notification for LND? */
-       unsigned int            lpni_notifylnd:1;
+       bool                    lpni_notifylnd;
        /* some thread is handling notification */
        /* some thread is handling notification */
-       unsigned int            lpni_notifying:1;
+       bool                    lpni_notifying;
        /* SEND event outstanding from ping */
        /* SEND event outstanding from ping */
-       unsigned int            lpni_ping_notsent;
-       /* # times router went dead<->alive */
+       bool                    lpni_ping_notsent;
+       /* # times router went dead<->alive. Protected with lpni_lock */
        int                     lpni_alive_count;
        /* bytes queued for sending */
        long                    lpni_txqnob;
        int                     lpni_alive_count;
        /* bytes queued for sending */
        long                    lpni_txqnob;
@@ -498,7 +503,7 @@ struct lnet_peer_ni {
        __u32                   lpni_seq;
        /* health flag */
        bool                    lpni_healthy;
        __u32                   lpni_seq;
        /* health flag */
        bool                    lpni_healthy;
-       /* returned RC ping features */
+       /* returned RC ping features. Protected with lpni_lock */
        unsigned int            lpni_ping_feats;
        /* routes on this peer */
        struct list_head        lpni_routes;
        unsigned int            lpni_ping_feats;
        /* routes on this peer */
        struct list_head        lpni_routes;
index 6a213b3..8d318cd 100644 (file)
@@ -1444,6 +1444,9 @@ lnet_startup_lndni(struct lnet_ni *ni, struct lnet_lnd_tunables *tun)
                tq->tq_credits = lnet_ni_tq_credits(ni);
        }
 
                tq->tq_credits = lnet_ni_tq_credits(ni);
        }
 
+       atomic_set(&ni->ni_tx_credits,
+                  lnet_ni_tq_credits(ni) * ni->ni_ncpts);
+
        CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
                libcfs_nid2str(ni->ni_nid),
                ni->ni_net->net_tunables.lct_peer_tx_credits,
        CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
                libcfs_nid2str(ni->ni_nid),
                ni->ni_net->net_tunables.lct_peer_tx_credits,
index ee167ea..ce62ff9 100644 (file)
@@ -674,18 +674,26 @@ lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
        return rc;
 }
 
        return rc;
 }
 
-/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
+/*
+ * This function can be called from two paths:
+ *     1. when sending a message
+ *     2. when decommitting a message (lnet_msg_decommit_tx())
+ * In both these cases the peer_ni should have its reference count
+ * acquired by the caller and therefore it is safe to drop the spin
+ * lock before calling lnd_query()
+ */
 static void
 lnet_ni_query_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
 {
        cfs_time_t last_alive = 0;
 static void
 lnet_ni_query_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
 {
        cfs_time_t last_alive = 0;
+       int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
 
        LASSERT(lnet_peer_aliveness_enabled(lp));
        LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
 
 
        LASSERT(lnet_peer_aliveness_enabled(lp));
        LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
 
-       lnet_net_unlock(lp->lpni_cpt);
+       lnet_net_unlock(cpt);
        (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
        (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
-       lnet_net_lock(lp->lpni_cpt);
+       lnet_net_lock(cpt);
 
        lp->lpni_last_query = cfs_time_current();
 
 
        lp->lpni_last_query = cfs_time_current();
 
@@ -706,9 +714,12 @@ lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now)
         * Trust lnet_notify() if it has more recent aliveness news, but
         * ignore the initial assumed death (see lnet_peers_start_down()).
         */
         * Trust lnet_notify() if it has more recent aliveness news, but
         * ignore the initial assumed death (see lnet_peers_start_down()).
         */
+       spin_lock(&lp->lpni_lock);
        if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
        if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
-           cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive))
+           cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive)) {
+               spin_unlock(&lp->lpni_lock);
                return 0;
                return 0;
+       }
 
        deadline =
          cfs_time_add(lp->lpni_last_alive,
 
        deadline =
          cfs_time_add(lp->lpni_last_alive,
@@ -722,8 +733,12 @@ lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now)
         * case, and moreover lpni_last_alive at peer creation is assumed.
         */
        if (alive && !lp->lpni_alive &&
         * case, and moreover lpni_last_alive at peer creation is assumed.
         */
        if (alive && !lp->lpni_alive &&
-           !(lnet_isrouter(lp) && lp->lpni_alive_count == 0))
+           !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
+               spin_unlock(&lp->lpni_lock);
                lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
                lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
+       } else {
+               spin_unlock(&lp->lpni_lock);
+       }
 
        return alive;
 }
 
        return alive;
 }
@@ -857,6 +872,7 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 
                msg->msg_txcredit = 1;
                tq->tq_credits--;
 
                msg->msg_txcredit = 1;
                tq->tq_credits--;
+               atomic_dec(&ni->ni_tx_credits);
 
                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;
 
                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;
@@ -989,6 +1005,7 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                        !list_empty(&tq->tq_delayed));
 
                tq->tq_credits++;
                        !list_empty(&tq->tq_delayed));
 
                tq->tq_credits++;
+               atomic_inc(&ni->ni_tx_credits);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          lnet_msg_t, msg_list);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          lnet_msg_t, msg_list);
@@ -1452,9 +1469,13 @@ again:
                 *      3. Round Robin
                 */
                while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
                 *      3. Round Robin
                 */
                while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
+                       int ni_credits;
+
                        if (!lnet_is_ni_healthy_locked(ni))
                                continue;
 
                        if (!lnet_is_ni_healthy_locked(ni))
                                continue;
 
+                       ni_credits = atomic_read(&ni->ni_tx_credits);
+
                        /*
                         * calculate the distance from the cpt on which
                         * the message memory is allocated to the CPT of
                        /*
                         * calculate the distance from the cpt on which
                         * the message memory is allocated to the CPT of
@@ -1530,11 +1551,9 @@ again:
                                 * select using credits followed by Round
                                 * Robin.
                                 */
                                 * select using credits followed by Round
                                 * Robin.
                                 */
-                               if (ni->ni_tx_queues[cpt]->tq_credits <
-                                       best_credits) {
+                               if (ni_credits < best_credits) {
                                        continue;
                                        continue;
-                               } else if (ni->ni_tx_queues[cpt]->tq_credits ==
-                                               best_credits) {
+                               } else if (ni_credits == best_credits) {
                                        if (best_ni) {
                                                if (best_ni->ni_seq <= ni->ni_seq)
                                                        continue;
                                        if (best_ni) {
                                                if (best_ni->ni_seq <= ni->ni_seq)
                                                        continue;
@@ -1543,7 +1562,7 @@ again:
                        }
 set_ni:
                        best_ni = ni;
                        }
 set_ni:
                        best_ni = ni;
-                       best_credits = ni->ni_tx_queues[cpt]->tq_credits;
+                       best_credits = ni_credits;
                }
        }
        /*
                }
        }
        /*
@@ -1717,13 +1736,15 @@ pick_peer:
 
 send:
        /*
 
 send:
        /*
-        * determine the cpt to use and if it has changed then
-        * lock the new cpt and check if the config has changed.
-        * If it has changed then repeat the algorithm since the
-        * ni or peer list could have changed and the algorithm
-        * would endup picking a different ni/peer_ni pair.
+        * Use lnet_cpt_of_nid() to determine the CPT used to commit the
+        * message. This ensures that we get a CPT that is correct for
+        * the NI when the NI has been restricted to a subset of all CPTs.
+        * If the selected CPT differs from the one currently locked, we
+        * must unlock and relock the lnet_net_lock(), and then check whether
+        * the configuration has changed. We don't have a hold on the best_ni
+        * or best_peer_ni yet, and they may have vanished.
         */
         */
-       cpt2 = best_lpni->lpni_cpt;
+       cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
        if (cpt != cpt2) {
                lnet_net_unlock(cpt);
                cpt = cpt2;
        if (cpt != cpt2) {
                lnet_net_unlock(cpt);
                cpt = cpt2;
@@ -1875,7 +1896,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
        info.mi_rlength = hdr->payload_length;
        info.mi_roffset = hdr->msg.put.offset;
        info.mi_mbits   = hdr->msg.put.match_bits;
        info.mi_rlength = hdr->payload_length;
        info.mi_roffset = hdr->msg.put.offset;
        info.mi_mbits   = hdr->msg.put.match_bits;
-       info.mi_cpt     = msg->msg_rxpeer->lpni_cpt;
+       info.mi_cpt     = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
 
        msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
        ready_delay = msg->msg_rx_ready_delay;
 
        msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
        ready_delay = msg->msg_rx_ready_delay;
@@ -2511,8 +2532,7 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                 * called lnet_drop_message(), so I just hang onto msg as well
                 * until that's done */
 
                 * called lnet_drop_message(), so I just hang onto msg as well
                 * until that's done */
 
-               lnet_drop_message(msg->msg_rxni,
-                                 msg->msg_rxpeer->lpni_cpt,
+               lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
                                  msg->msg_private, msg->msg_len);
                /*
                 * NB: message will not generate event because w/o attached MD,
                                  msg->msg_private, msg->msg_len);
                /*
                 * NB: message will not generate event because w/o attached MD,
index 56584fc..d5d4986 100644 (file)
@@ -152,6 +152,8 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
        INIT_LIST_HEAD(&lpni->lpni_on_peer_net_list);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
 
        INIT_LIST_HEAD(&lpni->lpni_on_peer_net_list);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
 
+       spin_lock_init(&lpni->lpni_lock);
+
        lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
        lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
        lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
        lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
index 3fdaa21..71903cf 100644 (file)
@@ -106,11 +106,20 @@ lnet_notify_locked(struct lnet_peer_ni *lp, int notifylnd, int alive,
                return;
        }
 
                return;
        }
 
+       /*
+        * This function can be called with different cpt locks being
+        * held. lpni_alive_count modification needs to be properly protected.
+        * Significant reads to lpni_alive_count are also protected with
+        * the same lock
+        */
+       spin_lock(&lp->lpni_lock);
+
        lp->lpni_timestamp = when;                /* update timestamp */
        lp->lpni_ping_deadline = 0;               /* disable ping timeout */
 
        if (lp->lpni_alive_count != 0 &&          /* got old news */
            (!lp->lpni_alive) == (!alive)) {      /* new date for old news */
        lp->lpni_timestamp = when;                /* update timestamp */
        lp->lpni_ping_deadline = 0;               /* disable ping timeout */
 
        if (lp->lpni_alive_count != 0 &&          /* got old news */
            (!lp->lpni_alive) == (!alive)) {      /* new date for old news */
+               spin_unlock(&lp->lpni_lock);
                CDEBUG(D_NET, "Old news\n");
                return;
        }
                CDEBUG(D_NET, "Old news\n");
                return;
        }
@@ -118,30 +127,43 @@ lnet_notify_locked(struct lnet_peer_ni *lp, int notifylnd, int alive,
        /* Flag that notification is outstanding */
 
        lp->lpni_alive_count++;
        /* Flag that notification is outstanding */
 
        lp->lpni_alive_count++;
-       lp->lpni_alive = !(!alive);               /* 1 bit! */
+       lp->lpni_alive = (alive) ? 1 : 0;
        lp->lpni_notify = 1;
        lp->lpni_notify = 1;
-       lp->lpni_notifylnd |= notifylnd;
+       lp->lpni_notifylnd = notifylnd;
        if (lp->lpni_alive)
                lp->lpni_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
 
        if (lp->lpni_alive)
                lp->lpni_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
 
+       spin_unlock(&lp->lpni_lock);
+
        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lpni_nid), alive);
 }
 
        CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lpni_nid), alive);
 }
 
+/*
+ * This function will always be called with lp->lpni_cpt lock held.
+ */
 static void
 lnet_ni_notify_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
 {
 static void
 lnet_ni_notify_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
 {
-       int        alive;
-       int        notifylnd;
+       int alive;
+       int notifylnd;
 
        /* Notify only in 1 thread at any time to ensure ordered notification.
         * NB individual events can be missed; the only guarantee is that you
         * always get the most recent news */
 
 
        /* Notify only in 1 thread at any time to ensure ordered notification.
         * NB individual events can be missed; the only guarantee is that you
         * always get the most recent news */
 
-       if (lp->lpni_notifying || ni == NULL)
+       spin_lock(&lp->lpni_lock);
+
+       if (lp->lpni_notifying || ni == NULL) {
+               spin_unlock(&lp->lpni_lock);
                return;
                return;
+       }
 
        lp->lpni_notifying = 1;
 
 
        lp->lpni_notifying = 1;
 
+       /*
+        * lp->lpni_notify needs to be protected because it can be set in
+        * lnet_notify_locked().
+        */
        while (lp->lpni_notify) {
                alive     = lp->lpni_alive;
                notifylnd = lp->lpni_notifylnd;
        while (lp->lpni_notify) {
                alive     = lp->lpni_alive;
                notifylnd = lp->lpni_notifylnd;
@@ -150,6 +172,7 @@ lnet_ni_notify_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
                lp->lpni_notify    = 0;
 
                if (notifylnd && ni->ni_net->net_lnd->lnd_notify != NULL) {
                lp->lpni_notify    = 0;
 
                if (notifylnd && ni->ni_net->net_lnd->lnd_notify != NULL) {
+                       spin_unlock(&lp->lpni_lock);
                        lnet_net_unlock(lp->lpni_cpt);
 
                        /* A new notification could happen now; I'll handle it
                        lnet_net_unlock(lp->lpni_cpt);
 
                        /* A new notification could happen now; I'll handle it
@@ -159,13 +182,14 @@ lnet_ni_notify_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
                                                          alive);
 
                        lnet_net_lock(lp->lpni_cpt);
                                                          alive);
 
                        lnet_net_lock(lp->lpni_cpt);
+                       spin_lock(&lp->lpni_lock);
                }
        }
 
        lp->lpni_notifying = 0;
                }
        }
 
        lp->lpni_notifying = 0;
+       spin_unlock(&lp->lpni_lock);
 }
 
 }
 
-
 static void
 lnet_rtr_addref_locked(struct lnet_peer_ni *lp)
 {
 static void
 lnet_rtr_addref_locked(struct lnet_peer_ni *lp)
 {
@@ -656,6 +680,12 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
        if (!gw->lpni_alive)
                return;
 
        if (!gw->lpni_alive)
                return;
 
+       /*
+        * Protect gw->lpni_ping_feats. This can be set from
+        * lnet_notify_locked with different locks being held
+        */
+       spin_lock(&gw->lpni_lock);
+
        if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
                lnet_swap_pinginfo(info);
 
        if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
                lnet_swap_pinginfo(info);
 
@@ -664,6 +694,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
                       libcfs_nid2str(gw->lpni_nid), info->pi_magic);
                gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
                CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
                       libcfs_nid2str(gw->lpni_nid), info->pi_magic);
                gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+               spin_unlock(&gw->lpni_lock);
                return;
        }
 
                return;
        }
 
@@ -671,11 +702,14 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
        if ((gw->lpni_ping_feats & LNET_PING_FEAT_MASK) == 0) {
                CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
                       libcfs_nid2str(gw->lpni_nid), gw->lpni_ping_feats);
        if ((gw->lpni_ping_feats & LNET_PING_FEAT_MASK) == 0) {
                CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
                       libcfs_nid2str(gw->lpni_nid), gw->lpni_ping_feats);
+               spin_unlock(&gw->lpni_lock);
                return; /* nothing I can understand */
        }
 
                return; /* nothing I can understand */
        }
 
-       if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
+       if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0) {
+               spin_unlock(&gw->lpni_lock);
                return; /* can't carry NI status info */
                return; /* can't carry NI status info */
+       }
 
        list_for_each_entry(rte, &gw->lpni_routes, lr_gwlist) {
                int     down = 0;
 
        list_for_each_entry(rte, &gw->lpni_routes, lr_gwlist) {
                int     down = 0;
@@ -695,6 +729,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                                CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
                                       libcfs_nid2str(gw->lpni_nid));
                                gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
                                CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
                                       libcfs_nid2str(gw->lpni_nid));
                                gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+                               spin_unlock(&gw->lpni_lock);
                                return;
                        }
 
                                return;
                        }
 
@@ -717,6 +752,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                        CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
                               libcfs_nid2str(gw->lpni_nid), stat->ns_status);
                        gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
                        CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
                               libcfs_nid2str(gw->lpni_nid), stat->ns_status);
                        gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+                       spin_unlock(&gw->lpni_lock);
                        return;
                }
 
                        return;
                }
 
@@ -731,13 +767,15 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
 
                rte->lr_downis = down;
        }
 
                rte->lr_downis = down;
        }
+
+       spin_unlock(&gw->lpni_lock);
 }
 
 static void
 lnet_router_checker_event(lnet_event_t *event)
 {
 }
 
 static void
 lnet_router_checker_event(lnet_event_t *event)
 {
-       lnet_rc_data_t          *rcd = event->md.user_ptr;
-       struct lnet_peer_ni     *lp;
+       lnet_rc_data_t *rcd = event->md.user_ptr;
+       struct lnet_peer_ni *lp;
 
        LASSERT(rcd != NULL);
 
 
        LASSERT(rcd != NULL);
 
@@ -803,10 +841,14 @@ lnet_wait_known_routerstate(void)
                        rtr = list_entry(entry, struct lnet_peer_ni,
                                         lpni_rtr_list);
 
                        rtr = list_entry(entry, struct lnet_peer_ni,
                                         lpni_rtr_list);
 
+                       spin_lock(&rtr->lpni_lock);
+
                        if (rtr->lpni_alive_count == 0) {
                                all_known = 0;
                        if (rtr->lpni_alive_count == 0) {
                                all_known = 0;
+                               spin_unlock(&rtr->lpni_lock);
                                break;
                        }
                                break;
                        }
+                       spin_unlock(&rtr->lpni_lock);
                }
 
                lnet_net_unlock(cpt);
                }
 
                lnet_net_unlock(cpt);
@@ -1723,9 +1765,9 @@ lnet_rtrpools_disable(void)
 int
 lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
 {
 int
 lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
 {
-       struct lnet_peer_ni     *lp = NULL;
-       cfs_time_t              now = cfs_time_current();
-       int                     cpt = lnet_cpt_of_nid(nid, ni);
+       struct lnet_peer_ni *lp = NULL;
+       cfs_time_t now = cfs_time_current();
+       int cpt = lnet_cpt_of_nid(nid, ni);
 
        LASSERT (!in_interrupt ());
 
 
        LASSERT (!in_interrupt ());
 
@@ -1773,6 +1815,18 @@ lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
                return 0;
        }
 
                return 0;
        }
 
+       /*
+        * It is possible for this function to be called for the same peer
+        * but with different NIs. We want to synchronize the notification
+        * between the different calls. So we will use the lpni_cpt to
+        * grab the net lock.
+        */
+       if (lp->lpni_cpt != cpt) {
+               lnet_net_unlock(cpt);
+               cpt = lp->lpni_cpt;
+               lnet_net_lock(cpt);
+       }
+
        /* We can't fully trust LND on reporting exact peer last_alive
         * if he notifies us about dead peer. For example ksocklnd can
         * call us with when == _time_when_the_node_was_booted_ if
        /* We can't fully trust LND on reporting exact peer last_alive
         * if he notifies us about dead peer. For example ksocklnd can
         * call us with when == _time_when_the_node_was_booted_ if