+/*
+ * Choose the local NI and the peer NI (or gateway) used to send msg to
+ * dst_nid, commit the message and post it for sending.
+ *
+ * src_nid	local NID to send from, or LNET_NID_ANY to let this function
+ *		pick the local NI
+ * dst_nid	NID of the destination peer
+ * msg		message to send; msg_txni, msg_txpeer, msg_target and the
+ *		header src/dest NIDs are filled in here
+ * rtr_nid	passed through to lnet_find_route_locked() when a route
+ *		has to be looked up
+ * lo_sent	set to true when the message was handed directly to the
+ *		loopback NI; left untouched otherwise
+ *
+ * The lnet_net_lock is taken and released internally; it is not held on
+ * return.
+ *
+ * Returns 0 when the message was sent over the loopback NI, the return
+ * value of lnet_post_send_locked() when the message was committed, or a
+ * negative errno: -ESHUTDOWN when LNet is shutting down, the error from
+ * lnet_find_or_create_peer_locked(), -EHOSTUNREACH when the peer is
+ * unhealthy or no route/peer NI can be found, -EINVAL for a bad src_nid,
+ * an inconsistent peer configuration or when no local NI is available.
+ */
+static int
+lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
+		    struct lnet_msg *msg, lnet_nid_t rtr_nid, bool *lo_sent)
+{
+	struct lnet_ni *best_ni = NULL;
+	struct lnet_peer_ni *best_lpni = NULL;
+	struct lnet_peer_ni *net_gw = NULL;
+	struct lnet_peer_ni *best_gw = NULL;
+	struct lnet_peer_ni *lpni;
+	struct lnet_peer *peer = NULL;
+	struct lnet_peer_net *peer_net;
+	struct lnet_net *local_net;
+	struct lnet_ni *ni = NULL;
+	int cpt, cpt2, rc;
+	bool routing = false;
+	bool ni_is_pref = false;
+	bool preferred = false;
+	int best_credits = 0;
+	__u32 seq, seq2;
+	int best_lpni_credits = INT_MIN;
+	int md_cpt = 0;
+	int shortest_distance = INT_MAX;
+	int distance = 0;
+	bool found_ir = false;
+
+again:
+	/*
+	 * get an initial CPT to use for locking. The idea here is not to
+	 * serialize the calls to select_pathway, so that as many
+	 * operations can run concurrently as possible. To do that we use
+	 * the CPT where this call is being executed. Later on when we
+	 * determine the CPT to use in lnet_message_commit, we switch the
+	 * lock and check if there was any configuration changes, if none,
+	 * then we proceed, if there is, then we'll need to update the cpt
+	 * and redo the operation.
+	 */
+	cpt = lnet_net_lock_current();
+
+	/*
+	 * Reset all of the selection state: we may get here again via
+	 * "goto again" and values left over from the abandoned attempt
+	 * must not bias the new selection.
+	 */
+	best_gw = NULL;
+	routing = false;
+	local_net = NULL;
+	best_ni = NULL;
+	shortest_distance = INT_MAX;
+	found_ir = false;
+	preferred = false;
+	best_credits = 0;
+	best_lpni_credits = INT_MIN;
+
+	if (the_lnet.ln_shutdown) {
+		lnet_net_unlock(cpt);
+		return -ESHUTDOWN;
+	}
+
+	if (msg->msg_md != NULL) {
+		/* get the cpt of the MD, used during NUMA based selection */
+		md_cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+	} else {
+		md_cpt = CFS_CPT_ANY;
+	}
+
+	/*
+	 * initialize the variables which could be reused if we go to
+	 * again
+	 */
+	lpni = NULL;
+	/* remembered so a configuration change can be detected at "send" */
+	seq = lnet_get_dlc_seq_locked();
+
+	peer = lnet_find_or_create_peer_locked(dst_nid, cpt);
+	if (IS_ERR(peer)) {
+		lnet_net_unlock(cpt);
+		return PTR_ERR(peer);
+	}
+
+	/* If peer is not healthy then can not send anything to it */
+	if (!lnet_is_peer_healthy_locked(peer)) {
+		lnet_net_unlock(cpt);
+		return -EHOSTUNREACH;
+	}
+
+	if (!peer->lp_multi_rail && lnet_get_num_peer_nis(peer) > 1) {
+		/* drop the lock on this error path like every other one */
+		lnet_net_unlock(cpt);
+		CERROR("peer %s is declared to be non MR capable, "
+		       "yet configured with more than one NID\n",
+		       libcfs_nid2str(dst_nid));
+		return -EINVAL;
+	}
+
+	/*
+	 * STEP 1: first jab at determining best_ni
+	 * if src_nid is explicitly specified, then best_ni is already
+	 * pre-determined for us. Otherwise we need to select the best
+	 * one to use later on
+	 */
+	if (src_nid != LNET_NID_ANY) {
+		best_ni = lnet_nid2ni_locked(src_nid, cpt);
+		if (!best_ni) {
+			lnet_net_unlock(cpt);
+			LCONSOLE_WARN("Can't send to %s: src %s is not a "
+				      "local nid\n", libcfs_nid2str(dst_nid),
+				      libcfs_nid2str(src_nid));
+			return -EINVAL;
+		}
+
+		if (best_ni->ni_net->net_id != LNET_NIDNET(dst_nid)) {
+			lnet_net_unlock(cpt);
+			LCONSOLE_WARN("No route to %s via from %s\n",
+				      libcfs_nid2str(dst_nid),
+				      libcfs_nid2str(src_nid));
+			return -EINVAL;
+		}
+	}
+
+	if (best_ni)
+		goto pick_peer;
+
+	/*
+	 * Decide whether we need to route to peer_ni.
+	 * Get the local net that I need to be on to be able to directly
+	 * send to that peer.
+	 *
+	 * a. Find the peer which the dst_nid belongs to.
+	 * b. Iterate through each of the peer_nets/nis to decide
+	 * the best peer/local_ni pair to use
+	 */
+	list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_on_peer_list) {
+		if (!lnet_is_peer_net_healthy_locked(peer_net))
+			continue;
+
+		local_net = lnet_get_net_locked(peer_net->lpn_net_id);
+		if (!local_net) {
+			/*
+			 * go through each peer_ni on that peer_net and
+			 * determine the best possible gw to go through
+			 */
+			list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
+					    lpni_on_peer_net_list) {
+				net_gw = lnet_find_route_locked(NULL,
+								lpni->lpni_nid,
+								rtr_nid);
+
+				/*
+				 * if no route is found for that network then
+				 * move onto the next peer_ni in the peer
+				 */
+				if (!net_gw)
+					continue;
+
+				if (!best_gw) {
+					best_gw = net_gw;
+					best_lpni = lpni;
+				} else {
+					rc = lnet_compare_peers(net_gw,
+								best_gw);
+					if (rc > 0) {
+						best_gw = net_gw;
+						best_lpni = lpni;
+					}
+				}
+			}
+
+			if (!best_gw)
+				continue;
+
+			/* send from the local net the gateway lives on */
+			local_net = lnet_get_net_locked
+					(LNET_NIDNET(best_gw->lpni_nid));
+			routing = true;
+		} else {
+			routing = false;
+			best_gw = NULL;
+		}
+
+		/* no routable net found go on to a different net */
+		if (!local_net)
+			continue;
+
+		/*
+		 * Iterate through the NIs in this local Net and select
+		 * the NI to send from. The selection is determined by
+		 * these 3 criterion in the following priority:
+		 * 1. NUMA
+		 * 2. NI available credits
+		 * 3. Round Robin
+		 */
+		while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
+			int ni_credits;
+
+			if (!lnet_is_ni_healthy_locked(ni))
+				continue;
+
+			ni_credits = atomic_read(&ni->ni_tx_credits);
+
+			/*
+			 * calculate the distance from the cpt on which
+			 * the message memory is allocated to the CPT of
+			 * the NI's physical device
+			 */
+			distance = cfs_cpt_distance(lnet_cpt_table(),
+						    md_cpt,
+						    ni->dev_cpt);
+
+			/*
+			 * If we already have a closer NI within the NUMA
+			 * range provided, then there is no need to
+			 * consider the current NI. Move on to the next
+			 * one.
+			 */
+			if (distance > shortest_distance &&
+			    distance > lnet_get_numa_range())
+				continue;
+
+			if (distance < shortest_distance &&
+			    distance > lnet_get_numa_range()) {
+				/*
+				 * The current NI is the closest one that we
+				 * have found, even though it's not in the
+				 * NUMA range specified. This occurs if
+				 * the NUMA range is less than the least
+				 * of the distances in the system.
+				 * In effect NUMA range consideration is
+				 * turned off.
+				 */
+				shortest_distance = distance;
+			} else if ((distance <= shortest_distance &&
+				    distance < lnet_get_numa_range()) ||
+				   distance == shortest_distance) {
+				/*
+				 * This NI is either within range or it's
+				 * equidistant. In both of these cases we
+				 * would want to select the NI based on
+				 * its available credits first, and then
+				 * via Round Robin.
+				 */
+				if (distance <= shortest_distance &&
+				    distance < lnet_get_numa_range()) {
+					/*
+					 * If this is the first NI that's
+					 * within range, then set the
+					 * shortest distance to the range
+					 * specified by the user. In
+					 * effect we're saying that all
+					 * NIs that fall within this NUMA
+					 * range shall be dealt with as
+					 * having equal NUMA weight. Which
+					 * will mean that we should select
+					 * through that set by their
+					 * available credits first
+					 * followed by Round Robin.
+					 *
+					 * And since this is the first NI
+					 * in the range, let's just set it
+					 * as our best_ni for now. The
+					 * following NIs found in the
+					 * range will be dealt with as
+					 * mentioned previously.
+					 */
+					shortest_distance = lnet_get_numa_range();
+					if (!found_ir) {
+						found_ir = true;
+						goto set_ni;
+					}
+				}
+				/*
+				 * This NI is NUMA equidistant let's
+				 * select using credits followed by Round
+				 * Robin.
+				 */
+				if (ni_credits < best_credits) {
+					continue;
+				} else if (ni_credits == best_credits) {
+					if (best_ni) {
+						if (best_ni->ni_seq <= ni->ni_seq)
+							continue;
+					}
+				}
+			}
+set_ni:
+			best_ni = ni;
+			best_credits = ni_credits;
+		}
+	}
+	/*
+	 * if the peer is not MR capable, then we should always send to it
+	 * using the first NI in the NET we determined.
+	 */
+	if (!peer->lp_multi_rail && local_net != NULL)
+		best_ni = lnet_net2ni_locked(local_net->net_id, cpt);
+
+	if (!best_ni) {
+		lnet_net_unlock(cpt);
+		LCONSOLE_WARN("No local ni found to send from to %s\n",
+			      libcfs_nid2str(dst_nid));
+		return -EINVAL;
+	}
+
+	/*
+	 * Now that we selected the NI to use increment its sequence
+	 * number so the Round Robin algorithm will detect that it has
+	 * been used and pick the next NI.
+	 */
+	best_ni->ni_seq++;
+
+	/* when routing, best_gw/best_lpni were already chosen above */
+	if (routing)
+		goto send;
+
+pick_peer:
+	if (best_ni == the_lnet.ln_loni) {
+		/* No send credit hassles with LOLND */
+		lnet_ni_addref_locked(best_ni, cpt);
+		msg->msg_hdr.dest_nid = cpu_to_le64(best_ni->ni_nid);
+		if (!msg->msg_routing)
+			msg->msg_hdr.src_nid = cpu_to_le64(best_ni->ni_nid);
+		msg->msg_target.nid = best_ni->ni_nid;
+		lnet_msg_commit(msg, cpt);
+
+		lnet_net_unlock(cpt);
+		msg->msg_txni = best_ni;
+		lnet_ni_send(best_ni, msg);
+
+		*lo_sent = true;
+		return 0;
+	}
+
+	lpni = NULL;
+
+	if (msg->msg_type == LNET_MSG_REPLY ||
+	    msg->msg_type == LNET_MSG_ACK) {
+		/*
+		 * for replies we want to respond on the same peer_ni we
+		 * received the message on if possible. If not, then pick
+		 * a peer_ni to send to
+		 */
+		best_lpni = lnet_find_peer_ni_locked(dst_nid);
+		if (best_lpni) {
+			lnet_peer_ni_decref_locked(best_lpni);
+			goto send;
+		} else {
+			CDEBUG(D_NET, "unable to send msg_type %d to "
+			       "originating %s\n", msg->msg_type,
+			       libcfs_nid2str(dst_nid));
+		}
+	}
+
+	peer_net = lnet_peer_get_net_locked(peer,
+					    best_ni->ni_net->net_id);
+	/*
+	 * peer_net is not available or the src_nid is explicitly defined
+	 * and the peer_net for that src_nid is unhealthy. find a route to
+	 * the destination nid.
+	 */
+	if (!peer_net ||
+	    (src_nid != LNET_NID_ANY &&
+	     !lnet_is_peer_net_healthy_locked(peer_net))) {
+		best_gw = lnet_find_route_locked(best_ni->ni_net,
+						 dst_nid,
+						 rtr_nid);
+		/*
+		 * if no route is found from the selected NI's network
+		 * then the destination can not be reached
+		 */
+		if (!best_gw) {
+			/*
+			 * no reference is held on best_ni, so read its
+			 * NID while the lock is still held
+			 */
+			lnet_nid_t ni_nid = best_ni->ni_nid;
+
+			lnet_net_unlock(cpt);
+			LCONSOLE_WARN("No route to peer from %s\n",
+				      libcfs_nid2str(ni_nid));
+			return -EHOSTUNREACH;
+		}
+
+		CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+		       libcfs_nid2str(dst_nid),
+		       libcfs_nid2str(best_gw->lpni_nid),
+		       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
+
+		best_lpni = lnet_find_peer_ni_locked(dst_nid);
+		LASSERT(best_lpni != NULL);
+		lnet_peer_ni_decref_locked(best_lpni);
+
+		routing = true;
+
+		goto send;
+	} else if (!lnet_is_peer_net_healthy_locked(peer_net)) {
+		/*
+		 * this peer_net is unhealthy but we still have an opportunity
+		 * to find another peer_net that we can use
+		 */
+		__u32 net_id = peer_net->lpn_net_id;
+
+		lnet_net_unlock(cpt);
+		if (!best_lpni)
+			LCONSOLE_WARN("peer net %s unhealthy\n",
+				      libcfs_net2str(net_id));
+		goto again;
+	}
+
+	best_lpni = NULL;
+	while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
+		/*
+		 * if this peer ni is not healthy just skip it, no point in
+		 * examining it further
+		 */
+		if (!lnet_is_peer_ni_healthy_locked(lpni))
+			continue;
+		ni_is_pref = lnet_peer_is_ni_pref_locked(lpni, best_ni);
+
+		/* if this is a preferred peer use it */
+		if (!preferred && ni_is_pref) {
+			preferred = true;
+		} else if (preferred && !ni_is_pref) {
+			/*
+			 * this is not the preferred peer so let's ignore
+			 * it.
+			 */
+			continue;
+		}
+
+		/*
+		 * The credit comparison below is a separate statement, not
+		 * part of the preferred/non-preferred chain above.
+		 */
+		if (lpni->lpni_txcredits < best_lpni_credits) {
+			/*
+			 * We already have a peer that has more credits
+			 * available than this one. No need to consider
+			 * this peer further.
+			 */
+			continue;
+		} else if (lpni->lpni_txcredits == best_lpni_credits) {
+			/*
+			 * The best peer found so far and the current peer
+			 * have the same number of available credits let's
+			 * make sure to select between them using Round
+			 * Robin
+			 */
+			if (best_lpni) {
+				if (best_lpni->lpni_seq <= lpni->lpni_seq)
+					continue;
+			}
+		}
+
+		best_lpni = lpni;
+		best_lpni_credits = lpni->lpni_txcredits;
+	}
+
+	/* if we still can't find a peer ni then we can't reach it */
+	if (!best_lpni) {
+		__u32 net_id = peer_net->lpn_net_id;
+
+		lnet_net_unlock(cpt);
+		LCONSOLE_WARN("no peer_ni found on peer net %s\n",
+			      libcfs_net2str(net_id));
+		return -EHOSTUNREACH;
+	}
+
+	/*
+	 * Increment sequence number of the peer selected so that we can
+	 * pick the next one in Round Robin. This must come after the NULL
+	 * check above: the loop may not have selected any peer_ni.
+	 */
+	best_lpni->lpni_seq++;
+
+send:
+	/*
+	 * Use lnet_cpt_of_nid() to determine the CPT used to commit the
+	 * message. This ensures that we get a CPT that is correct for
+	 * the NI when the NI has been restricted to a subset of all CPTs.
+	 * If the selected CPT differs from the one currently locked, we
+	 * must unlock and relock the lnet_net_lock(), and then check whether
+	 * the configuration has changed. We don't have a hold on the best_ni
+	 * or best_peer_ni yet, and they may have vanished.
+	 */
+	cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
+	if (cpt != cpt2) {
+		lnet_net_unlock(cpt);
+		cpt = cpt2;
+		lnet_net_lock(cpt);
+		seq2 = lnet_get_dlc_seq_locked();
+		if (seq2 != seq) {
+			lnet_net_unlock(cpt);
+			goto again;
+		}
+	}
+
+	/*
+	 * store the best_lpni in the message right away to avoid having
+	 * to do the same operation under different conditions
+	 */
+	msg->msg_txpeer = (routing) ? best_gw : best_lpni;
+	msg->msg_txni = best_ni;
+	/*
+	 * grab a reference for the best_ni since now it's in use in this
+	 * send. the reference will need to be dropped when the message is
+	 * finished in lnet_finalize()
+	 */
+	lnet_ni_addref_locked(msg->msg_txni, cpt);
+	lnet_peer_ni_addref_locked(msg->msg_txpeer);
+
+	/*
+	 * set the destination nid in the message here because it's
+	 * possible that we'd be sending to a different nid than the one
+	 * originally given.
+	 */
+	msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
+
+	/*
+	 * Always set the target.nid to the best peer picked. Either the
+	 * nid will be one of the preconfigured NIDs, or the same NID as
+	 * what was originally set in the target or it will be the NID of
+	 * a router if this message should be routed
+	 */
+	msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
+
+	/*
+	 * lnet_msg_commit assigns the correct cpt to the message, which
+	 * is used to decrement the correct refcount on the ni when it's
+	 * time to return the credits
+	 */
+	lnet_msg_commit(msg, cpt);
+
+	/*
+	 * If we are routing the message then we don't need to overwrite
+	 * the src_nid since it would've been set at the origin. Otherwise
+	 * we are the originator so we need to set it.
+	 */
+	if (!msg->msg_routing)
+		msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
+
+	if (routing) {
+		msg->msg_target_is_router = 1;
+		msg->msg_target.pid = LNET_PID_LUSTRE;
+	}
+
+	rc = lnet_post_send_locked(msg, 0);
+
+	lnet_net_unlock(cpt);
+
+	return rc;
+}