Whamcloud - gitweb
LU-7734 lnet: protect peer_ni credits
authorAmir Shehata <amir.shehata@intel.com>
Thu, 9 Jun 2016 08:17:45 +0000 (01:17 -0700)
committerAmir Shehata <amir.shehata@intel.com>
Wed, 25 Jan 2017 03:10:15 +0000 (19:10 -0800)
Currently multiple NIs can talk to the same peer_ni. The per-CPT
lnet_net_lock therefore no longer protects the lpni against
concurrent updates. To resolve this issue a spinlock is added
to the lnet_peer_ni, which must be locked when the peer NI
credits, delayed message queue, and delayed routed message queue
are modified. The lock is not taken when reporting credits.

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Signed-off-by: Olaf Weber <olaf@sgi.com>
Change-Id: I52153680a74d43e595314b63487026cc3f6a5a8f
Reviewed-on: http://review.whamcloud.com/20702

lnet/include/lnet/lib-types.h
lnet/lnet/lib-move.c
lnet/lnet/peer.c
lnet/lnet/router.c
lnet/lnet/router_proc.c

index 76f5d31..bbf0ab4 100644 (file)
@@ -465,6 +465,8 @@ struct lnet_peer_ni {
        int                     lpni_rtrcredits;
        /* low water mark */
        int                     lpni_minrtrcredits;
+       /* bytes queued for sending */
+       long                    lpni_txqnob;
        /* alive/dead? */
        bool                    lpni_alive;
        /* notification outstanding? */
@@ -477,8 +479,6 @@ struct lnet_peer_ni {
        bool                    lpni_ping_notsent;
        /* # times router went dead<->alive. Protected with lpni_lock */
        int                     lpni_alive_count;
-       /* bytes queued for sending */
-       long                    lpni_txqnob;
        /* time of last aliveness news */
        cfs_time_t              lpni_timestamp;
        /* time of last ping attempt */
index ce62ff9..04d540e 100644 (file)
@@ -849,6 +849,7 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
        }
 
        if (!msg->msg_peertxcredit) {
+               spin_lock(&lp->lpni_lock);
                LASSERT((lp->lpni_txcredits < 0) ==
                        !list_empty(&lp->lpni_txq));
 
@@ -862,8 +863,10 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
                if (lp->lpni_txcredits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lpni_txq);
+                       spin_unlock(&lp->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
+               spin_unlock(&lp->lpni_lock);
        }
 
        if (!msg->msg_txcredit) {
@@ -935,6 +938,7 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
        LASSERT(!do_recv || msg->msg_rx_delayed);
 
        if (!msg->msg_peerrtrcredit) {
+               spin_lock(&lp->lpni_lock);
                LASSERT((lp->lpni_rtrcredits < 0) ==
                        !list_empty(&lp->lpni_rtrq));
 
@@ -948,8 +952,10 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
+                       spin_unlock(&lp->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
+               spin_unlock(&lp->lpni_lock);
        }
 
        rbp = lnet_msg2bufpool(msg);
@@ -1013,6 +1019,7 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
 
                        LASSERT(msg2->msg_txni == ni);
                        LASSERT(msg2->msg_tx_delayed);
+                       LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);
 
                        (void) lnet_post_send_locked(msg2, 1);
                }
@@ -1022,24 +1029,36 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
                /* give back peer txcredits */
                msg->msg_peertxcredit = 0;
 
+               spin_lock(&txpeer->lpni_lock);
                LASSERT((txpeer->lpni_txcredits < 0) ==
                        !list_empty(&txpeer->lpni_txq));
 
                txpeer->lpni_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
-               LASSERT (txpeer->lpni_txqnob >= 0);
+               LASSERT(txpeer->lpni_txqnob >= 0);
 
                txpeer->lpni_txcredits++;
                if (txpeer->lpni_txcredits <= 0) {
                        msg2 = list_entry(txpeer->lpni_txq.next,
                                               lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
+                       spin_unlock(&txpeer->lpni_lock);
 
                        LASSERT(msg2->msg_txpeer == txpeer);
                        LASSERT(msg2->msg_tx_delayed);
 
-                       (void) lnet_post_send_locked(msg2, 1);
+                       if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+                               lnet_net_unlock(msg->msg_tx_cpt);
+                               lnet_net_lock(msg2->msg_tx_cpt);
+                       }
+                        (void) lnet_post_send_locked(msg2, 1);
+                       if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+                               lnet_net_unlock(msg2->msg_tx_cpt);
+                               lnet_net_lock(msg->msg_tx_cpt);
+                       }
+                } else {
+                       spin_unlock(&txpeer->lpni_lock);
                }
-       }
+        }
 
        if (txni != NULL) {
                msg->msg_txni = NULL;
@@ -1078,17 +1097,12 @@ lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
 void
 lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
 {
-       lnet_msg_t       *msg;
-       lnet_msg_t       *tmp;
-       struct list_head drop;
-
-       INIT_LIST_HEAD(&drop);
-
-       list_splice_init(list, &drop);
+       lnet_msg_t *msg;
+       lnet_msg_t *tmp;
 
        lnet_net_unlock(cpt);
 
-       list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+       list_for_each_entry_safe(msg, tmp, list, msg_list) {
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
@@ -1154,6 +1168,7 @@ routing_off:
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
 
+               spin_lock(&rxpeer->lpni_lock);
                LASSERT((rxpeer->lpni_rtrcredits < 0) ==
                        !list_empty(&rxpeer->lpni_rtrq));
 
@@ -1162,14 +1177,19 @@ routing_off:
                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
-                       lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq,
-                                                    msg->msg_rx_cpt);
+                       struct list_head drop;
+                       INIT_LIST_HEAD(&drop);
+                       list_splice_init(&rxpeer->lpni_rtrq, &drop);
+                       spin_unlock(&rxpeer->lpni_lock);
+                       lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
                } else if (rxpeer->lpni_rtrcredits <= 0) {
                        msg2 = list_entry(rxpeer->lpni_rtrq.next,
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
-
+                       spin_unlock(&rxpeer->lpni_lock);
                        (void) lnet_post_routed_recv_locked(msg2, 1);
+               } else {
+                       spin_unlock(&rxpeer->lpni_lock);
                }
        }
        if (rxni != NULL) {
@@ -1695,14 +1715,14 @@ pick_peer:
                         * it.
                         */
                        continue;
-               } if (lpni->lpni_txcredits < best_lpni_credits)
+               } else if (lpni->lpni_txcredits < best_lpni_credits) {
                        /*
                         * We already have a peer that has more credits
                         * available than this one. No need to consider
                         * this peer further.
                         */
                        continue;
-               else if (lpni->lpni_txcredits == best_lpni_credits) {
+               else if (lpni->lpni_txcredits == best_lpni_credits) {
                        /*
                         * The best peer found so far and the current peer
                         * have the same number of available credits let's
index d5d4986..827fa62 100644 (file)
@@ -56,12 +56,15 @@ lnet_peer_net_added(struct lnet_net *net)
 
                if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) {
                        lpni->lpni_net = net;
+
+                       spin_lock(&lpni->lpni_lock);
                        lpni->lpni_txcredits =
-                       lpni->lpni_mintxcredits =
                                lpni->lpni_net->net_tunables.lct_peer_tx_credits;
+                       lpni->lpni_mintxcredits = lpni->lpni_txcredits;
                        lpni->lpni_rtrcredits =
-                       lpni->lpni_minrtrcredits =
                                lnet_peer_buffer_credits(lpni->lpni_net);
+                       lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
+                       spin_unlock(&lpni->lpni_lock);
 
                        lnet_peer_remove_from_remote_list(lpni);
                }
index 71903cf..c0dd0a9 100644 (file)
@@ -1398,7 +1398,8 @@ lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt)
        INIT_LIST_HEAD(&tmp);
 
        lnet_net_lock(cpt);
-       lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+       list_splice_init(&rbp->rbp_msgs, &tmp);
+       lnet_drop_routed_msgs_locked(&tmp, cpt);
        list_splice_init(&rbp->rbp_bufs, &tmp);
        rbp->rbp_req_nbuffers = 0;
        rbp->rbp_nbuffers = rbp->rbp_credits = 0;
index b2dfe42..7636202 100644 (file)
@@ -513,7 +513,7 @@ proc_lnet_peers(struct ctl_table *table, int write, void __user *buffer,
                                aliveness = peer->lpni_alive ? "up" : "down";
 
                        if (lnet_peer_aliveness_enabled(peer)) {
-                               cfs_time_t     now = cfs_time_current();
+                               cfs_time_t now = cfs_time_current();
                                cfs_duration_t delta;
 
                                delta = cfs_time_sub(now, peer->lpni_last_alive);