/* instance-specific data */
void *ni_data;
+ /* per ni credits */
+ atomic_t ni_tx_credits;
+
/* percpt TX queues */
struct lnet_tx_queue **ni_tx_queues;
struct lnet_peer_net *lpni_peer_net;
/* statistics kept on each peer NI */
struct lnet_element_stats lpni_stats;
+ /* spin lock protecting credits and lpni_txq / lpni_rtrq */
+ spinlock_t lpni_lock;
/* # tx credits available */
int lpni_txcredits;
/* low water mark */
/* low water mark */
int lpni_minrtrcredits;
/* alive/dead? */
- unsigned int lpni_alive:1;
+ bool lpni_alive;
/* notification outstanding? */
- unsigned int lpni_notify:1;
+ bool lpni_notify;
/* outstanding notification for LND? */
- unsigned int lpni_notifylnd:1;
+ bool lpni_notifylnd;
/* some thread is handling notification */
- unsigned int lpni_notifying:1;
+ bool lpni_notifying;
/* SEND event outstanding from ping */
- unsigned int lpni_ping_notsent;
- /* # times router went dead<->alive */
+ bool lpni_ping_notsent;
+ /* # times router went dead<->alive. Protected with lpni_lock */
int lpni_alive_count;
/* bytes queued for sending */
long lpni_txqnob;
__u32 lpni_seq;
/* health flag */
bool lpni_healthy;
- /* returned RC ping features */
+ /* returned RC ping features. Protected with lpni_lock */
unsigned int lpni_ping_feats;
/* routes on this peer */
struct list_head lpni_routes;
tq->tq_credits = lnet_ni_tq_credits(ni);
}
+ atomic_set(&ni->ni_tx_credits,
+ lnet_ni_tq_credits(ni) * ni->ni_ncpts);
+
CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
libcfs_nid2str(ni->ni_nid),
ni->ni_net->net_tunables.lct_peer_tx_credits,
return rc;
}
-/* NB: caller shall hold a ref on 'lp' as I'd drop lnet_net_lock */
+/*
+ * This function can be called from two paths:
+ * 1. when sending a message
+ * 2. when decommiting a message (lnet_msg_decommit_tx())
+ * In both these cases the peer_ni should have it's reference count
+ * acquired by the caller and therefore it is safe to drop the spin
+ * lock before calling lnd_query()
+ */
static void
lnet_ni_query_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
{
cfs_time_t last_alive = 0;
+ int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
LASSERT(lnet_peer_aliveness_enabled(lp));
LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
- lnet_net_unlock(lp->lpni_cpt);
+ lnet_net_unlock(cpt);
(ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
- lnet_net_lock(lp->lpni_cpt);
+ lnet_net_lock(cpt);
lp->lpni_last_query = cfs_time_current();
* Trust lnet_notify() if it has more recent aliveness news, but
* ignore the initial assumed death (see lnet_peers_start_down()).
*/
+ spin_lock(&lp->lpni_lock);
if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
- cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive))
+ cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive)) {
+ spin_unlock(&lp->lpni_lock);
return 0;
+ }
deadline =
cfs_time_add(lp->lpni_last_alive,
* case, and moreover lpni_last_alive at peer creation is assumed.
*/
if (alive && !lp->lpni_alive &&
- !(lnet_isrouter(lp) && lp->lpni_alive_count == 0))
+ !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
+ spin_unlock(&lp->lpni_lock);
lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
+ } else {
+ spin_unlock(&lp->lpni_lock);
+ }
return alive;
}
msg->msg_txcredit = 1;
tq->tq_credits--;
+ atomic_dec(&ni->ni_tx_credits);
if (tq->tq_credits < tq->tq_credits_min)
tq->tq_credits_min = tq->tq_credits;
!list_empty(&tq->tq_delayed));
tq->tq_credits++;
+ atomic_inc(&ni->ni_tx_credits);
if (tq->tq_credits <= 0) {
msg2 = list_entry(tq->tq_delayed.next,
lnet_msg_t, msg_list);
* 3. Round Robin
*/
while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
+ int ni_credits;
+
if (!lnet_is_ni_healthy_locked(ni))
continue;
+ ni_credits = atomic_read(&ni->ni_tx_credits);
+
/*
* calculate the distance from the cpt on which
* the message memory is allocated to the CPT of
* select using credits followed by Round
* Robin.
*/
- if (ni->ni_tx_queues[cpt]->tq_credits <
- best_credits) {
+ if (ni_credits < best_credits) {
continue;
- } else if (ni->ni_tx_queues[cpt]->tq_credits ==
- best_credits) {
+ } else if (ni_credits == best_credits) {
if (best_ni) {
if (best_ni->ni_seq <= ni->ni_seq)
continue;
}
set_ni:
best_ni = ni;
- best_credits = ni->ni_tx_queues[cpt]->tq_credits;
+ best_credits = ni_credits;
}
}
/*
send:
/*
- * determine the cpt to use and if it has changed then
- * lock the new cpt and check if the config has changed.
- * If it has changed then repeat the algorithm since the
- * ni or peer list could have changed and the algorithm
- * would endup picking a different ni/peer_ni pair.
+ * Use lnet_cpt_of_nid() to determine the CPT used to commit the
+ * message. This ensures that we get a CPT that is correct for
+ * the NI when the NI has been restricted to a subset of all CPTs.
+ * If the selected CPT differs from the one currently locked, we
+ * must unlock and relock the lnet_net_lock(), and then check whether
+ * the configuration has changed. We don't have a hold on the best_ni
+ * or best_peer_ni yet, and they may have vanished.
*/
- cpt2 = best_lpni->lpni_cpt;
+ cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
if (cpt != cpt2) {
lnet_net_unlock(cpt);
cpt = cpt2;
info.mi_rlength = hdr->payload_length;
info.mi_roffset = hdr->msg.put.offset;
info.mi_mbits = hdr->msg.put.match_bits;
- info.mi_cpt = msg->msg_rxpeer->lpni_cpt;
+ info.mi_cpt = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
ready_delay = msg->msg_rx_ready_delay;
* called lnet_drop_message(), so I just hang onto msg as well
* until that's done */
- lnet_drop_message(msg->msg_rxni,
- msg->msg_rxpeer->lpni_cpt,
+ lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
msg->msg_private, msg->msg_len);
/*
* NB: message will not generate event because w/o attached MD,
INIT_LIST_HEAD(&lpni->lpni_on_peer_net_list);
INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
+ spin_lock_init(&lpni->lpni_lock);
+
lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
lpni->lpni_last_alive = cfs_time_current(); /* assumes alive */
lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
return;
}
+ /*
+ * This function can be called with different cpt locks being
+ * held. lpni_alive_count modification needs to be properly protected.
+ * Significant reads to lpni_alive_count are also protected with
+ * the same lock
+ */
+ spin_lock(&lp->lpni_lock);
+
lp->lpni_timestamp = when; /* update timestamp */
lp->lpni_ping_deadline = 0; /* disable ping timeout */
if (lp->lpni_alive_count != 0 && /* got old news */
(!lp->lpni_alive) == (!alive)) { /* new date for old news */
+ spin_unlock(&lp->lpni_lock);
CDEBUG(D_NET, "Old news\n");
return;
}
/* Flag that notification is outstanding */
lp->lpni_alive_count++;
- lp->lpni_alive = !(!alive); /* 1 bit! */
+ lp->lpni_alive = (alive) ? 1 : 0;
lp->lpni_notify = 1;
- lp->lpni_notifylnd |= notifylnd;
+ lp->lpni_notifylnd = notifylnd;
if (lp->lpni_alive)
lp->lpni_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
+ spin_unlock(&lp->lpni_lock);
+
CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lpni_nid), alive);
}
+/*
+ * This function will always be called with lp->lpni_cpt lock held.
+ */
static void
lnet_ni_notify_locked(lnet_ni_t *ni, struct lnet_peer_ni *lp)
{
- int alive;
- int notifylnd;
+ int alive;
+ int notifylnd;
/* Notify only in 1 thread at any time to ensure ordered notification.
* NB individual events can be missed; the only guarantee is that you
* always get the most recent news */
- if (lp->lpni_notifying || ni == NULL)
+ spin_lock(&lp->lpni_lock);
+
+ if (lp->lpni_notifying || ni == NULL) {
+ spin_unlock(&lp->lpni_lock);
return;
+ }
lp->lpni_notifying = 1;
+ /*
+ * lp->lpni_notify needs to be protected because it can be set in
+ * lnet_notify_locked().
+ */
while (lp->lpni_notify) {
alive = lp->lpni_alive;
notifylnd = lp->lpni_notifylnd;
lp->lpni_notify = 0;
if (notifylnd && ni->ni_net->net_lnd->lnd_notify != NULL) {
+ spin_unlock(&lp->lpni_lock);
lnet_net_unlock(lp->lpni_cpt);
/* A new notification could happen now; I'll handle it
alive);
lnet_net_lock(lp->lpni_cpt);
+ spin_lock(&lp->lpni_lock);
}
}
lp->lpni_notifying = 0;
+ spin_unlock(&lp->lpni_lock);
}
-
static void
lnet_rtr_addref_locked(struct lnet_peer_ni *lp)
{
if (!gw->lpni_alive)
return;
+ /*
+ * Protect gw->lpni_ping_feats. This can be set from
+ * lnet_notify_locked with different locks being held
+ */
+ spin_lock(&gw->lpni_lock);
+
if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
lnet_swap_pinginfo(info);
CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
libcfs_nid2str(gw->lpni_nid), info->pi_magic);
gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+ spin_unlock(&gw->lpni_lock);
return;
}
if ((gw->lpni_ping_feats & LNET_PING_FEAT_MASK) == 0) {
CDEBUG(D_NET, "%s: Unexpected features 0x%x\n",
libcfs_nid2str(gw->lpni_nid), gw->lpni_ping_feats);
+ spin_unlock(&gw->lpni_lock);
return; /* nothing I can understand */
}
- if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
+ if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0) {
+ spin_unlock(&gw->lpni_lock);
return; /* can't carry NI status info */
+ }
list_for_each_entry(rte, &gw->lpni_routes, lr_gwlist) {
int down = 0;
CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
libcfs_nid2str(gw->lpni_nid));
gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+ spin_unlock(&gw->lpni_lock);
return;
}
CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
libcfs_nid2str(gw->lpni_nid), stat->ns_status);
gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
+ spin_unlock(&gw->lpni_lock);
return;
}
rte->lr_downis = down;
}
+
+ spin_unlock(&gw->lpni_lock);
}
static void
lnet_router_checker_event(lnet_event_t *event)
{
- lnet_rc_data_t *rcd = event->md.user_ptr;
- struct lnet_peer_ni *lp;
+ lnet_rc_data_t *rcd = event->md.user_ptr;
+ struct lnet_peer_ni *lp;
LASSERT(rcd != NULL);
rtr = list_entry(entry, struct lnet_peer_ni,
lpni_rtr_list);
+ spin_lock(&rtr->lpni_lock);
+
if (rtr->lpni_alive_count == 0) {
all_known = 0;
+ spin_unlock(&rtr->lpni_lock);
break;
}
+ spin_unlock(&rtr->lpni_lock);
}
lnet_net_unlock(cpt);
int
lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, cfs_time_t when)
{
- struct lnet_peer_ni *lp = NULL;
- cfs_time_t now = cfs_time_current();
- int cpt = lnet_cpt_of_nid(nid, ni);
+ struct lnet_peer_ni *lp = NULL;
+ cfs_time_t now = cfs_time_current();
+ int cpt = lnet_cpt_of_nid(nid, ni);
LASSERT (!in_interrupt ());
return 0;
}
+ /*
+ * It is possible for this function to be called for the same peer
+ * but with different NIs. We want to synchronize the notification
+ * between the different calls. So we will use the lpni_cpt to
+ * grab the net lock.
+ */
+ if (lp->lpni_cpt != cpt) {
+ lnet_net_unlock(cpt);
+ cpt = lp->lpni_cpt;
+ lnet_net_lock(cpt);
+ }
+
/* We can't fully trust LND on reporting exact peer last_alive
* if he notifies us about dead peer. For example ksocklnd can
* call us with when == _time_when_the_node_was_booted_ if