#include <linux/nsproxy.h>
#include <net/net_namespace.h>
+extern unsigned int lnet_current_net_count;
+
static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
return rc;
}
-/*
- * This function can be called from two paths:
- * 1. when sending a message
- * 2. when decommiting a message (lnet_msg_decommit_tx())
- * In both these cases the peer_ni should have it's reference count
- * acquired by the caller and therefore it is safe to drop the spin
- * lock before calling lnd_query()
- */
-static void
-lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
-{
- time64_t last_alive = 0;
- int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
-
- LASSERT(lnet_peer_aliveness_enabled(lp));
- LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
-
- lnet_net_unlock(cpt);
- (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
- lnet_net_lock(cpt);
-
- lp->lpni_last_query = ktime_get_seconds();
-
- if (last_alive != 0) /* NI has updated timestamp */
- lp->lpni_last_alive = last_alive;
-}
-
-/* NB: always called with lnet_net_lock held */
-static inline int
-lnet_peer_is_alive(struct lnet_peer_ni *lp, time64_t now)
+static bool
+lnet_is_peer_deadline_passed(struct lnet_peer_ni *lpni, time64_t now)
{
- int alive;
time64_t deadline;
- LASSERT (lnet_peer_aliveness_enabled(lp));
+ deadline = lpni->lpni_last_alive +
+ lpni->lpni_net->net_tunables.lct_peer_timeout;
/*
- * Trust lnet_notify() if it has more recent aliveness news, but
- * ignore the initial assumed death (see lnet_peers_start_down()).
+ * assume peer_ni is alive as long as we're within the configured
+ * peer timeout
*/
- spin_lock(&lp->lpni_lock);
- if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
- lp->lpni_timestamp >= lp->lpni_last_alive) {
- spin_unlock(&lp->lpni_lock);
- return 0;
- }
-
- deadline = lp->lpni_last_alive +
- lp->lpni_net->net_tunables.lct_peer_timeout;
- alive = deadline > now;
+ if (deadline > now)
+ return false;
- /*
- * Update obsolete lp_alive except for routers assumed to be dead
- * initially, because router checker would update aliveness in this
- * case, and moreover lpni_last_alive at peer creation is assumed.
- */
- if (alive && !lp->lpni_alive &&
- !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
- spin_unlock(&lp->lpni_lock);
- lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
- } else {
- spin_unlock(&lp->lpni_lock);
- }
-
- return alive;
+ return true;
}
-
/* NB: returns 1 when alive, 0 when dead, negative when error;
* may drop the lnet_net_lock */
static int
-lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
+lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
+ struct lnet_msg *msg)
{
time64_t now = ktime_get_seconds();
- if (!lnet_peer_aliveness_enabled(lp))
+ if (!lnet_peer_aliveness_enabled(lpni))
return -ENODEV;
- if (lnet_peer_is_alive(lp, now))
- return 1;
-
/*
- * Peer appears dead, but we should avoid frequent NI queries (at
- * most once per lnet_queryinterval seconds).
+ * If we're resending a message, let's attempt to send it even if
+ * the peer is down to fulfill our resend quota on the message
*/
- if (lp->lpni_last_query != 0) {
- static const int lnet_queryinterval = 1;
- time64_t next_query;
-
- next_query = lp->lpni_last_query + lnet_queryinterval;
-
- if (now < next_query) {
- if (lp->lpni_alive)
- CWARN("Unexpected aliveness of peer %s: "
- "%lld < %lld (%d/%d)\n",
- libcfs_nid2str(lp->lpni_nid),
- now, next_query,
- lnet_queryinterval,
- lp->lpni_net->net_tunables.lct_peer_timeout);
- return 0;
- }
- }
+ if (msg->msg_retry_count > 0)
+ return 1;
- /* query NI for latest aliveness news */
- lnet_ni_query_locked(ni, lp);
+ /* try and send recovery messages irregardless */
+ if (msg->msg_recovery)
+ return 1;
- if (lnet_peer_is_alive(lp, now))
+ /* always send any responses */
+ if (msg->msg_type == LNET_MSG_ACK ||
+ msg->msg_type == LNET_MSG_REPLY)
return 1;
- lnet_notify_locked(lp, 0, 0, lp->lpni_last_alive);
- return 0;
+ if (!lnet_is_peer_deadline_passed(lpni, now))
+ return true;
+
+ return lnet_is_peer_ni_alive(lpni);
}
/**
LASSERT(!do_send || msg->msg_tx_delayed);
LASSERT(!msg->msg_receiving);
LASSERT(msg->msg_tx_committed);
+ /* can't get here if we're sending to the loopback interface */
+ LASSERT(lp->lpni_nid != the_lnet.ln_loni->ni_nid);
/* NB 'lp' is always the next hop */
if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
- lnet_peer_alive_locked(ni, lp) == 0) {
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
+ lnet_peer_alive_locked(ni, lp, msg) == 0) {
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
+ msg->msg_len;
lnet_net_unlock(cpt);
if (msg->msg_txpeer)
lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
CNETERR("Dropping message for %s: peer not alive\n",
libcfs_id2str(msg->msg_target));
- if (do_send) {
- msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED;
+ msg->msg_health_status = LNET_MSG_STATUS_REMOTE_DROPPED;
+ if (do_send)
lnet_finalize(msg, -EHOSTUNREACH);
- }
lnet_net_lock(cpt);
return -EHOSTUNREACH;
libcfs_id2str(msg->msg_target));
if (do_send) {
msg->msg_no_resend = true;
+ CDEBUG(D_NET, "msg %p to %s canceled and will not be resent\n",
+ msg, libcfs_id2str(msg->msg_target));
lnet_finalize(msg, -ECANCELED);
}
* sets do_recv FALSE and I don't do the unlock/send/lock bit.
* I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
* received or OK to receive */
- struct lnet_peer_ni *lp = msg->msg_rxpeer;
+ struct lnet_peer_ni *lpni = msg->msg_rxpeer;
+ struct lnet_peer *lp;
struct lnet_rtrbufpool *rbp;
struct lnet_rtrbuf *rb;
- LASSERT (msg->msg_iov == NULL);
- LASSERT (msg->msg_kiov == NULL);
- LASSERT (msg->msg_niov == 0);
- LASSERT (msg->msg_routing);
- LASSERT (msg->msg_receiving);
- LASSERT (!msg->msg_sending);
+ LASSERT(msg->msg_iov == NULL);
+ LASSERT(msg->msg_kiov == NULL);
+ LASSERT(msg->msg_niov == 0);
+ LASSERT(msg->msg_routing);
+ LASSERT(msg->msg_receiving);
+ LASSERT(!msg->msg_sending);
+ LASSERT(lpni->lpni_peer_net);
+ LASSERT(lpni->lpni_peer_net->lpn_peer);
+
+ lp = lpni->lpni_peer_net->lpn_peer;
/* non-lnet_parse callers only receive delayed messages */
LASSERT(!do_recv || msg->msg_rx_delayed);
if (!msg->msg_peerrtrcredit) {
- spin_lock(&lp->lpni_lock);
- LASSERT((lp->lpni_rtrcredits < 0) ==
- !list_empty(&lp->lpni_rtrq));
+ /* lpni_lock protects the credit manipulation */
+ spin_lock(&lpni->lpni_lock);
+ /* lp_lock protects the lp_rtrq */
+ spin_lock(&lp->lp_lock);
msg->msg_peerrtrcredit = 1;
- lp->lpni_rtrcredits--;
- if (lp->lpni_rtrcredits < lp->lpni_minrtrcredits)
- lp->lpni_minrtrcredits = lp->lpni_rtrcredits;
+ lpni->lpni_rtrcredits--;
+ if (lpni->lpni_rtrcredits < lpni->lpni_minrtrcredits)
+ lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
- if (lp->lpni_rtrcredits < 0) {
+ if (lpni->lpni_rtrcredits < 0) {
/* must have checked eager_recv before here */
LASSERT(msg->msg_rx_ready_delay);
msg->msg_rx_delayed = 1;
- list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
- spin_unlock(&lp->lpni_lock);
+ list_add_tail(&msg->msg_list, &lp->lp_rtrq);
+ spin_unlock(&lp->lp_lock);
+ spin_unlock(&lpni->lpni_lock);
return LNET_CREDIT_WAIT;
}
- spin_unlock(&lp->lpni_lock);
+ spin_unlock(&lp->lp_lock);
+ spin_unlock(&lpni->lpni_lock);
}
rbp = lnet_msg2bufpool(msg);
0, 0, 0, msg->msg_hdr.payload_length);
list_del_init(&msg->msg_list);
msg->msg_no_resend = true;
+ msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
lnet_finalize(msg, -ECANCELED);
}
void
lnet_return_rx_credits_locked(struct lnet_msg *msg)
{
- struct lnet_peer_ni *rxpeer = msg->msg_rxpeer;
+ struct lnet_peer_ni *rxpeerni = msg->msg_rxpeer;
+ struct lnet_peer *lp;
struct lnet_ni *rxni = msg->msg_rxni;
struct lnet_msg *msg2;
routing_off:
if (msg->msg_peerrtrcredit) {
+ LASSERT(rxpeerni);
+ LASSERT(rxpeerni->lpni_peer_net);
+ LASSERT(rxpeerni->lpni_peer_net->lpn_peer);
+
+ lp = rxpeerni->lpni_peer_net->lpn_peer;
+
/* give back peer router credits */
msg->msg_peerrtrcredit = 0;
- spin_lock(&rxpeer->lpni_lock);
- LASSERT((rxpeer->lpni_rtrcredits < 0) ==
- !list_empty(&rxpeer->lpni_rtrq));
+ spin_lock(&rxpeerni->lpni_lock);
+ spin_lock(&lp->lp_lock);
- rxpeer->lpni_rtrcredits++;
+ rxpeerni->lpni_rtrcredits++;
/* drop all messages which are queued to be routed on that
* peer. */
if (!the_lnet.ln_routing) {
struct list_head drop;
INIT_LIST_HEAD(&drop);
- list_splice_init(&rxpeer->lpni_rtrq, &drop);
- spin_unlock(&rxpeer->lpni_lock);
+ list_splice_init(&lp->lp_rtrq, &drop);
+ spin_unlock(&lp->lp_lock);
+ spin_unlock(&rxpeerni->lpni_lock);
lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
- } else if (rxpeer->lpni_rtrcredits <= 0) {
- msg2 = list_entry(rxpeer->lpni_rtrq.next,
+ } else if (!list_empty(&lp->lp_rtrq)) {
+ int msg2_cpt;
+
+ msg2 = list_entry(lp->lp_rtrq.next,
struct lnet_msg, msg_list);
list_del(&msg2->msg_list);
- spin_unlock(&rxpeer->lpni_lock);
+ msg2_cpt = msg2->msg_rx_cpt;
+ spin_unlock(&lp->lp_lock);
+ spin_unlock(&rxpeerni->lpni_lock);
+ /*
+ * messages on the lp_rtrq can be from any NID in
+ * the peer, which means they might have different
+ * cpts. We need to make sure we lock the right
+ * one.
+ */
+ if (msg2_cpt != msg->msg_rx_cpt) {
+ lnet_net_unlock(msg->msg_rx_cpt);
+ lnet_net_lock(msg2_cpt);
+ }
(void) lnet_post_routed_recv_locked(msg2, 1);
+ if (msg2_cpt != msg->msg_rx_cpt) {
+ lnet_net_unlock(msg2_cpt);
+ lnet_net_lock(msg->msg_rx_cpt);
+ }
} else {
- spin_unlock(&rxpeer->lpni_lock);
+ spin_unlock(&lp->lp_lock);
+ spin_unlock(&rxpeerni->lpni_lock);
}
}
if (rxni != NULL) {
msg->msg_rxni = NULL;
lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
}
- if (rxpeer != NULL) {
+ if (rxpeerni != NULL) {
msg->msg_rxpeer = NULL;
- lnet_peer_ni_decref_locked(rxpeer);
+ lnet_peer_ni_decref_locked(rxpeerni);
}
}
return 0;
}
+static struct lnet_peer_ni *
+lnet_select_peer_ni(struct lnet_ni *best_ni, lnet_nid_t dst_nid,
+ struct lnet_peer *peer,
+ struct lnet_peer_net *peer_net)
+{
+ /*
+ * Look at the peer NIs for the destination peer that connect
+ * to the chosen net. If a peer_ni is preferred when using the
+ * best_ni to communicate, we use that one. If there is no
+ * preferred peer_ni, or there are multiple preferred peer_ni,
+ * the available transmit credits are used. If the transmit
+ * credits are equal, we round-robin over the peer_ni.
+ */
+ struct lnet_peer_ni *lpni = NULL;
+ struct lnet_peer_ni *best_lpni = NULL;
+ int best_lpni_credits = INT_MIN;
+ bool preferred = false;
+ bool ni_is_pref;
+ int best_lpni_healthv = 0;
+ int lpni_healthv;
+
+ while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
+ /*
+ * if the best_ni we've chosen aleady has this lpni
+ * preferred, then let's use it
+ */
+ if (best_ni) {
+ ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
+ best_ni->ni_nid);
+ CDEBUG(D_NET, "%s ni_is_pref = %d\n",
+ libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
+ } else {
+ ni_is_pref = false;
+ }
+
+ lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
+ if (best_lpni)
+ CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
+ libcfs_nid2str(lpni->lpni_nid),
+ lpni->lpni_txcredits, best_lpni_credits,
+ lpni->lpni_seq, best_lpni->lpni_seq);
+
+ /* pick the healthiest peer ni */
+ if (lpni_healthv < best_lpni_healthv) {
+ continue;
+ } else if (lpni_healthv > best_lpni_healthv) {
+ best_lpni_healthv = lpni_healthv;
+ /* if this is a preferred peer use it */
+ } else if (!preferred && ni_is_pref) {
+ preferred = true;
+ } else if (preferred && !ni_is_pref) {
+ /*
+ * this is not the preferred peer so let's ignore
+ * it.
+ */
+ continue;
+ } else if (lpni->lpni_txcredits < best_lpni_credits) {
+ /*
+ * We already have a peer that has more credits
+ * available than this one. No need to consider
+ * this peer further.
+ */
+ continue;
+ } else if (lpni->lpni_txcredits == best_lpni_credits) {
+ /*
+ * The best peer found so far and the current peer
+ * have the same number of available credits let's
+ * make sure to select between them using Round
+ * Robin
+ */
+ if (best_lpni) {
+ if (best_lpni->lpni_seq <= lpni->lpni_seq)
+ continue;
+ }
+ }
+
+ best_lpni = lpni;
+ best_lpni_credits = lpni->lpni_txcredits;
+ }
+
+ /* if we still can't find a peer ni then we can't reach it */
+ if (!best_lpni) {
+ __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
+ LNET_NIDNET(dst_nid);
+ CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
+ libcfs_net2str(net_id));
+ return NULL;
+ }
+
+ CDEBUG(D_NET, "sd_best_lpni = %s\n",
+ libcfs_nid2str(best_lpni->lpni_nid));
+
+ return best_lpni;
+}
+
+/*
+ * Prerequisite: the best_ni should already be set in the sd
+ */
+static inline struct lnet_peer_ni *
+lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
+ __u32 net_id)
+{
+ struct lnet_peer_net *peer_net;
+
+ /*
+ * The gateway is Multi-Rail capable so now we must select the
+ * proper peer_ni
+ */
+ peer_net = lnet_peer_get_net_locked(peer, net_id);
+
+ if (!peer_net) {
+ CERROR("gateway peer %s has no NI on net %s\n",
+ libcfs_nid2str(peer->lp_primary_nid),
+ libcfs_net2str(net_id));
+ return NULL;
+ }
+
+ return lnet_select_peer_ni(sd->sd_best_ni, sd->sd_dst_nid,
+ peer, peer_net);
+}
+
static int
-lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2)
+lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2,
+ struct lnet_peer_ni **best_lpni)
{
- struct lnet_peer_ni *p1 = r1->lr_gateway;
- struct lnet_peer_ni *p2 = r2->lr_gateway;
int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
+ struct lnet_peer *lp1 = r1->lr_gateway;
+ struct lnet_peer *lp2 = r2->lr_gateway;
+ struct lnet_peer_ni *lpni1;
+ struct lnet_peer_ni *lpni2;
+ struct lnet_send_data sd;
int rc;
- if (r1->lr_priority < r2->lr_priority)
+ sd.sd_best_ni = NULL;
+ sd.sd_dst_nid = LNET_NID_ANY;
+ lpni1 = lnet_find_best_lpni_on_net(&sd, lp1, r1->lr_lnet);
+ lpni2 = lnet_find_best_lpni_on_net(&sd, lp2, r2->lr_lnet);
+ LASSERT(lpni1 && lpni2);
+
+ if (r1->lr_priority < r2->lr_priority) {
+ *best_lpni = lpni1;
return 1;
+ }
- if (r1->lr_priority > r2->lr_priority)
+ if (r1->lr_priority > r2->lr_priority) {
+ *best_lpni = lpni2;
return -1;
+ }
- if (r1_hops < r2_hops)
+ if (r1_hops < r2_hops) {
+ *best_lpni = lpni1;
return 1;
+ }
- if (r1_hops > r2_hops)
+ if (r1_hops > r2_hops) {
+ *best_lpni = lpni2;
return -1;
+ }
- rc = lnet_compare_peers(p1, p2);
- if (rc)
+ rc = lnet_compare_peers(lpni1, lpni2);
+ if (rc == 1) {
+ *best_lpni = lpni1;
+ return rc;
+ } else if (rc == -1) {
+ *best_lpni = lpni2;
return rc;
+ }
- if (r1->lr_seq - r2->lr_seq <= 0)
+ if (r1->lr_seq - r2->lr_seq <= 0) {
+ *best_lpni = lpni1;
return 1;
+ }
+ *best_lpni = lpni2;
return -1;
}
-static struct lnet_peer_ni *
+static struct lnet_route *
lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
- lnet_nid_t rtr_nid)
+ lnet_nid_t rtr_nid, struct lnet_route **prev_route,
+ struct lnet_peer_ni **gwni)
{
- struct lnet_remotenet *rnet;
- struct lnet_route *route;
- struct lnet_route *best_route;
- struct lnet_route *last_route;
- struct lnet_peer_ni *lpni_best;
- struct lnet_peer_ni *lp;
- int rc;
+ struct lnet_peer_ni *best_gw_ni = NULL;
+ struct lnet_route *best_route;
+ struct lnet_route *last_route;
+ struct lnet_remotenet *rnet;
+ struct lnet_peer *lp_best;
+ struct lnet_route *route;
+ struct lnet_peer *lp;
+ int rc;
/* If @rtr_nid is not LNET_NID_ANY, return the gateway with
* rtr_nid nid, otherwise find the best gateway I can use */
if (rnet == NULL)
return NULL;
- lpni_best = NULL;
+ lp_best = NULL;
best_route = last_route = NULL;
list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
lp = route->lr_gateway;
if (!lnet_is_route_alive(route))
continue;
- if (net != NULL && lp->lpni_net != net)
- continue;
-
- if (lp->lpni_nid == rtr_nid) /* it's pre-determined router */
- return lp;
-
- if (lpni_best == NULL) {
+ if (lp_best == NULL) {
best_route = last_route = route;
- lpni_best = lp;
- continue;
+ lp_best = lp;
}
/* no protection on below fields, but it's harmless */
if (last_route->lr_seq - route->lr_seq < 0)
last_route = route;
- rc = lnet_compare_routes(route, best_route);
+ rc = lnet_compare_routes(route, best_route, &best_gw_ni);
if (rc < 0)
continue;
best_route = route;
- lpni_best = lp;
+ lp_best = lp;
}
- /* set sequence number on the best router to the latest sequence + 1
- * so we can round-robin all routers, it's race and inaccurate but
- * harmless and functional */
- if (best_route != NULL)
- best_route->lr_seq = last_route->lr_seq + 1;
- return lpni_best;
+ *prev_route = last_route;
+ *gwni = best_gw_ni;
+
+ return best_route;
}
static struct lnet_ni *
#define SRC_ANY_ROUTER_NMR_DST (SRC_ANY | REMOTE_DST | NMR_DST)
static int
+lnet_handle_lo_send(struct lnet_send_data *sd)
+{
+ struct lnet_msg *msg = sd->sd_msg;
+ int cpt = sd->sd_cpt;
+
+ /* No send credit hassles with LOLND */
+ lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
+ msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
+ if (!msg->msg_routing)
+ msg->msg_hdr.src_nid =
+ cpu_to_le64(the_lnet.ln_loni->ni_nid);
+ msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
+ lnet_msg_commit(msg, cpt);
+ msg->msg_txni = the_lnet.ln_loni;
+
+ return LNET_CREDIT_OK;
+}
+
+static int
lnet_handle_send(struct lnet_send_data *sd)
{
struct lnet_ni *best_ni = sd->sd_best_ni;
__u32 send_case = sd->sd_send_case;
int rc;
__u32 routing = send_case & REMOTE_DST;
+ struct lnet_rsp_tracker *rspt;
/*
* Increment sequence number of the selected peer so that we
* if we're not routing set the dest_nid to the best peer
* ni NID that we picked earlier in the algorithm.
*/
- msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
- }
-
- rc = lnet_post_send_locked(msg, 0);
-
- if (!rc)
- CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n",
- libcfs_nid2str(msg->msg_hdr.src_nid),
- libcfs_nid2str(msg->msg_txni->ni_nid),
- libcfs_nid2str(sd->sd_src_nid),
- libcfs_nid2str(msg->msg_hdr.dest_nid),
- libcfs_nid2str(sd->sd_dst_nid),
- libcfs_nid2str(msg->msg_txpeer->lpni_nid),
- lnet_msgtyp2str(msg->msg_type));
-
- return rc;
-}
-
-static struct lnet_peer_ni *
-lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
- struct lnet_peer_net *peer_net)
-{
- /*
- * Look at the peer NIs for the destination peer that connect
- * to the chosen net. If a peer_ni is preferred when using the
- * best_ni to communicate, we use that one. If there is no
- * preferred peer_ni, or there are multiple preferred peer_ni,
- * the available transmit credits are used. If the transmit
- * credits are equal, we round-robin over the peer_ni.
- */
- struct lnet_peer_ni *lpni = NULL;
- struct lnet_peer_ni *best_lpni = NULL;
- struct lnet_ni *best_ni = sd->sd_best_ni;
- lnet_nid_t dst_nid = sd->sd_dst_nid;
- int best_lpni_credits = INT_MIN;
- bool preferred = false;
- bool ni_is_pref;
- int best_lpni_healthv = 0;
- int lpni_healthv;
-
- while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
- /*
- * if the best_ni we've chosen aleady has this lpni
- * preferred, then let's use it
- */
- ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
- best_ni->ni_nid);
-
- lpni_healthv = atomic_read(&lpni->lpni_healthv);
-
- CDEBUG(D_NET, "%s ni_is_pref = %d\n",
- libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
-
- if (best_lpni)
- CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
- libcfs_nid2str(lpni->lpni_nid),
- lpni->lpni_txcredits, best_lpni_credits,
- lpni->lpni_seq, best_lpni->lpni_seq);
-
- /* pick the healthiest peer ni */
- if (lpni_healthv < best_lpni_healthv) {
- continue;
- } else if (lpni_healthv > best_lpni_healthv) {
- best_lpni_healthv = lpni_healthv;
- /* if this is a preferred peer use it */
- } else if (!preferred && ni_is_pref) {
- preferred = true;
- } else if (preferred && !ni_is_pref) {
- /*
- * this is not the preferred peer so let's ignore
- * it.
- */
- continue;
- } else if (lpni->lpni_txcredits < best_lpni_credits) {
- /*
- * We already have a peer that has more credits
- * available than this one. No need to consider
- * this peer further.
- */
- continue;
- } else if (lpni->lpni_txcredits == best_lpni_credits) {
- /*
- * The best peer found so far and the current peer
- * have the same number of available credits let's
- * make sure to select between them using Round
- * Robin
- */
- if (best_lpni) {
- if (best_lpni->lpni_seq <= lpni->lpni_seq)
- continue;
- }
- }
-
- best_lpni = lpni;
- best_lpni_credits = lpni->lpni_txcredits;
- }
-
- /* if we still can't find a peer ni then we can't reach it */
- if (!best_lpni) {
- __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
- LNET_NIDNET(dst_nid);
- CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
- libcfs_net2str(net_id));
- return NULL;
- }
-
- CDEBUG(D_NET, "sd_best_lpni = %s\n",
- libcfs_nid2str(best_lpni->lpni_nid));
-
- return best_lpni;
-}
-
-/*
- * Prerequisite: the best_ni should already be set in the sd
- */
-static inline struct lnet_peer_ni *
-lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
- __u32 net_id)
-{
- struct lnet_peer_net *peer_net;
-
- /*
- * The gateway is Multi-Rail capable so now we must select the
- * proper peer_ni
- */
- peer_net = lnet_peer_get_net_locked(peer, net_id);
-
- if (!peer_net) {
- CERROR("gateway peer %s has no NI on net %s\n",
- libcfs_nid2str(peer->lp_primary_nid),
- libcfs_net2str(net_id));
- return NULL;
+ msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
+ }
+
+ /*
+ * if we have response tracker block update it with the next hop
+ * nid
+ */
+ if (msg->msg_md) {
+ rspt = msg->msg_md->md_rspt_ptr;
+ if (rspt) {
+ rspt->rspt_next_hop_nid = msg->msg_txpeer->lpni_nid;
+ CDEBUG(D_NET, "rspt_next_hop_nid = %s\n",
+ libcfs_nid2str(rspt->rspt_next_hop_nid));
+ }
}
- return lnet_select_peer_ni(sd, peer, peer_net);
+ rc = lnet_post_send_locked(msg, 0);
+
+ if (!rc)
+ CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s try# %d\n",
+ libcfs_nid2str(msg->msg_hdr.src_nid),
+ libcfs_nid2str(msg->msg_txni->ni_nid),
+ libcfs_nid2str(sd->sd_src_nid),
+ libcfs_nid2str(msg->msg_hdr.dest_nid),
+ libcfs_nid2str(sd->sd_dst_nid),
+ libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+ lnet_msgtyp2str(msg->msg_type), msg->msg_retry_count);
+
+ return rc;
}
static inline void
sd->sd_best_ni->ni_net->net_id);
}
- if (sd->sd_best_lpni)
+ if (sd->sd_best_lpni &&
+ sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
+ return lnet_handle_lo_send(sd);
+ else if (sd->sd_best_lpni)
return lnet_handle_send(sd);
CERROR("can't send to %s. no NI on %s\n",
}
static int
+lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
+ struct lnet_msg *msg, lnet_nid_t rtr_nid,
+ int cpt)
+{
+ struct lnet_peer *peer;
+ lnet_nid_t primary_nid;
+ int rc;
+
+ lnet_peer_ni_addref_locked(lpni);
+
+ peer = lpni->lpni_peer_net->lpn_peer;
+
+ if (lnet_peer_gw_discovery(peer)) {
+ lnet_peer_ni_decref_locked(lpni);
+ return 0;
+ }
+
+ if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer)) {
+ lnet_peer_ni_decref_locked(lpni);
+ return 0;
+ }
+
+ rc = lnet_discover_peer_locked(lpni, cpt, false);
+ if (rc) {
+ lnet_peer_ni_decref_locked(lpni);
+ return rc;
+ }
+ /* The peer may have changed. */
+ peer = lpni->lpni_peer_net->lpn_peer;
+ /* queue message and return */
+ msg->msg_rtr_nid_param = rtr_nid;
+ msg->msg_sending = 0;
+ msg->msg_txpeer = NULL;
+ spin_lock(&peer->lp_lock);
+ list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+ spin_unlock(&peer->lp_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ primary_nid = peer->lp_primary_nid;
+
+ CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
+ msg, libcfs_nid2str(primary_nid));
+
+ return LNET_DC_WAIT;
+}
+
+static int
lnet_handle_find_routed_path(struct lnet_send_data *sd,
lnet_nid_t dst_nid,
struct lnet_peer_ni **gw_lpni,
struct lnet_peer **gw_peer)
{
- struct lnet_peer_ni *gw;
+ int rc;
+ struct lnet_peer *gw;
+ struct lnet_peer *lp;
+ struct lnet_peer_net *lpn;
+ struct lnet_peer_net *best_lpn = NULL;
+ struct lnet_remotenet *rnet;
+ struct lnet_route *best_route;
+ struct lnet_route *last_route;
+ struct lnet_peer_ni *lpni = NULL;
+ struct lnet_peer_ni *gwni = NULL;
lnet_nid_t src_nid = sd->sd_src_nid;
- gw = lnet_find_route_locked(NULL, LNET_NIDNET(dst_nid),
- sd->sd_rtr_nid);
- if (!gw) {
+ /* we've already looked up the initial lpni using dst_nid */
+ lpni = sd->sd_best_lpni;
+ /* the peer tree must be in existence */
+ LASSERT(lpni && lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
+ lp = lpni->lpni_peer_net->lpn_peer;
+
+ list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
+ /* is this remote network reachable? */
+ rnet = lnet_find_rnet_locked(lpn->lpn_net_id);
+ if (!rnet)
+ continue;
+
+ if (!best_lpn)
+ best_lpn = lpn;
+
+ if (best_lpn->lpn_seq <= lpn->lpn_seq)
+ continue;
+
+ best_lpn = lpn;
+ }
+
+ if (!best_lpn) {
+ CERROR("peer %s has no available nets \n",
+ libcfs_nid2str(sd->sd_dst_nid));
+ return -EHOSTUNREACH;
+ }
+
+ sd->sd_best_lpni = lnet_find_best_lpni_on_net(sd, lp, best_lpn->lpn_net_id);
+ if (!sd->sd_best_lpni) {
+ CERROR("peer %s down\n", libcfs_nid2str(sd->sd_dst_nid));
+ return -EHOSTUNREACH;
+ }
+
+ best_route = lnet_find_route_locked(NULL, best_lpn->lpn_net_id,
+ sd->sd_rtr_nid, &last_route,
+ &gwni);
+ if (!best_route) {
CERROR("no route to %s from %s\n",
libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
return -EHOSTUNREACH;
}
- /* get the peer of the gw_ni */
- LASSERT(gw->lpni_peer_net);
- LASSERT(gw->lpni_peer_net->lpn_peer);
+ if (!gwni) {
+ CERROR("Internal Error. Route expected to %s from %s\n",
+ libcfs_nid2str(dst_nid),
+ libcfs_nid2str(src_nid));
+ return -EFAULT;
+ }
+
+ gw = best_route->lr_gateway;
+ LASSERT(gw == gwni->lpni_peer_net->lpn_peer);
- *gw_peer = gw->lpni_peer_net->lpn_peer;
+ /*
+ * Discover this gateway if it hasn't already been discovered.
+ * This means we might delay the message until discovery has
+ * completed
+ */
+ sd->sd_msg->msg_src_nid_param = sd->sd_src_nid;
+ rc = lnet_initiate_peer_discovery(gwni, sd->sd_msg, sd->sd_rtr_nid,
+ sd->sd_cpt);
+ if (rc)
+ return rc;
if (!sd->sd_best_ni)
- sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, *gw_peer,
- gw->lpni_peer_net,
+ sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, gw,
+ lnet_peer_get_net_locked(gw,
+ best_route->lr_lnet),
sd->sd_md_cpt,
true);
if (!sd->sd_best_ni) {
CERROR("Internal Error. Expected local ni on %s "
"but non found :%s\n",
- libcfs_net2str(gw->lpni_peer_net->lpn_net_id),
+ libcfs_net2str(best_route->lr_lnet),
libcfs_nid2str(sd->sd_src_nid));
return -EFAULT;
}
+ *gw_lpni = gwni;
+ *gw_peer = gw;
+
/*
- * if gw is MR let's find its best peer_ni
+ * increment the sequence numbers since now we're sure we're
+ * going to use this path
*/
- if (lnet_peer_is_multi_rail(*gw_peer)) {
- gw = lnet_find_best_lpni_on_net(sd, *gw_peer,
- sd->sd_best_ni->ni_net->net_id);
- /*
- * We've already verified that the gw has an NI on that
- * desired net, but we're not finding it. Something is
- * wrong.
- */
- if (!gw) {
- CERROR("Internal Error. Route expected to %s from %s\n",
- libcfs_nid2str(dst_nid),
- libcfs_nid2str(src_nid));
- return -EFAULT;
- }
- }
-
- *gw_lpni = gw;
+ LASSERT(best_route && last_route);
+ best_route->lr_seq = last_route->lr_seq + 1;
+ best_lpn->lpn_seq++;
return 0;
}
rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
&gw_peer);
- if (rc < 0)
+ if (rc)
return rc;
if (sd->sd_send_case & NMR_DST)
}
struct lnet_ni *
-lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt)
+lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt,
+ bool discovery)
{
struct lnet_peer_net *peer_net = NULL;
struct lnet_ni *best_ni = NULL;
continue;
best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
peer_net, md_cpt, false);
+
+ /*
+ * if this is a discovery message and lp_disc_net_id is
+ * specified then use that net to send the discovery on.
+ */
+ if (peer->lp_disc_net_id == peer_net->lpn_net_id &&
+ discovery)
+ break;
}
if (best_ni)
* networks.
*/
sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
- sd->sd_md_cpt);
+ sd->sd_md_cpt,
+ lnet_msg_discovery(sd->sd_msg));
if (sd->sd_best_ni) {
sd->sd_best_lpni =
lnet_find_best_lpni_on_net(sd, sd->sd_peer,
* try and see if we can reach it over another routed
* network
*/
- if (sd->sd_best_lpni) {
+ if (sd->sd_best_lpni &&
+ sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid) {
+ /*
+ * in case we initially started with a routed
+ * destination, let's reset to local
+ */
+ sd->sd_send_case &= ~REMOTE_DST;
+ sd->sd_send_case |= LOCAL_DST;
+ return lnet_handle_lo_send(sd);
+ } else if (sd->sd_best_lpni) {
/*
* in case we initially started with a routed
* destination, let's reset to local
"No route available\n",
libcfs_nid2str(sd->sd_dst_nid));
return -EHOSTUNREACH;
+ } else if (rc > 0) {
+ return rc;
}
sd->sd_best_lpni = gw;
return rc;
/*
- * TODO; One possible enhancement is to run the selection
- * algorithm on the peer. However for remote peers the credits are
- * not decremented, so we'll be basically going over the peer NIs
- * in round robin. An MR router will run the selection algorithm
- * on the next-hop interfaces.
+ * Now that we must route to the destination, we must consider the
+ * MR case, where the destination has multiple interfaces, some of
+ * which we can route to and others we do not. For this reason we
+ * need to select the destination which we can route to and if
+ * there are multiple, we need to round robin.
*/
rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
&gw_peer);
- if (rc < 0)
+ if (rc)
return rc;
sd->sd_send_case &= ~LOCAL_DST;
*/
rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
&gw_peer);
- if (rc < 0)
+ if (rc)
return rc;
/*
* is no need to go through any selection. We can just shortcut
* the entire process and send over lolnd
*/
+ send_data.sd_msg = msg;
+ send_data.sd_cpt = cpt;
if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
- /* No send credit hassles with LOLND */
- lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
- msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
- if (!msg->msg_routing)
- msg->msg_hdr.src_nid =
- cpu_to_le64(the_lnet.ln_loni->ni_nid);
- msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
- lnet_msg_commit(msg, cpt);
- msg->msg_txni = the_lnet.ln_loni;
+ rc = lnet_handle_lo_send(&send_data);
lnet_net_unlock(cpt);
-
- return LNET_CREDIT_OK;
+ return rc;
}
/*
* trigger discovery.
*/
peer = lpni->lpni_peer_net->lpn_peer;
- if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
- lnet_nid_t primary_nid;
- rc = lnet_discover_peer_locked(lpni, cpt, false);
- if (rc) {
- lnet_peer_ni_decref_locked(lpni);
- lnet_net_unlock(cpt);
- return rc;
- }
- /* The peer may have changed. */
- peer = lpni->lpni_peer_net->lpn_peer;
- /* queue message and return */
- msg->msg_rtr_nid_param = rtr_nid;
- msg->msg_sending = 0;
- list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+ rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
+ if (rc) {
lnet_peer_ni_decref_locked(lpni);
- primary_nid = peer->lp_primary_nid;
lnet_net_unlock(cpt);
-
- CDEBUG(D_NET, "%s pending discovery\n",
- libcfs_nid2str(primary_nid));
-
- return LNET_DC_WAIT;
+ return rc;
}
lnet_peer_ni_decref_locked(lpni);
send_case |= SND_RESP;
/* assign parameters to the send_data */
- send_data.sd_msg = msg;
send_data.sd_rtr_nid = rtr_nid;
send_data.sd_src_nid = src_nid;
send_data.sd_dst_nid = dst_nid;
send_data.sd_final_dst_lpni = lpni;
send_data.sd_peer = peer;
send_data.sd_md_cpt = md_cpt;
- send_data.sd_cpt = cpt;
send_data.sd_send_case = send_case;
rc = lnet_handle_send_case_locked(&send_data);
+ /*
+ * Update the local cpt since send_data.sd_cpt might've been
+ * updated as a result of calling lnet_handle_send_case_locked().
+ */
+ cpt = send_data.sd_cpt;
+
if (rc == REPEAT_SEND)
goto again;
- lnet_net_unlock(send_data.sd_cpt);
+ lnet_net_unlock(cpt);
return rc;
}
LASSERT(!msg->msg_tx_committed);
rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
- if (rc < 0)
+ if (rc < 0) {
+ if (rc == -EHOSTUNREACH)
+ msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
+ else
+ msg->msg_health_status = LNET_MSG_STATUS_LOCAL_ERROR;
return rc;
+ }
if (rc == LNET_CREDIT_OK)
lnet_ni_send(msg->msg_txni, msg);
lnet_nid_t mt_nid;
};
+/* called with res_lock held */
void
lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
{
* The rspt queue for the cpt is protected by
* the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
*/
- lnet_res_lock(cpt);
- if (!md->md_rspt_ptr) {
- lnet_res_unlock(cpt);
+ if (!md->md_rspt_ptr)
return;
- }
+
rspt = md->md_rspt_ptr;
- md->md_rspt_ptr = NULL;
/* debug code */
LASSERT(rspt->rspt_cpt == cpt);
- /*
- * invalidate the handle to indicate that a response has been
- * received, which will then lead the monitor thread to clean up
- * the rspt block.
- */
- LNetInvalidateMDHandle(&rspt->rspt_mdh);
- lnet_res_unlock(cpt);
+ md->md_rspt_ptr = NULL;
+
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ /*
+ * The monitor thread has invalidated this handle because the
+ * response timed out, but it failed to lookup the MD. That
+ * means this response tracker is on the zombie list. We can
+ * safely remove it under the resource lock (held by caller) and
+ * free the response tracker block.
+ */
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, cpt);
+ } else {
+ /*
+ * invalidate the handle to indicate that a response has been
+ * received, which will then lead the monitor thread to clean up
+ * the rspt block.
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ }
+}
+
+void
+lnet_clean_zombie_rstqs(void)
+{
+ struct lnet_rsp_tracker *rspt, *tmp;
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ list_for_each_entry_safe(rspt, tmp,
+ the_lnet.ln_mt_zombie_rstqs[i],
+ rspt_on_list) {
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ }
+ }
+
+ cfs_percpt_free(the_lnet.ln_mt_zombie_rstqs);
}
static void
-lnet_finalize_expired_responses(bool force)
+lnet_finalize_expired_responses(void)
{
struct lnet_libmd *md;
struct list_head local_queue;
struct lnet_rsp_tracker *rspt, *tmp;
+ ktime_t now;
int i;
if (the_lnet.ln_mt_rstq == NULL)
list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
lnet_net_unlock(i);
+ now = ktime_get();
+
list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
/*
* The rspt mdh will be invalidated when a response
lnet_res_lock(i);
if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
lnet_res_unlock(i);
- list_del_init(&rspt->rspt_on_list);
+ list_del(&rspt->rspt_on_list);
lnet_rspt_free(rspt, i);
continue;
}
- if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
- force) {
+ if (ktime_compare(now, rspt->rspt_deadline) >= 0 ||
+ the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) {
+ struct lnet_peer_ni *lpni;
+ lnet_nid_t nid;
+
md = lnet_handle2md(&rspt->rspt_mdh);
if (!md) {
+ /* MD has been queued for unlink, but
+ * rspt hasn't been detached (Note we've
+ * checked above that the rspt_mdh is
+ * valid). Since we cannot lookup the MD
+ * we're unable to detach the rspt
+ * ourselves. Thus, move the rspt to the
+ * zombie list where we'll wait for
+ * either:
+ * 1. The remaining operations on the
+ * MD to complete. In this case the
+ * final operation will result in
+ * lnet_msg_detach_md()->
+ * lnet_detach_rsp_tracker() where
+ * we will clean up this response
+ * tracker.
+ * 2. LNet to shutdown. In this case
+ * we'll wait until after all LND Nets
+ * have shutdown and then we can
+ * safely free any remaining response
+ * tracker blocks on the zombie list.
+ * Note: We need to hold the resource
+ * lock when adding to the zombie list
+ * because we may have concurrent access
+ * with lnet_detach_rsp_tracker().
+ */
LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ list_move(&rspt->rspt_on_list,
+ the_lnet.ln_mt_zombie_rstqs[i]);
lnet_res_unlock(i);
- list_del_init(&rspt->rspt_on_list);
- lnet_rspt_free(rspt, i);
continue;
}
LASSERT(md->md_rspt_ptr == rspt);
md->md_rspt_ptr = NULL;
lnet_res_unlock(i);
+ LNetMDUnlink(rspt->rspt_mdh);
+
+ nid = rspt->rspt_next_hop_nid;
+
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+
+ /* If we're shutting down we just want to clean
+ * up the rspt blocks
+ */
+ if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+ continue;
+
lnet_net_lock(i);
- the_lnet.ln_counters[i]->response_timeout_count++;
+ the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
lnet_net_unlock(i);
- list_del_init(&rspt->rspt_on_list);
+ CDEBUG(D_NET,
+ "Response timeout: md = %p: nid = %s\n",
+ md, libcfs_nid2str(nid));
- CDEBUG(D_NET, "Response timed out: md = %p\n", md);
- LNetMDUnlink(rspt->rspt_mdh);
- lnet_rspt_free(rspt, i);
+ /*
+ * If there is a timeout on the response
+ * from the next hop decrement its health
+ * value so that we don't use it
+ */
+ lnet_net_lock(0);
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (lpni) {
+ lnet_handle_remote_failure_locked(lpni);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+ lnet_net_unlock(0);
} else {
lnet_res_unlock(i);
break;
}
}
- lnet_net_lock(i);
- if (!list_empty(&local_queue))
+ if (!list_empty(&local_queue)) {
+ lnet_net_lock(i);
list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
- lnet_net_unlock(i);
+ lnet_net_unlock(i);
+ }
}
}
lnet_peer_ni_decref_locked(lpni);
lnet_net_unlock(cpt);
- CDEBUG(D_NET, "resending %s->%s: %s recovery %d\n",
+ CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
libcfs_nid2str(src_nid),
libcfs_id2str(msg->msg_target),
lnet_msgtyp2str(msg->msg_type),
- msg->msg_recovery);
+ msg->msg_recovery,
+ msg->msg_retry_count);
rc = lnet_send(src_nid, msg, LNET_NID_ANY);
if (rc) {
CERROR("Error sending %s to %s: %d\n",
}
lnet_net_lock(cpt);
if (!rc)
- the_lnet.ln_counters[cpt]->resend_count++;
+ the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
}
}
}
/* called with cpt and ni_lock held */
static void
-lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt)
+lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
{
struct lnet_handle_md recovery_mdh;
LNetInvalidateMDHandle(&recovery_mdh);
- if (ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING) {
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
+ force) {
recovery_mdh = ni->ni_ping_mdh;
LNetInvalidateMDHandle(&ni->ni_ping_mdh);
}
lnet_net_lock(0);
lnet_ni_lock(ni);
- if (!(ni->ni_state & LNET_NI_STATE_ACTIVE) ||
+ if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
healthv == LNET_MAX_HEALTH_VALUE) {
list_del_init(&ni->ni_recovery);
- lnet_unlink_ni_recovery_mdh_locked(ni, 0);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
lnet_ni_unlock(ni);
lnet_ni_decref_locked(ni, 0);
lnet_net_unlock(0);
continue;
}
+
+ /*
+ * if the local NI failed recovery we must unlink the md.
+ * But we want to keep the local_ni on the recovery queue
+ * so we can continue the attempts to recover it.
+ */
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
+ }
+
lnet_ni_unlock(ni);
lnet_net_unlock(0);
libcfs_nid2str(ni->ni_nid));
lnet_ni_lock(ni);
- if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) {
- ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING;
+ if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
+ ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
lnet_ni_unlock(ni);
LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
CERROR("out of memory. Can't recover %s\n",
libcfs_nid2str(ni->ni_nid));
lnet_ni_lock(ni);
- ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ ni->ni_recovery_state &=
+ ~LNET_NI_RECOVERY_PENDING;
lnet_ni_unlock(ni);
continue;
}
lnet_ni_lock(ni);
if (rc)
- ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
}
lnet_ni_unlock(ni);
}
lnet_net_unlock(0);
}
-static struct list_head **
-lnet_create_array_of_queues(void)
-{
- struct list_head **qs;
- struct list_head *q;
- int i;
-
- qs = cfs_percpt_alloc(lnet_cpt_table(),
- sizeof(struct list_head));
- if (!qs) {
- CERROR("Failed to allocate queues\n");
- return NULL;
- }
-
- cfs_percpt_for_each(q, i, qs)
- INIT_LIST_HEAD(q);
-
- return qs;
-}
-
static int
lnet_resendqs_create(void)
{
struct lnet_ni, ni_recovery);
list_del_init(&ni->ni_recovery);
lnet_ni_lock(ni);
- lnet_unlink_ni_recovery_mdh_locked(ni, 0);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
lnet_ni_unlock(ni);
lnet_ni_decref_locked(ni, 0);
}
}
static void
-lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt)
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
+ bool force)
{
struct lnet_handle_md recovery_mdh;
LNetInvalidateMDHandle(&recovery_mdh);
- if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) {
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
recovery_mdh = lpni->lpni_recovery_ping_mdh;
LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
}
lpni_recovery) {
list_del_init(&lpni->lpni_recovery);
spin_lock(&lpni->lpni_lock);
- lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
spin_unlock(&lpni->lpni_lock);
lnet_peer_ni_decref_locked(lpni);
}
if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
healthv == LNET_MAX_HEALTH_VALUE) {
list_del_init(&lpni->lpni_recovery);
- lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
spin_unlock(&lpni->lpni_lock);
lnet_peer_ni_decref_locked(lpni);
lnet_net_unlock(0);
continue;
}
+
+ /*
+ * If the peer NI has failed recovery we must unlink the
+ * md. But we want to keep the peer ni on the recovery
+ * queue so we can try to continue recovering it
+ */
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
+ }
+
spin_unlock(&lpni->lpni_lock);
lnet_net_unlock(0);
static int
lnet_monitor_thread(void *arg)
{
- int wakeup_counter = 0;
+ time64_t recovery_timeout = 0;
+ time64_t rsp_timeout = 0;
+ int interval;
+ time64_t now;
+ wait_for_completion(&the_lnet.ln_started);
/*
* The monitor thread takes care of the following:
* 1. Checks the aliveness of routers
cfs_block_allsigs();
while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
+ now = ktime_get_real_seconds();
+
if (lnet_router_checker_active())
lnet_check_routers();
lnet_resend_pending_msgs();
- wakeup_counter++;
- if (wakeup_counter >= lnet_transaction_timeout / 2) {
- lnet_finalize_expired_responses(false);
- wakeup_counter = 0;
+ if (now >= rsp_timeout) {
+ lnet_finalize_expired_responses();
+ rsp_timeout = now + (lnet_transaction_timeout / 2);
}
- lnet_recover_local_nis();
-
- lnet_recover_peer_nis();
+ if (now >= recovery_timeout) {
+ lnet_recover_local_nis();
+ lnet_recover_peer_nis();
+ recovery_timeout = now + lnet_recovery_interval;
+ }
/*
* TODO do we need to check if we should sleep without
* if we wake up every 1 second? Although, we've seen
* cases where we get a complaint that an idle thread
* is waking up unnecessarily.
+ *
+ * Take into account the current net_count when you wake
+ * up for alive router checking, since we need to check
+ * possibly as many networks as we have configured.
+ */
+ interval = min(lnet_recovery_interval,
+ min((unsigned int) alive_router_check_interval /
+ lnet_current_net_count,
+ lnet_transaction_timeout / 2));
+ wait_for_completion_interruptible_timeout(
+ &the_lnet.ln_mt_wait_complete,
+ cfs_time_seconds(interval));
+ /* Must re-init the completion before testing anything,
+ * including ln_mt_state.
*/
- wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
- false,
- cfs_time_seconds(1));
+ reinit_completion(&the_lnet.ln_mt_wait_complete);
}
- /* clean up the router checker */
- lnet_prune_rc_data(1);
-
/* Shutting down */
+ lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+ lnet_net_unlock(LNET_LOCK_EX);
/* signal that the monitor thread is exiting */
up(&the_lnet.ln_mt_signal);
static void
lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
- int status)
+ int status, bool unlink_event)
{
lnet_nid_t nid = ev_info->mt_nid;
return;
}
lnet_ni_lock(ni);
- ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
+ if (status)
+ ni->ni_recovery_state |= LNET_NI_RECOVERY_FAILED;
lnet_ni_unlock(ni);
lnet_net_unlock(0);
if (status != 0) {
- CERROR("local NI recovery failed with %d\n", status);
+ CERROR("local NI (%s) recovery failed with %d\n",
+ libcfs_nid2str(nid), status);
return;
}
/*
* carry forward too much information.
* In the peer case, it'll naturally be incremented
*/
- lnet_inc_healthv(&ni->ni_healthv);
+ if (!unlink_event)
+ lnet_inc_healthv(&ni->ni_healthv);
} else {
struct lnet_peer_ni *lpni;
int cpt;
}
spin_lock(&lpni->lpni_lock);
lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ if (status)
+ lpni->lpni_state |= LNET_PEER_NI_RECOVERY_FAILED;
spin_unlock(&lpni->lpni_lock);
lnet_peer_ni_decref_locked(lpni);
lnet_net_unlock(cpt);
if (status != 0)
- CERROR("peer NI recovery failed with %d\n", status);
+ CERROR("peer NI (%s) recovery failed with %d\n",
+ libcfs_nid2str(nid), status);
}
}
-static void
+void
lnet_mt_event_handler(struct lnet_event *event)
{
struct lnet_mt_event_info *ev_info = event->md.user_ptr;
case LNET_EVENT_UNLINK:
CDEBUG(D_NET, "%s recovery ping unlinked\n",
libcfs_nid2str(ev_info->mt_nid));
+ /* fallthrough */
case LNET_EVENT_REPLY:
- lnet_handle_recovery_reply(ev_info, event->status);
+ lnet_handle_recovery_reply(ev_info, event->status,
+ event->type == LNET_EVENT_UNLINK);
break;
case LNET_EVENT_SEND:
CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
static void
lnet_rsp_tracker_clean(void)
{
- lnet_finalize_expired_responses(true);
+ lnet_finalize_expired_responses();
cfs_percpt_free(the_lnet.ln_mt_rstq);
the_lnet.ln_mt_rstq = NULL;
if (rc)
goto clean_queues;
- rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
- if (rc != 0) {
- CERROR("Can't allocate monitor thread EQ: %d\n", rc);
- goto clean_queues;
- }
-
- /* Pre monitor thread start processing */
- rc = lnet_router_pre_mt_start();
- if (rc)
- goto free_mem;
-
sema_init(&the_lnet.ln_mt_signal, 0);
+ lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
+ lnet_net_unlock(LNET_LOCK_EX);
task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
if (IS_ERR(task)) {
rc = PTR_ERR(task);
goto clean_thread;
}
- /* post monitor thread start processing */
- lnet_router_post_mt_start();
-
return 0;
clean_thread:
+ lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+ lnet_net_unlock(LNET_LOCK_EX);
/* block until event callback signals exit */
down(&the_lnet.ln_mt_signal);
/* clean up */
- lnet_router_cleanup();
-free_mem:
+ lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+ lnet_net_unlock(LNET_LOCK_EX);
lnet_rsp_tracker_clean();
lnet_clean_local_ni_recoveryq();
lnet_clean_peer_ni_recoveryq();
lnet_clean_resendqs();
- LNetEQFree(the_lnet.ln_mt_eqh);
LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
return rc;
clean_queues:
void lnet_monitor_thr_stop(void)
{
- int rc;
-
if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
return;
LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
+ lnet_net_lock(LNET_LOCK_EX);
the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+ lnet_net_unlock(LNET_LOCK_EX);
/* tell the monitor thread that we're shutting down */
- wake_up(&the_lnet.ln_mt_waitq);
+ complete(&the_lnet.ln_mt_wait_complete);
/* block until monitor thread signals that it's done */
down(&the_lnet.ln_mt_signal);
LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
/* perform cleanup tasks */
- lnet_router_cleanup();
lnet_rsp_tracker_clean();
lnet_clean_local_ni_recoveryq();
lnet_clean_peer_ni_recoveryq();
lnet_clean_resendqs();
- rc = LNetEQFree(the_lnet.ln_mt_eqh);
- LASSERT(rc == 0);
+
return;
}
{
lnet_net_lock(cpt);
lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += nob;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length += nob;
lnet_net_unlock(cpt);
lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
info.mi_rlength = hdr->payload_length;
info.mi_roffset = hdr->msg.put.offset;
info.mi_mbits = hdr->msg.put.match_bits;
- info.mi_cpt = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
+ info.mi_cpt = lnet_cpt_of_nid(msg->msg_initiator, ni);
msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
ready_delay = msg->msg_rx_ready_delay;
info.mi_rlength = hdr->msg.get.sink_length;
info.mi_roffset = hdr->msg.get.src_offset;
info.mi_mbits = hdr->msg.get.match_bits;
- info.mi_cpt = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
+ info.mi_cpt = lnet_cpt_of_nid(msg->msg_initiator, ni);
rc = lnet_ptl_match_md(&info, msg);
if (rc == LNET_MATCHMD_DROP) {
lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
msg->msg_receiving = 0;
- rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
+ rc = lnet_send(ni->ni_nid, msg, msg->msg_from);
if (rc < 0) {
/* didn't get as far as lnet_ni_send() */
CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
void *private, int rdma_req)
{
- int rc = 0;
- int cpt;
- int for_me;
- struct lnet_msg *msg;
- lnet_pid_t dest_pid;
- lnet_nid_t dest_nid;
- lnet_nid_t src_nid;
struct lnet_peer_ni *lpni;
- __u32 payload_length;
- __u32 type;
+ struct lnet_msg *msg;
+ __u32 payload_length;
+ lnet_pid_t dest_pid;
+ lnet_nid_t dest_nid;
+ lnet_nid_t src_nid;
+ bool push = false;
+ int for_me;
+ __u32 type;
+ int rc = 0;
+ int cpt;
LASSERT (!in_interrupt ());
}
if (the_lnet.ln_routing &&
- ni->ni_last_alive != ktime_get_real_seconds()) {
- /* NB: so far here is the only place to set NI status to "up */
+ ni->ni_net->net_last_alive != ktime_get_real_seconds()) {
lnet_ni_lock(ni);
- ni->ni_last_alive = ktime_get_real_seconds();
+ spin_lock(&ni->ni_net->net_lock);
+ ni->ni_net->net_last_alive = ktime_get_real_seconds();
+ spin_unlock(&ni->ni_net->net_lock);
if (ni->ni_status != NULL &&
- ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
+ ni->ni_status->ns_status == LNET_NI_STATUS_DOWN) {
ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+ push = true;
+ }
lnet_ni_unlock(ni);
}
+ if (push)
+ lnet_push_update_to_peers(1);
+
/* Regard a bad destination NID as a protocol error. Senders should
* know what they're doing; if they don't they're misconfigured, buggy
* or malicious so we chop them off at the knees :) */
}
if (!list_empty(&the_lnet.ln_drop_rules) &&
- lnet_drop_rule_match(hdr, NULL)) {
+ lnet_drop_rule_match(hdr, ni->ni_nid, NULL)) {
CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
"silent message loss\n",
libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
goto drop;
}
+ if (lnet_drop_asym_route && for_me &&
+ LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
+ struct lnet_net *net;
+ struct lnet_remotenet *rnet;
+ bool found = true;
+
+ /* we are dealing with a routed message,
+ * so see if route to reach src_nid goes through from_nid
+ */
+ lnet_net_lock(cpt);
+ net = lnet_get_net_locked(LNET_NIDNET(ni->ni_nid));
+ if (!net) {
+ lnet_net_unlock(cpt);
+ CERROR("net %s not found\n",
+ libcfs_net2str(LNET_NIDNET(ni->ni_nid)));
+ return -EPROTO;
+ }
+
+ rnet = lnet_find_rnet_locked(LNET_NIDNET(src_nid));
+ if (rnet) {
+ struct lnet_peer *gw = NULL;
+ struct lnet_peer_ni *lpni = NULL;
+ struct lnet_route *route;
+
+ list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
+ found = false;
+ gw = route->lr_gateway;
+ if (route->lr_lnet != net->net_id)
+ continue;
+ /*
+ * if the nid is one of the gateway's NIDs
+ * then this is a valid gateway
+ */
+ while ((lpni = lnet_get_next_peer_ni_locked(gw,
+ NULL, lpni)) != NULL) {
+ if (lpni->lpni_nid == from_nid) {
+ found = true;
+ break;
+ }
+ }
+ }
+ }
+ lnet_net_unlock(cpt);
+ if (!found) {
+ /* we would not use from_nid to route a message to
+ * src_nid
+ * => asymmetric routing detected but forbidden
+ */
+ CERROR("%s, src %s: Dropping asymmetrical route %s\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid), lnet_msgtyp2str(type));
+ goto drop;
+ }
+ }
msg = lnet_msg_alloc();
if (msg == NULL) {
return 0;
goto drop;
}
+
+ if (the_lnet.ln_routing)
+ lpni->lpni_last_alive = ktime_get_seconds();
+
msg->msg_rxpeer = lpni;
msg->msg_rxni = ni;
lnet_ni_addref_locked(ni, cpt);
/* Multi-Rail: Primary NID of source. */
msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid);
- if (lnet_isrouter(msg->msg_rxpeer)) {
- lnet_peer_set_alive(msg->msg_rxpeer);
- if (avoid_asym_router_failure &&
- LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
- /* received a remote message from router, update
- * remote NI status on this router.
- * NB: multi-hop routed message will be ignored.
- */
- lnet_router_ni_update_locked(msg->msg_rxpeer,
- LNET_NIDNET(src_nid));
- }
- }
+ /*
+ * mark the status of this lpni as UP since we received a message
+ * from it. The ping response reports back the ns_status which is
+ * marked on the remote as up or down and we cache it here.
+ */
+ msg->msg_rxpeer->lpni_ns_status = LNET_NI_STATUS_UP;
lnet_msg_commit(msg, cpt);
struct lnet_libmd *md, struct lnet_handle_md mdh)
{
s64 timeout_ns;
+ bool new_entry = true;
+ struct lnet_rsp_tracker *local_rspt;
/*
* MD has a refcount taken by message so it's not going away.
* added to the list.
*/
- /* debug code */
- LASSERT(md->md_rspt_ptr == NULL);
-
- /* we'll use that same event in case we never get a response */
- rspt->rspt_mdh = mdh;
- rspt->rspt_cpt = cpt;
- timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
- rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
-
lnet_res_lock(cpt);
- /* store the rspt so we can access it when we get the REPLY */
- md->md_rspt_ptr = rspt;
- lnet_res_unlock(cpt);
+ local_rspt = md->md_rspt_ptr;
+ timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+ if (local_rspt != NULL) {
+ /*
+ * we already have an rspt attached to the md, so we'll
+ * update the deadline on that one.
+ */
+ LIBCFS_FREE(rspt, sizeof(*rspt));
+ new_entry = false;
+ } else {
+ /* new md */
+ rspt->rspt_mdh = mdh;
+ rspt->rspt_cpt = cpt;
+ /* store the rspt so we can access it when we get the REPLY */
+ md->md_rspt_ptr = rspt;
+ local_rspt = rspt;
+ }
+ local_rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
/*
* add to the list of tracked responses. It's added to tail of the
* list in order to expire all the older entries first.
*/
lnet_net_lock(cpt);
- list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
+ if (!new_entry && !list_empty(&local_rspt->rspt_on_list))
+ list_del_init(&local_rspt->rspt_on_list);
+ list_add_tail(&local_rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
lnet_net_unlock(cpt);
+ lnet_res_unlock(cpt);
}
/**
if (ack == LNET_ACK_REQ)
lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
- rc = lnet_send(self, msg, LNET_NID_ANY);
+ if (CFS_FAIL_CHECK_ORSET(CFS_FAIL_PTLRPC_OST_BULK_CB2,
+ CFS_FAIL_ONCE))
+ rc = -EIO;
+ else
+ rc = lnet_send(self, msg, LNET_NID_ANY);
+
if (rc != 0) {
CNETERR("Error sending PUT to %s: %d\n",
libcfs_id2str(target), rc);
msg->msg_no_resend = true;
- lnet_detach_rsp_tracker(msg->msg_md, cpt);
lnet_finalize(msg, rc);
}
lnet_net_lock(cpt);
lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
- the_lnet.ln_counters[cpt]->drop_count++;
- the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
+ the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
+ getmd->md_length;
lnet_net_unlock(cpt);
if (msg != NULL)
CNETERR("Error sending GET to %s: %d\n",
libcfs_id2str(target), rc);
msg->msg_no_resend = true;
- lnet_detach_rsp_tracker(msg->msg_md, cpt);
lnet_finalize(msg, rc);
}
LASSERT(shortest != NULL);
hops = shortest_hops;
if (srcnidp != NULL) {
- ni = lnet_get_next_ni_locked(
- shortest->lr_gateway->lpni_net,
- NULL);
+ struct lnet_net *net;
+ net = lnet_get_net_locked(shortest->lr_lnet);
+ LASSERT(net);
+ ni = lnet_get_next_ni_locked(net, NULL);
*srcnidp = ni->ni_nid;
}
if (orderp != NULL)