From: Amir Shehata Date: Thu, 9 Jun 2016 08:17:45 +0000 (-0700) Subject: LU-7734 lnet: protect peer_ni credits X-Git-Tag: 2.9.53~47^2~20 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=e1526ef2b8446fbd9a5f8f8d6f2d5e02d1982a75 LU-7734 lnet: protect peer_ni credits Currently multiple NIs can talk to the same peer_ni. The per-CPT lnet_net_lock therefore no longer protects the lpni against concurrent updates. To resolve this issue a spinlock is added to the lnet_peer_ni, which must be locked when the peer NI credits, delayed message queue, and delayed routed message queue are modified. The lock is not taken when reporting credits. Signed-off-by: Amir Shehata Signed-off-by: Olaf Weber Change-Id: I52153680a74d43e595314b63487026cc3f6a5a8f Reviewed-on: http://review.whamcloud.com/20702 --- diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 76f5d31..bbf0ab4 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -465,6 +465,8 @@ struct lnet_peer_ni { int lpni_rtrcredits; /* low water mark */ int lpni_minrtrcredits; + /* bytes queued for sending */ + long lpni_txqnob; /* alive/dead? */ bool lpni_alive; /* notification outstanding? */ @@ -477,8 +479,6 @@ struct lnet_peer_ni { bool lpni_ping_notsent; /* # times router went dead<->alive. Protected with lpni_lock */ int lpni_alive_count; - /* bytes queued for sending */ - long lpni_txqnob; /* time of last aliveness news */ cfs_time_t lpni_timestamp; /* time of last ping attempt */ diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index ce62ff9..04d540e1 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -849,6 +849,7 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send) } if (!msg->msg_peertxcredit) { + spin_lock(&lp->lpni_lock); LASSERT((lp->lpni_txcredits < 0) == !list_empty(&lp->lpni_txq)); @@ -862,8 +863,10 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send) if (lp->lpni_txcredits < 0) { msg->msg_tx_delayed = 1; list_add_tail(&msg->msg_list, &lp->lpni_txq); + spin_unlock(&lp->lpni_lock); return LNET_CREDIT_WAIT; } + spin_unlock(&lp->lpni_lock); } if (!msg->msg_txcredit) { @@ -935,6 +938,7 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv) LASSERT(!do_recv || msg->msg_rx_delayed); if (!msg->msg_peerrtrcredit) { + spin_lock(&lp->lpni_lock); LASSERT((lp->lpni_rtrcredits < 0) == !list_empty(&lp->lpni_rtrq)); @@ -948,8 +952,10 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv) LASSERT(msg->msg_rx_ready_delay); msg->msg_rx_delayed = 1; list_add_tail(&msg->msg_list, &lp->lpni_rtrq); + spin_unlock(&lp->lpni_lock); return LNET_CREDIT_WAIT; } + spin_unlock(&lp->lpni_lock); } rbp = lnet_msg2bufpool(msg); @@ -1013,6 +1019,7 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg) LASSERT(msg2->msg_txni == ni); LASSERT(msg2->msg_tx_delayed); + LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt); (void) lnet_post_send_locked(msg2, 1); } @@ -1022,24 +1029,36 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg) /* give back peer txcredits */ msg->msg_peertxcredit = 0; + spin_lock(&txpeer->lpni_lock); LASSERT((txpeer->lpni_txcredits < 0) == !list_empty(&txpeer->lpni_txq)); txpeer->lpni_txqnob -= msg->msg_len + sizeof(lnet_hdr_t); - LASSERT (txpeer->lpni_txqnob >= 0); + LASSERT(txpeer->lpni_txqnob >= 0); txpeer->lpni_txcredits++; if (txpeer->lpni_txcredits <= 0) { msg2 = list_entry(txpeer->lpni_txq.next, lnet_msg_t, msg_list); list_del(&msg2->msg_list); + spin_unlock(&txpeer->lpni_lock); LASSERT(msg2->msg_txpeer == txpeer); LASSERT(msg2->msg_tx_delayed); - (void) lnet_post_send_locked(msg2, 1); + if (msg2->msg_tx_cpt != msg->msg_tx_cpt) { + lnet_net_unlock(msg->msg_tx_cpt); + lnet_net_lock(msg2->msg_tx_cpt); + } + (void) lnet_post_send_locked(msg2, 1); + if (msg2->msg_tx_cpt != msg->msg_tx_cpt) { + lnet_net_unlock(msg2->msg_tx_cpt); + lnet_net_lock(msg->msg_tx_cpt); + } + } else { + spin_unlock(&txpeer->lpni_lock); } - } + } if (txni != NULL) { msg->msg_txni = NULL; @@ -1078,17 +1097,12 @@ lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp) void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt) { - lnet_msg_t *msg; - lnet_msg_t *tmp; - struct list_head drop; - - INIT_LIST_HEAD(&drop); - - list_splice_init(list, &drop); + lnet_msg_t *msg; + lnet_msg_t *tmp; lnet_net_unlock(cpt); - list_for_each_entry_safe(msg, tmp, &drop, msg_list) { + list_for_each_entry_safe(msg, tmp, list, msg_list) { lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL, 0, 0, 0, msg->msg_hdr.payload_length); list_del_init(&msg->msg_list); @@ -1154,6 +1168,7 @@ routing_off: /* give back peer router credits */ msg->msg_peerrtrcredit = 0; + spin_lock(&rxpeer->lpni_lock); LASSERT((rxpeer->lpni_rtrcredits < 0) == !list_empty(&rxpeer->lpni_rtrq)); @@ -1162,14 +1177,19 @@ routing_off: /* drop all messages which are queued to be routed on that * peer. */ if (!the_lnet.ln_routing) { - lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq, - msg->msg_rx_cpt); + struct list_head drop; + INIT_LIST_HEAD(&drop); + list_splice_init(&rxpeer->lpni_rtrq, &drop); + spin_unlock(&rxpeer->lpni_lock); + lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt); } else if (rxpeer->lpni_rtrcredits <= 0) { msg2 = list_entry(rxpeer->lpni_rtrq.next, lnet_msg_t, msg_list); list_del(&msg2->msg_list); - + spin_unlock(&rxpeer->lpni_lock); (void) lnet_post_routed_recv_locked(msg2, 1); + } else { + spin_unlock(&rxpeer->lpni_lock); } } if (rxni != NULL) { @@ -1695,14 +1715,14 @@ pick_peer: * it. */ continue; - } if (lpni->lpni_txcredits < best_lpni_credits) + } else if (lpni->lpni_txcredits < best_lpni_credits) { /* * We already have a peer that has more credits * available than this one. No need to consider * this peer further. */ continue; - else if (lpni->lpni_txcredits == best_lpni_credits) { + } else if (lpni->lpni_txcredits == best_lpni_credits) { /* * The best peer found so far and the current peer * have the same number of available credits let's diff --git a/lnet/lnet/peer.c b/lnet/lnet/peer.c index d5d4986..827fa62 100644 --- a/lnet/lnet/peer.c +++ b/lnet/lnet/peer.c @@ -56,12 +56,15 @@ lnet_peer_net_added(struct lnet_net *net) if (LNET_NIDNET(lpni->lpni_nid) == net->net_id) { lpni->lpni_net = net; + + spin_lock(&lpni->lpni_lock); lpni->lpni_txcredits = - lpni->lpni_mintxcredits = lpni->lpni_net->net_tunables.lct_peer_tx_credits; + lpni->lpni_mintxcredits = lpni->lpni_txcredits; lpni->lpni_rtrcredits = - lpni->lpni_minrtrcredits = lnet_peer_buffer_credits(lpni->lpni_net); + lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits; + spin_unlock(&lpni->lpni_lock); lnet_peer_remove_from_remote_list(lpni); } diff --git a/lnet/lnet/router.c b/lnet/lnet/router.c index 71903cf..c0dd0a9 100644 --- a/lnet/lnet/router.c +++ b/lnet/lnet/router.c @@ -1398,7 +1398,8 @@ lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt) INIT_LIST_HEAD(&tmp); lnet_net_lock(cpt); - lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt); + list_splice_init(&rbp->rbp_msgs, &tmp); + lnet_drop_routed_msgs_locked(&tmp, cpt); list_splice_init(&rbp->rbp_bufs, &tmp); rbp->rbp_req_nbuffers = 0; rbp->rbp_nbuffers = rbp->rbp_credits = 0; diff --git a/lnet/lnet/router_proc.c b/lnet/lnet/router_proc.c index b2dfe42..7636202 100644 --- a/lnet/lnet/router_proc.c +++ b/lnet/lnet/router_proc.c @@ -513,7 +513,7 @@ proc_lnet_peers(struct ctl_table *table, int write, void __user *buffer, aliveness = peer->lpni_alive ? "up" : "down"; if (lnet_peer_aliveness_enabled(peer)) { - cfs_time_t now = cfs_time_current(); + cfs_time_t now = cfs_time_current(); cfs_duration_t delta; delta = cfs_time_sub(now, peer->lpni_last_alive);