X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lnet%2Flnet%2Flib-move.c;h=a89fa41c7c6fa4f6869c3dcec988753ee951f236;hb=5c17777d97bd20cde68771c6186320b5eae90e62;hp=9dc65653535f65d74c35e34caece4364e49f0c6b;hpb=908ce58a765cb90692b840f8902414836d378206;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 9dc6565..a89fa41 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -36,6 +36,8 @@ #define DEBUG_SUBSYSTEM S_LNET +#include + #include #include #include @@ -44,6 +46,119 @@ static int local_nid_dist_zero = 1; module_param(local_nid_dist_zero, int, 0444); MODULE_PARM_DESC(local_nid_dist_zero, "Reserved"); +struct lnet_send_data { + struct lnet_ni *sd_best_ni; + struct lnet_peer_ni *sd_best_lpni; + struct lnet_peer_ni *sd_final_dst_lpni; + struct lnet_peer *sd_peer; + struct lnet_peer *sd_gw_peer; + struct lnet_peer_ni *sd_gw_lpni; + struct lnet_peer_net *sd_peer_net; + struct lnet_msg *sd_msg; + lnet_nid_t sd_dst_nid; + lnet_nid_t sd_src_nid; + lnet_nid_t sd_rtr_nid; + int sd_cpt; + int sd_md_cpt; + __u32 sd_send_case; +}; + +static inline struct lnet_comm_count * +get_stats_counts(struct lnet_element_stats *stats, + enum lnet_stats_type stats_type) +{ + switch (stats_type) { + case LNET_STATS_TYPE_SEND: + return &stats->el_send_stats; + case LNET_STATS_TYPE_RECV: + return &stats->el_recv_stats; + case LNET_STATS_TYPE_DROP: + return &stats->el_drop_stats; + default: + CERROR("Unknown stats type\n"); + } + + return NULL; +} + +void lnet_incr_stats(struct lnet_element_stats *stats, + enum lnet_msg_type msg_type, + enum lnet_stats_type stats_type) +{ + struct lnet_comm_count *counts = get_stats_counts(stats, stats_type); + if (!counts) + return; + + switch (msg_type) { + case LNET_MSG_ACK: + atomic_inc(&counts->co_ack_count); + break; + case LNET_MSG_PUT: + atomic_inc(&counts->co_put_count); + break; + case LNET_MSG_GET: + atomic_inc(&counts->co_get_count); + break; + case LNET_MSG_REPLY: + atomic_inc(&counts->co_reply_count); + break; + case LNET_MSG_HELLO: + atomic_inc(&counts->co_hello_count); + break; + default: + CERROR("There is a BUG in the code. Unknown message type\n"); + break; + } +} + +__u32 lnet_sum_stats(struct lnet_element_stats *stats, + enum lnet_stats_type stats_type) +{ + struct lnet_comm_count *counts = get_stats_counts(stats, stats_type); + if (!counts) + return 0; + + return (atomic_read(&counts->co_ack_count) + + atomic_read(&counts->co_put_count) + + atomic_read(&counts->co_get_count) + + atomic_read(&counts->co_reply_count) + + atomic_read(&counts->co_hello_count)); +} + +static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats, + struct lnet_comm_count *counts) +{ + msg_stats->ico_get_count = atomic_read(&counts->co_get_count); + msg_stats->ico_put_count = atomic_read(&counts->co_put_count); + msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count); + msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count); + msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count); +} + +void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats, + struct lnet_element_stats *stats) +{ + struct lnet_comm_count *counts; + + LASSERT(msg_stats); + LASSERT(stats); + + counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND); + if (!counts) + return; + assign_stats(&msg_stats->im_send_stats, counts); + + counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV); + if (!counts) + return; + assign_stats(&msg_stats->im_recv_stats, counts); + + counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP); + if (!counts) + return; + assign_stats(&msg_stats->im_drop_stats, counts); +} + int lnet_fail_nid(lnet_nid_t nid, unsigned int threshold) { @@ -630,6 +745,8 @@ lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target, memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr)); msg->msg_hdr.type = cpu_to_le32(type); + /* dest_nid will be overwritten by lnet_select_pathway() */ + msg->msg_hdr.dest_nid = cpu_to_le64(target.nid); msg->msg_hdr.dest_pid = cpu_to_le32(target.pid); /* src_nid will be set later */ msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid); @@ -640,15 +757,17 @@ static void lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg) { void *priv = msg->msg_private; - int rc; + int rc; LASSERT (!in_interrupt ()); LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND || (msg->msg_txcredit && msg->msg_peertxcredit)); rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg); - if (rc < 0) + if (rc < 0) { + msg->msg_no_resend = true; lnet_finalize(msg, rc); + } } static int @@ -686,7 +805,7 @@ lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg) static void lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp) { - cfs_time_t last_alive = 0; + time64_t last_alive = 0; int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni); LASSERT(lnet_peer_aliveness_enabled(lp)); @@ -696,7 +815,7 @@ lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp) (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive); lnet_net_lock(cpt); - lp->lpni_last_query = cfs_time_current(); + lp->lpni_last_query = ktime_get_seconds(); if (last_alive != 0) /* NI has updated timestamp */ lp->lpni_last_alive = last_alive; @@ -704,10 +823,10 @@ lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp) /* NB: always called with lnet_net_lock held */ static inline int -lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now) +lnet_peer_is_alive(struct lnet_peer_ni *lp, time64_t now) { - int alive; - cfs_time_t deadline; + int alive; + time64_t deadline; LASSERT (lnet_peer_aliveness_enabled(lp)); @@ -717,16 +836,14 @@ lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now) */ spin_lock(&lp->lpni_lock); if (!lp->lpni_alive && lp->lpni_alive_count > 0 && - cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive)) { + lp->lpni_timestamp >= lp->lpni_last_alive) { spin_unlock(&lp->lpni_lock); return 0; } - deadline = - cfs_time_add(lp->lpni_last_alive, - cfs_time_seconds(lp->lpni_net->net_tunables. - lct_peer_timeout)); - alive = cfs_time_after(deadline, now); + deadline = lp->lpni_last_alive + + lp->lpni_net->net_tunables.lct_peer_timeout; + alive = deadline > now; /* * Update obsolete lp_alive except for routers assumed to be dead @@ -748,9 +865,9 @@ lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now) /* NB: returns 1 when alive, 0 when dead, negative when error; * may drop the lnet_net_lock */ static int -lnet_peer_alive_locked (struct lnet_ni *ni, struct lnet_peer_ni *lp) +lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp) { - cfs_time_t now = cfs_time_current(); + time64_t now = ktime_get_seconds(); if (!lnet_peer_aliveness_enabled(lp)) return -ENODEV; @@ -764,17 +881,16 @@ lnet_peer_alive_locked (struct lnet_ni *ni, struct lnet_peer_ni *lp) */ if (lp->lpni_last_query != 0) { static const int lnet_queryinterval = 1; + time64_t next_query; - cfs_time_t next_query = - cfs_time_add(lp->lpni_last_query, - cfs_time_seconds(lnet_queryinterval)); + next_query = lp->lpni_last_query + lnet_queryinterval; - if (cfs_time_before(now, next_query)) { + if (now < next_query) { if (lp->lpni_alive) CWARN("Unexpected aliveness of peer %s: " - "%d < %d (%d/%d)\n", + "%lld < %lld (%d/%d)\n", libcfs_nid2str(lp->lpni_nid), - (int)now, (int)next_query, + now, next_query, lnet_queryinterval, lp->lpni_net->net_tunables.lct_peer_timeout); return 0; @@ -822,14 +938,20 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) the_lnet.ln_counters[cpt]->drop_length += msg->msg_len; lnet_net_unlock(cpt); if (msg->msg_txpeer) - atomic_inc(&msg->msg_txpeer->lpni_stats.drop_count); + lnet_incr_stats(&msg->msg_txpeer->lpni_stats, + msg->msg_type, + LNET_STATS_TYPE_DROP); if (msg->msg_txni) - atomic_inc(&msg->msg_txni->ni_stats.drop_count); + lnet_incr_stats(&msg->msg_txni->ni_stats, + msg->msg_type, + LNET_STATS_TYPE_DROP); CNETERR("Dropping message for %s: peer not alive\n", libcfs_id2str(msg->msg_target)); - if (do_send) + if (do_send) { + msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED; lnet_finalize(msg, -EHOSTUNREACH); + } lnet_net_lock(cpt); return -EHOSTUNREACH; @@ -842,8 +964,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) CNETERR("Aborting message for %s: LNetM[DE]Unlink() already " "called on the MD/ME.\n", libcfs_id2str(msg->msg_target)); - if (do_send) + if (do_send) { + msg->msg_no_resend = true; lnet_finalize(msg, -ECANCELED); + } lnet_net_lock(cpt); return -ECANCELED; @@ -888,6 +1012,9 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) } } + /* unset the tx_delay flag as we're going to send it now */ + msg->msg_tx_delayed = 0; + if (do_send) { lnet_net_unlock(cpt); lnet_ni_send(ni, msg); @@ -983,6 +1110,9 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv) msg->msg_niov = rbp->rbp_npages; msg->msg_kiov = &rb->rb_kiov[0]; + /* unset the msg-rx_delayed flag since we're receiving the message */ + msg->msg_rx_delayed = 0; + if (do_recv) { int cpt = msg->msg_rx_cpt; @@ -1039,6 +1169,8 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg) txpeer->lpni_txcredits++; if (txpeer->lpni_txcredits <= 0) { + int msg2_cpt; + msg2 = list_entry(txpeer->lpni_txq.next, struct lnet_msg, msg_list); list_del(&msg2->msg_list); @@ -1047,13 +1179,26 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg) LASSERT(msg2->msg_txpeer == txpeer); LASSERT(msg2->msg_tx_delayed); - if (msg2->msg_tx_cpt != msg->msg_tx_cpt) { + msg2_cpt = msg2->msg_tx_cpt; + + /* + * The msg_cpt can be different from the msg2_cpt + * so we need to make sure we lock the correct cpt + * for msg2. + * Once we call lnet_post_send_locked() it is no + * longer safe to access msg2, since it could've + * been freed by lnet_finalize(), but we still + * need to relock the correct cpt, so we cache the + * msg2_cpt for the purpose of the check that + * follows the call to lnet_pose_send_locked(). + */ + if (msg2_cpt != msg->msg_tx_cpt) { lnet_net_unlock(msg->msg_tx_cpt); - lnet_net_lock(msg2->msg_tx_cpt); + lnet_net_lock(msg2_cpt); } (void) lnet_post_send_locked(msg2, 1); - if (msg2->msg_tx_cpt != msg->msg_tx_cpt) { - lnet_net_unlock(msg2->msg_tx_cpt); + if (msg2_cpt != msg->msg_tx_cpt) { + lnet_net_unlock(msg2_cpt); lnet_net_lock(msg->msg_tx_cpt); } } else { @@ -1067,15 +1212,6 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg) } if (txpeer != NULL) { - /* - * TODO: - * Once the patch for the health comes in we need to set - * the health of the peer ni to bad when we fail to send - * a message. - * int status = msg->msg_ev.status; - * if (status != 0) - * lnet_set_peer_ni_health_locked(txpeer, false) - */ msg->msg_txpeer = NULL; lnet_peer_ni_decref_locked(txpeer); } @@ -1107,6 +1243,7 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt) lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL, 0, 0, 0, msg->msg_hdr.payload_length); list_del_init(&msg->msg_list); + msg->msg_no_resend = true; lnet_finalize(msg, -ECANCELED); } @@ -1253,7 +1390,7 @@ lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2) } static struct lnet_peer_ni * -lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target, +lnet_find_route_locked(struct lnet_net *net, __u32 remote_net, lnet_nid_t rtr_nid) { struct lnet_remotenet *rnet; @@ -1267,7 +1404,7 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target, /* If @rtr_nid is not LNET_NID_ANY, return the gateway with * rtr_nid nid, otherwise find the best gateway I can use */ - rnet = lnet_find_rnet_locked(LNET_NIDNET(target)); + rnet = lnet_find_rnet_locked(remote_net); if (rnet == NULL) return NULL; @@ -1312,30 +1449,42 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target, } static struct lnet_ni * -lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni, +lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *best_ni, + struct lnet_peer *peer, struct lnet_peer_net *peer_net, int md_cpt) { - struct lnet_ni *ni = NULL, *best_ni = cur_ni; + struct lnet_ni *ni = NULL; unsigned int shortest_distance; int best_credits; + int best_healthv; + + /* + * If there is no peer_ni that we can send to on this network, + * then there is no point in looking for a new best_ni here. + */ + if (!lnet_get_next_peer_ni_locked(peer, peer_net, NULL)) + return best_ni; if (best_ni == NULL) { shortest_distance = UINT_MAX; best_credits = INT_MIN; + best_healthv = 0; } else { shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt, best_ni->ni_dev_cpt); best_credits = atomic_read(&best_ni->ni_tx_credits); + best_healthv = atomic_read(&best_ni->ni_healthv); } while ((ni = lnet_get_next_ni_locked(local_net, ni))) { unsigned int distance; int ni_credits; - - if (!lnet_is_ni_healthy_locked(ni)) - continue; + int ni_healthv; + int ni_fatal; ni_credits = atomic_read(&ni->ni_tx_credits); + ni_healthv = atomic_read(&ni->ni_healthv); + ni_fatal = atomic_read(&ni->ni_fatal_error_on); /* * calculate the distance from the CPT on which @@ -1346,6 +1495,12 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni, md_cpt, ni->ni_dev_cpt); + CDEBUG(D_NET, "compare ni %s [c:%d, d:%d, s:%d] with best_ni %s [c:%d, d:%d, s:%d]\n", + libcfs_nid2str(ni->ni_nid), ni_credits, distance, + ni->ni_seq, (best_ni) ? libcfs_nid2str(best_ni->ni_nid) + : "not seleced", best_credits, shortest_distance, + (best_ni) ? best_ni->ni_seq : 0); + /* * All distances smaller than the NUMA range * are treated equally. @@ -1354,382 +1509,209 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni, distance = lnet_numa_range; /* - * Select on shorter distance, then available + * Select on health, shorter distance, available * credits, then round-robin. */ - if (distance > shortest_distance) { + if (ni_fatal) { + continue; + } else if (ni_healthv < best_healthv) { + continue; + } else if (ni_healthv > best_healthv) { + best_healthv = ni_healthv; + /* + * If we're going to prefer this ni because it's + * the healthiest, then we should set the + * shortest_distance in the algorithm in case + * there are multiple NIs with the same health but + * different distances. + */ + if (distance < shortest_distance) + shortest_distance = distance; + } else if (distance > shortest_distance) { continue; } else if (distance < shortest_distance) { shortest_distance = distance; } else if (ni_credits < best_credits) { continue; } else if (ni_credits == best_credits) { - if (best_ni && (best_ni)->ni_seq <= ni->ni_seq) + if (best_ni && best_ni->ni_seq <= ni->ni_seq) continue; } best_ni = ni; best_credits = ni_credits; } + CDEBUG(D_NET, "selected best_ni %s\n", + (best_ni) ? libcfs_nid2str(best_ni->ni_nid) : "no selection"); + return best_ni; } -static int -lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, - struct lnet_msg *msg, lnet_nid_t rtr_nid) +/* + * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery, + * because such traffic is required to perform discovery. We therefore + * exclude all GET and PUT on that portal. We also exclude all ACK and + * REPLY traffic, but that is because the portal is not tracked in the + * message structure for these message types. We could restrict this + * further by also checking for LNET_PROTO_PING_MATCHBITS. + */ +static bool +lnet_msg_discovery(struct lnet_msg *msg) { - struct lnet_ni *best_ni; - struct lnet_peer_ni *best_lpni; - struct lnet_peer_ni *best_gw; - struct lnet_peer_ni *lpni; - struct lnet_peer_ni *final_dst; - struct lnet_peer *peer; - struct lnet_peer_net *peer_net; - struct lnet_net *local_net; - __u32 seq; - int cpt, cpt2, rc; - bool routing; - bool routing2; - bool ni_is_pref; - bool preferred; - bool local_found; - int best_lpni_credits; - int md_cpt; - - /* - * get an initial CPT to use for locking. The idea here is not to - * serialize the calls to select_pathway, so that as many - * operations can run concurrently as possible. To do that we use - * the CPT where this call is being executed. Later on when we - * determine the CPT to use in lnet_message_commit, we switch the - * lock and check if there was any configuration change. If none, - * then we proceed, if there is, then we restart the operation. - */ - cpt = lnet_net_lock_current(); - - md_cpt = lnet_cpt_of_md(msg->msg_md); - if (md_cpt == CFS_CPT_ANY) - md_cpt = cpt; - -again: - best_ni = NULL; - best_lpni = NULL; - best_gw = NULL; - final_dst = NULL; - local_net = NULL; - routing = false; - routing2 = false; - local_found = false; - - seq = lnet_get_dlc_seq_locked(); - - if (the_lnet.ln_state != LNET_STATE_RUNNING) { - lnet_net_unlock(cpt); - return -ESHUTDOWN; - } - - peer = lnet_find_or_create_peer_locked(dst_nid, cpt); - if (IS_ERR(peer)) { - lnet_net_unlock(cpt); - return PTR_ERR(peer); + if (msg->msg_type == LNET_MSG_PUT) { + if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL) + return true; + } else if (msg->msg_type == LNET_MSG_GET) { + if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL) + return true; } + return false; +} - /* If peer is not healthy then can not send anything to it */ - if (!lnet_is_peer_healthy_locked(peer)) { - lnet_net_unlock(cpt); - return -EHOSTUNREACH; - } +#define SRC_SPEC 0x0001 +#define SRC_ANY 0x0002 +#define LOCAL_DST 0x0004 +#define REMOTE_DST 0x0008 +#define MR_DST 0x0010 +#define NMR_DST 0x0020 +#define SND_RESP 0x0040 + +/* The following to defines are used for return codes */ +#define REPEAT_SEND 0x1000 +#define PASS_THROUGH 0x2000 + +/* The different cases lnet_select pathway needs to handle */ +#define SRC_SPEC_LOCAL_MR_DST (SRC_SPEC | LOCAL_DST | MR_DST) +#define SRC_SPEC_ROUTER_MR_DST (SRC_SPEC | REMOTE_DST | MR_DST) +#define SRC_SPEC_LOCAL_NMR_DST (SRC_SPEC | LOCAL_DST | NMR_DST) +#define SRC_SPEC_ROUTER_NMR_DST (SRC_SPEC | REMOTE_DST | NMR_DST) +#define SRC_ANY_LOCAL_MR_DST (SRC_ANY | LOCAL_DST | MR_DST) +#define SRC_ANY_ROUTER_MR_DST (SRC_ANY | REMOTE_DST | MR_DST) +#define SRC_ANY_LOCAL_NMR_DST (SRC_ANY | LOCAL_DST | NMR_DST) +#define SRC_ANY_ROUTER_NMR_DST (SRC_ANY | REMOTE_DST | NMR_DST) - if (!peer->lp_multi_rail && lnet_get_num_peer_nis(peer) > 1) { - CERROR("peer %s is declared to be non MR capable, " - "yet configured with more than one NID\n", - libcfs_nid2str(dst_nid)); - return -EINVAL; - } +static int +lnet_handle_send(struct lnet_send_data *sd) +{ + struct lnet_ni *best_ni = sd->sd_best_ni; + struct lnet_peer_ni *best_lpni = sd->sd_best_lpni; + struct lnet_peer_ni *final_dst_lpni = sd->sd_final_dst_lpni; + struct lnet_msg *msg = sd->sd_msg; + int cpt2; + __u32 send_case = sd->sd_send_case; + int rc; + __u32 routing = send_case & REMOTE_DST; /* - * STEP 1: first jab at determining best_ni - * if src_nid is explicitly specified, then best_ni is already - * pre-determiend for us. Otherwise we need to select the best - * one to use later on + * Increment sequence number of the selected peer so that we + * pick the next one in Round Robin. */ - if (src_nid != LNET_NID_ANY) { - best_ni = lnet_nid2ni_locked(src_nid, cpt); - if (!best_ni) { - lnet_net_unlock(cpt); - LCONSOLE_WARN("Can't send to %s: src %s is not a " - "local nid\n", libcfs_nid2str(dst_nid), - libcfs_nid2str(src_nid)); - return -EINVAL; - } - } - - if (msg->msg_type == LNET_MSG_REPLY || - msg->msg_type == LNET_MSG_ACK || - !peer->lp_multi_rail || - best_ni) { - /* - * for replies we want to respond on the same peer_ni we - * received the message on if possible. If not, then pick - * a peer_ni to send to - * - * if the peer is non-multi-rail then you want to send to - * the dst_nid provided as well. - * - * If the best_ni has already been determined, IE the - * src_nid has been specified, then use the - * destination_nid provided as well, since we're - * continuing a series of related messages for the same - * RPC. - * - * It is expected to find the lpni using dst_nid, since we - * created it earlier. - */ - best_lpni = lnet_find_peer_ni_locked(dst_nid); - if (best_lpni) - lnet_peer_ni_decref_locked(best_lpni); - - if (best_lpni && !lnet_get_net_locked(LNET_NIDNET(dst_nid))) { - /* - * this lpni is not on a local network so we need - * to route this reply. - */ - best_gw = lnet_find_route_locked(NULL, - best_lpni->lpni_nid, - rtr_nid); - if (best_gw) { - /* - * RULE: Each node considers only the next-hop - * - * We're going to route the message, so change the peer to - * the router. - */ - LASSERT(best_gw->lpni_peer_net); - LASSERT(best_gw->lpni_peer_net->lpn_peer); - peer = best_gw->lpni_peer_net->lpn_peer; - - /* - * if the router is not multi-rail then use the best_gw - * found to send the message to - */ - if (!peer->lp_multi_rail) - best_lpni = best_gw; - else - best_lpni = NULL; - - routing = true; - } else { - best_lpni = NULL; - } - } else if (!best_lpni) { - lnet_net_unlock(cpt); - CERROR("unable to send msg_type %d to " - "originating %s. Destination NID not in DB\n", - msg->msg_type, libcfs_nid2str(dst_nid)); - return -EINVAL; - } - } + best_lpni->lpni_seq++; /* - * if the peer is not MR capable, then we should always send to it - * using the first NI in the NET we determined. + * grab a reference on the peer_ni so it sticks around even if + * we need to drop and relock the lnet_net_lock below. */ - if (!peer->lp_multi_rail) { - if (!best_lpni) { - lnet_net_unlock(cpt); - CERROR("no route to %s\n", - libcfs_nid2str(dst_nid)); - return -EHOSTUNREACH; - } + lnet_peer_ni_addref_locked(best_lpni); - /* best ni could be set because src_nid was provided */ - if (!best_ni) { - best_ni = lnet_net2ni_locked(best_lpni->lpni_net->net_id, cpt); - if (!best_ni) { - lnet_net_unlock(cpt); - CERROR("no path to %s from net %s\n", - libcfs_nid2str(best_lpni->lpni_nid), - libcfs_net2str(best_lpni->lpni_net->net_id)); - return -EHOSTUNREACH; - } + /* + * Use lnet_cpt_of_nid() to determine the CPT used to commit the + * message. This ensures that we get a CPT that is correct for + * the NI when the NI has been restricted to a subset of all CPTs. + * If the selected CPT differs from the one currently locked, we + * must unlock and relock the lnet_net_lock(), and then check whether + * the configuration has changed. We don't have a hold on the best_ni + * yet, and it may have vanished. + */ + cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni); + if (sd->sd_cpt != cpt2) { + __u32 seq = lnet_get_dlc_seq_locked(); + lnet_net_unlock(sd->sd_cpt); + sd->sd_cpt = cpt2; + lnet_net_lock(sd->sd_cpt); + if (seq != lnet_get_dlc_seq_locked()) { + lnet_peer_ni_decref_locked(best_lpni); + return REPEAT_SEND; } } /* - * if we already found a best_ni because src_nid is specified and - * best_lpni because we are replying to a message then just send - * the message + * store the best_lpni in the message right away to avoid having + * to do the same operation under different conditions */ - if (best_ni && best_lpni) - goto send; + msg->msg_txpeer = best_lpni; + msg->msg_txni = best_ni; /* - * If we already found a best_ni because src_nid is specified then - * pick the peer then send the message + * grab a reference for the best_ni since now it's in use in this + * send. The reference will be dropped in lnet_finalize() */ - if (best_ni) - goto pick_peer; + lnet_ni_addref_locked(msg->msg_txni, sd->sd_cpt); /* - * pick the best_ni by going through all the possible networks of - * that peer and see which local NI is best suited to talk to that - * peer. - * - * Locally connected networks will always be preferred over - * a routed network. If there are only routed paths to the peer, - * then the best route is chosen. If all routes are equal then - * they are used in round robin. + * Always set the target.nid to the best peer picked. Either the + * NID will be one of the peer NIDs selected, or the same NID as + * what was originally set in the target or it will be the NID of + * a router if this message should be routed */ - list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_on_peer_list) { - if (!lnet_is_peer_net_healthy_locked(peer_net)) - continue; - - local_net = lnet_get_net_locked(peer_net->lpn_net_id); - if (!local_net && !routing && !local_found) { - struct lnet_peer_ni *net_gw; - - lpni = list_entry(peer_net->lpn_peer_nis.next, - struct lnet_peer_ni, - lpni_on_peer_net_list); - - net_gw = lnet_find_route_locked(NULL, - lpni->lpni_nid, - rtr_nid); - if (!net_gw) - continue; - - if (best_gw) { - /* - * lnet_find_route_locked() call - * will return the best_Gw on the - * lpni->lpni_nid network. - * However, best_gw and net_gw can - * be on different networks. - * Therefore need to compare them - * to pick the better of either. - */ - if (lnet_compare_peers(best_gw, net_gw) > 0) - continue; - if (best_gw->lpni_gw_seq <= net_gw->lpni_gw_seq) - continue; - } - best_gw = net_gw; - final_dst = lpni; - - routing2 = true; - } else { - best_gw = NULL; - final_dst = NULL; - routing2 = false; - local_found = true; - } - - /* - * a gw on this network is found, but there could be - * other better gateways on other networks. So don't pick - * the best_ni until we determine the best_gw. - */ - if (best_gw) - continue; - - /* if no local_net found continue */ - if (!local_net) - continue; - - /* - * Iterate through the NIs in this local Net and select - * the NI to send from. The selection is determined by - * these 3 criterion in the following priority: - * 1. NUMA - * 2. NI available credits - * 3. Round Robin - */ - best_ni = lnet_get_best_ni(local_net, best_ni, md_cpt); - } - - if (!best_ni && !best_gw) { - lnet_net_unlock(cpt); - LCONSOLE_WARN("No local ni found to send from to %s\n", - libcfs_nid2str(dst_nid)); - return -EINVAL; - } - - if (!best_ni) { - best_ni = lnet_get_best_ni(best_gw->lpni_net, best_ni, md_cpt); - LASSERT(best_gw && best_ni); - - /* - * We're going to route the message, so change the peer to - * the router. - */ - LASSERT(best_gw->lpni_peer_net); - LASSERT(best_gw->lpni_peer_net->lpn_peer); - best_gw->lpni_gw_seq++; - peer = best_gw->lpni_peer_net->lpn_peer; - } + msg->msg_target.nid = msg->msg_txpeer->lpni_nid; /* - * Now that we selected the NI to use increment its sequence - * number so the Round Robin algorithm will detect that it has - * been used and pick the next NI. + * lnet_msg_commit assigns the correct cpt to the message, which + * is used to decrement the correct refcount on the ni when it's + * time to return the credits */ - best_ni->ni_seq++; + lnet_msg_commit(msg, sd->sd_cpt); -pick_peer: - /* - * At this point the best_ni is on a local network on which - * the peer has a peer_ni as well - */ - peer_net = lnet_peer_get_net_locked(peer, - best_ni->ni_net->net_id); /* - * peer_net is not available or the src_nid is explicitly defined - * and the peer_net for that src_nid is unhealthy. find a route to - * the destination nid. + * If we are routing the message then we keep the src_nid that was + * set by the originator. If we are not routing then we are the + * originator and set it here. */ - if (!peer_net || - (src_nid != LNET_NID_ANY && - !lnet_is_peer_net_healthy_locked(peer_net))) { - best_gw = lnet_find_route_locked(best_ni->ni_net, - dst_nid, - rtr_nid); - /* - * if no route is found for that network then - * move onto the next peer_ni in the peer - */ - if (!best_gw) { - lnet_net_unlock(cpt); - LCONSOLE_WARN("No route to peer from %s\n", - libcfs_nid2str(best_ni->ni_nid)); - return -EHOSTUNREACH; - } - - CDEBUG(D_NET, "Best route to %s via %s for %s %d\n", - libcfs_nid2str(dst_nid), - libcfs_nid2str(best_gw->lpni_nid), - lnet_msgtyp2str(msg->msg_type), msg->msg_len); + if (!msg->msg_routing) + msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid); - routing2 = true; + if (routing) { + msg->msg_target_is_router = 1; + msg->msg_target.pid = LNET_PID_LUSTRE; /* - * RULE: Each node considers only the next-hop + * since we're routing we want to ensure that the + * msg_hdr.dest_nid is set to the final destination. When + * the router receives this message it knows how to route + * it. * - * We're going to route the message, so change the peer to - * the router. + * final_dst_lpni is set at the beginning of the + * lnet_select_pathway() function and is never changed. + * It's safe to use it here. */ - LASSERT(best_gw->lpni_peer_net); - LASSERT(best_gw->lpni_peer_net->lpn_peer); - peer = best_gw->lpni_peer_net->lpn_peer; - } else if (!lnet_is_peer_net_healthy_locked(peer_net)) { + msg->msg_hdr.dest_nid = cpu_to_le64(final_dst_lpni->lpni_nid); + } else { /* - * this peer_net is unhealthy but we still have an opportunity - * to find another peer_net that we can use + * if we're not routing set the dest_nid to the best peer + * ni NID that we picked earlier in the algorithm. */ - __u32 net_id = peer_net->lpn_net_id; - LCONSOLE_WARN("peer net %s unhealthy\n", - libcfs_net2str(net_id)); - goto again; + msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid); } + rc = lnet_post_send_locked(msg, 0); + + if (!rc) + CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n", + libcfs_nid2str(msg->msg_hdr.src_nid), + libcfs_nid2str(msg->msg_txni->ni_nid), + libcfs_nid2str(sd->sd_src_nid), + libcfs_nid2str(msg->msg_hdr.dest_nid), + libcfs_nid2str(sd->sd_dst_nid), + libcfs_nid2str(msg->msg_txpeer->lpni_nid), + lnet_msgtyp2str(msg->msg_type)); + + return rc; +} + +static struct lnet_peer_ni * +lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer, + struct lnet_peer_net *peer_net) +{ /* * Look at the peer NIs for the destination peer that connect * to the chosen net. If a peer_ni is preferred when using the @@ -1738,21 +1720,42 @@ pick_peer: * the available transmit credits are used. If the transmit * credits are equal, we round-robin over the peer_ni. */ - lpni = NULL; - best_lpni_credits = INT_MIN; - preferred = false; - best_lpni = NULL; + struct lnet_peer_ni *lpni = NULL; + struct lnet_peer_ni *best_lpni = NULL; + struct lnet_ni *best_ni = sd->sd_best_ni; + lnet_nid_t dst_nid = sd->sd_dst_nid; + int best_lpni_credits = INT_MIN; + bool preferred = false; + bool ni_is_pref; + int best_lpni_healthv = 0; + int lpni_healthv; + while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) { /* - * if this peer ni is not healthy just skip it, no point in - * examining it further + * if the best_ni we've chosen aleady has this lpni + * preferred, then let's use it */ - if (!lnet_is_peer_ni_healthy_locked(lpni)) - continue; - ni_is_pref = lnet_peer_is_ni_pref_locked(lpni, best_ni); + ni_is_pref = lnet_peer_is_pref_nid_locked(lpni, + best_ni->ni_nid); + + lpni_healthv = atomic_read(&lpni->lpni_healthv); + + CDEBUG(D_NET, "%s ni_is_pref = %d\n", + libcfs_nid2str(best_ni->ni_nid), ni_is_pref); + + if (best_lpni) + CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n", + libcfs_nid2str(lpni->lpni_nid), + lpni->lpni_txcredits, best_lpni_credits, + lpni->lpni_seq, best_lpni->lpni_seq); + /* pick the healthiest peer ni */ + if (lpni_healthv < best_lpni_healthv) { + continue; + } else if (lpni_healthv > best_lpni_healthv) { + best_lpni_healthv = lpni_healthv; /* if this is a preferred peer use it */ - if (!preferred && ni_is_pref) { + } else if (!preferred && ni_is_pref) { preferred = true; } else if (preferred && !ni_is_pref) { /* @@ -1788,162 +1791,1806 @@ pick_peer: if (!best_lpni) { __u32 net_id = (peer_net) ? peer_net->lpn_net_id : LNET_NIDNET(dst_nid); - lnet_net_unlock(cpt); - LCONSOLE_WARN("no peer_ni found on peer net %s\n", + CDEBUG(D_NET, "no peer_ni found on peer net %s\n", libcfs_net2str(net_id)); - return -EHOSTUNREACH; + return NULL; } + CDEBUG(D_NET, "sd_best_lpni = %s\n", + libcfs_nid2str(best_lpni->lpni_nid)); -send: - /* Shortcut for loopback. */ - if (best_ni == the_lnet.ln_loni) { - /* No send credit hassles with LOLND */ - lnet_ni_addref_locked(best_ni, cpt); - msg->msg_hdr.dest_nid = cpu_to_le64(best_ni->ni_nid); - if (!msg->msg_routing) - msg->msg_hdr.src_nid = cpu_to_le64(best_ni->ni_nid); - msg->msg_target.nid = best_ni->ni_nid; - lnet_msg_commit(msg, cpt); - msg->msg_txni = best_ni; - lnet_net_unlock(cpt); - - return LNET_CREDIT_OK; - } + return best_lpni; +} - routing = routing || routing2; +/* + * Prerequisite: the best_ni should already be set in the sd + */ +static inline struct lnet_peer_ni * +lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer, + __u32 net_id) +{ + struct lnet_peer_net *peer_net; /* - * Increment sequence number of the peer selected so that we - * pick the next one in Round Robin. + * The gateway is Multi-Rail capable so now we must select the + * proper peer_ni + */ + peer_net = lnet_peer_get_net_locked(peer, net_id); + + if (!peer_net) { + CERROR("gateway peer %s has no NI on net %s\n", + libcfs_nid2str(peer->lp_primary_nid), + libcfs_net2str(net_id)); + return NULL; + } + + return lnet_select_peer_ni(sd, peer, peer_net); +} + +static inline void +lnet_set_non_mr_pref_nid(struct lnet_send_data *sd) +{ + if (sd->sd_send_case & NMR_DST && + sd->sd_msg->msg_type != LNET_MSG_REPLY && + sd->sd_msg->msg_type != LNET_MSG_ACK && + sd->sd_best_lpni->lpni_pref_nnids == 0) { + CDEBUG(D_NET, "Setting preferred local NID %s on NMR peer %s\n", + libcfs_nid2str(sd->sd_best_ni->ni_nid), + libcfs_nid2str(sd->sd_best_lpni->lpni_nid)); + lnet_peer_ni_set_non_mr_pref_nid(sd->sd_best_lpni, + sd->sd_best_ni->ni_nid); + } +} + +/* + * Source Specified + * Local Destination + * non-mr peer + * + * use the source and destination NIDs as the pathway + */ +static int +lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd) +{ + /* the destination lpni is set before we get here. */ + + /* find local NI */ + sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt); + if (!sd->sd_best_ni) { + CERROR("Can't send to %s: src %s is not a " + "local nid\n", libcfs_nid2str(sd->sd_dst_nid), + libcfs_nid2str(sd->sd_src_nid)); + return -EINVAL; + } + + /* + * the preferred NID will only be set for NMR peers + */ + lnet_set_non_mr_pref_nid(sd); + + return lnet_handle_send(sd); +} + +/* + * Source Specified + * Local Destination + * MR Peer + * + * Run the selection algorithm on the peer NIs unless we're sending + * a response, in this case just send to the destination + */ +static int +lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd) +{ + sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt); + if (!sd->sd_best_ni) { + CERROR("Can't send to %s: src %s is not a " + "local nid\n", libcfs_nid2str(sd->sd_dst_nid), + libcfs_nid2str(sd->sd_src_nid)); + return -EINVAL; + } + + /* + * only run the selection algorithm to pick the peer_ni if we're + * sending a GET or a PUT. Responses are sent to the same + * destination NID provided. + */ + if (!(sd->sd_send_case & SND_RESP)) { + sd->sd_best_lpni = + lnet_find_best_lpni_on_net(sd, sd->sd_peer, + sd->sd_best_ni->ni_net->net_id); + } + + if (sd->sd_best_lpni) + return lnet_handle_send(sd); + + CERROR("can't send to %s. no NI on %s\n", + libcfs_nid2str(sd->sd_dst_nid), + libcfs_net2str(sd->sd_best_ni->ni_net->net_id)); + + return -EHOSTUNREACH; +} + +struct lnet_ni * +lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni, + struct lnet_peer *peer, + struct lnet_peer_net *peer_net, + int cpt, + bool incr_seq) +{ + struct lnet_net *local_net; + struct lnet_ni *best_ni; + + local_net = lnet_get_net_locked(peer_net->lpn_net_id); + if (!local_net) + return NULL; + + /* + * Iterate through the NIs in this local Net and select + * the NI to send from. The selection is determined by + * these 3 criterion in the following priority: + * 1. NUMA + * 2. NI available credits + * 3. Round Robin + */ + best_ni = lnet_get_best_ni(local_net, cur_best_ni, + peer, peer_net, cpt); + + if (incr_seq && best_ni) + best_ni->ni_seq++; + + return best_ni; +} + +static int +lnet_handle_find_routed_path(struct lnet_send_data *sd, + lnet_nid_t dst_nid, + struct lnet_peer_ni **gw_lpni, + struct lnet_peer **gw_peer) +{ + struct lnet_peer_ni *gw; + lnet_nid_t src_nid = sd->sd_src_nid; + + gw = lnet_find_route_locked(NULL, LNET_NIDNET(dst_nid), + sd->sd_rtr_nid); + if (!gw) { + CERROR("no route to %s from %s\n", + libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid)); + return -EHOSTUNREACH; + } + + /* get the peer of the gw_ni */ + LASSERT(gw->lpni_peer_net); + LASSERT(gw->lpni_peer_net->lpn_peer); + + *gw_peer = gw->lpni_peer_net->lpn_peer; + + if (!sd->sd_best_ni) + sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, *gw_peer, + gw->lpni_peer_net, + sd->sd_md_cpt, + true); + + if (!sd->sd_best_ni) { + CERROR("Internal Error. Expected local ni on %s " + "but non found :%s\n", + libcfs_net2str(gw->lpni_peer_net->lpn_net_id), + libcfs_nid2str(sd->sd_src_nid)); + return -EFAULT; + } + + /* + * if gw is MR let's find its best peer_ni + */ + if (lnet_peer_is_multi_rail(*gw_peer)) { + gw = lnet_find_best_lpni_on_net(sd, *gw_peer, + sd->sd_best_ni->ni_net->net_id); + /* + * We've already verified that the gw has an NI on that + * desired net, but we're not finding it. Something is + * wrong. + */ + if (!gw) { + CERROR("Internal Error. Route expected to %s from %s\n", + libcfs_nid2str(dst_nid), + libcfs_nid2str(src_nid)); + return -EFAULT; + } + } + + *gw_lpni = gw; + + return 0; +} + +/* + * Handle two cases: + * + * Case 1: + * Source specified + * Remote destination + * Non-MR destination + * + * Case 2: + * Source specified + * Remote destination + * MR destination + * + * The handling of these two cases is similar. Even though the destination + * can be MR or non-MR, we'll deal directly with the router. + */ +static int +lnet_handle_spec_router_dst(struct lnet_send_data *sd) +{ + int rc; + struct lnet_peer_ni *gw_lpni = NULL; + struct lnet_peer *gw_peer = NULL; + + /* find local NI */ + sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt); + if (!sd->sd_best_ni) { + CERROR("Can't send to %s: src %s is not a " + "local nid\n", libcfs_nid2str(sd->sd_dst_nid), + libcfs_nid2str(sd->sd_src_nid)); + return -EINVAL; + } + + rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni, + &gw_peer); + if (rc < 0) + return rc; + + if (sd->sd_send_case & NMR_DST) + /* + * since the final destination is non-MR let's set its preferred + * NID before we send + */ + lnet_set_non_mr_pref_nid(sd); + + /* + * We're going to send to the gw found so let's set its + * info + */ + sd->sd_peer = gw_peer; + sd->sd_best_lpni = gw_lpni; + + return lnet_handle_send(sd); +} + +struct lnet_ni * +lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt) +{ + struct lnet_peer_net *peer_net = NULL; + struct lnet_ni *best_ni = NULL; + + /* + * The peer can have multiple interfaces, some of them can be on + * the local network and others on a routed network. We should + * prefer the local network. However if the local network is not + * available then we need to try the routed network + */ + + /* go through all the peer nets and find the best_ni */ + list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) { + /* + * The peer's list of nets can contain non-local nets. We + * want to only examine the local ones. + */ + if (!lnet_get_net_locked(peer_net->lpn_net_id)) + continue; + best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer, + peer_net, md_cpt, false); + } + + if (best_ni) + /* increment sequence number so we can round robin */ + best_ni->ni_seq++; + + return best_ni; +} + +static struct lnet_ni * +lnet_find_existing_preferred_best_ni(struct lnet_send_data *sd) +{ + struct lnet_ni *best_ni = NULL; + struct lnet_peer_net *peer_net; + struct lnet_peer *peer = sd->sd_peer; + struct lnet_peer_ni *best_lpni = sd->sd_best_lpni; + struct lnet_peer_ni *lpni; + int cpt = sd->sd_cpt; + + /* + * We must use a consistent source address when sending to a + * non-MR peer. However, a non-MR peer can have multiple NIDs + * on multiple networks, and we may even need to talk to this + * peer on multiple networks -- certain types of + * load-balancing configuration do this. + * + * So we need to pick the NI the peer prefers for this + * particular network. + */ + + /* Get the target peer_ni */ + peer_net = lnet_peer_get_net_locked(peer, + LNET_NIDNET(best_lpni->lpni_nid)); + LASSERT(peer_net != NULL); + list_for_each_entry(lpni, &peer_net->lpn_peer_nis, + lpni_peer_nis) { + if (lpni->lpni_pref_nnids == 0) + continue; + LASSERT(lpni->lpni_pref_nnids == 1); + best_ni = lnet_nid2ni_locked( + lpni->lpni_pref.nid, cpt); + break; + } + + return best_ni; +} + +/* Prerequisite: sd->sd_peer and sd->sd_best_lpni should be set */ +static int +lnet_select_preferred_best_ni(struct lnet_send_data *sd) +{ + struct lnet_ni *best_ni = NULL; + struct lnet_peer_ni *best_lpni = sd->sd_best_lpni; + + /* + * We must use a consistent source address when sending to a + * non-MR peer. However, a non-MR peer can have multiple NIDs + * on multiple networks, and we may even need to talk to this + * peer on multiple networks -- certain types of + * load-balancing configuration do this. + * + * So we need to pick the NI the peer prefers for this + * particular network. + */ + + best_ni = lnet_find_existing_preferred_best_ni(sd); + + /* if best_ni is still not set just pick one */ + if (!best_ni) { + best_ni = + lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer, + sd->sd_best_lpni->lpni_peer_net, + sd->sd_md_cpt, true); + /* If there is no best_ni we don't have a route */ + if (!best_ni) { + CERROR("no path to %s from net %s\n", + libcfs_nid2str(best_lpni->lpni_nid), + libcfs_net2str(best_lpni->lpni_net->net_id)); + return -EHOSTUNREACH; + } + } + + sd->sd_best_ni = best_ni; + + /* Set preferred NI if necessary. */ + lnet_set_non_mr_pref_nid(sd); + + return 0; +} + + +/* + * Source not specified + * Local destination + * Non-MR Peer + * + * always use the same source NID for NMR peers + * If we've talked to that peer before then we already have a preferred + * source NI associated with it. Otherwise, we select a preferred local NI + * and store it in the peer + */ +static int +lnet_handle_any_local_nmr_dst(struct lnet_send_data *sd) +{ + int rc; + + /* sd->sd_best_lpni is already set to the final destination */ + + /* + * At this point we should've created the peer ni and peer. If we + * can't find it, then something went wrong. Instead of assert + * output a relevant message and fail the send + */ + if (!sd->sd_best_lpni) { + CERROR("Internal fault. Unable to send msg %s to %s. " + "NID not known\n", + lnet_msgtyp2str(sd->sd_msg->msg_type), + libcfs_nid2str(sd->sd_dst_nid)); + return -EFAULT; + } + + rc = lnet_select_preferred_best_ni(sd); + if (!rc) + rc = lnet_handle_send(sd); + + return rc; +} + +static int +lnet_handle_any_mr_dsta(struct lnet_send_data *sd) +{ + /* + * NOTE we've already handled the remote peer case. So we only + * need to worry about the local case here. + * + * if we're sending a response, ACK or reply, we need to send it + * to the destination NID given to us. At this point we already + * have the peer_ni we're suppose to send to, so just find the + * best_ni on the peer net and use that. Since we're sending to an + * MR peer then we can just run the selection algorithm on our + * local NIs and pick the best one. + */ + if (sd->sd_send_case & SND_RESP) { + sd->sd_best_ni = + lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer, + sd->sd_best_lpni->lpni_peer_net, + sd->sd_md_cpt, true); + + if (!sd->sd_best_ni) { + /* + * We're not going to deal with not able to send + * a response to the provided final destination + */ + CERROR("Can't send response to %s. " + "No local NI available\n", + libcfs_nid2str(sd->sd_dst_nid)); + return -EHOSTUNREACH; + } + + return lnet_handle_send(sd); + } + + /* + * If we get here that means we're sending a fresh request, PUT or + * GET, so we need to run our standard selection algorithm. + * First find the best local interface that's on any of the peer's + * networks. + */ + sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer, + sd->sd_md_cpt); + if (sd->sd_best_ni) { + sd->sd_best_lpni = + lnet_find_best_lpni_on_net(sd, sd->sd_peer, + sd->sd_best_ni->ni_net->net_id); + + /* + * if we're successful in selecting a peer_ni on the local + * network, then send to it. Otherwise fall through and + * try and see if we can reach it over another routed + * network + */ + if (sd->sd_best_lpni) { + /* + * in case we initially started with a routed + * destination, let's reset to local + */ + sd->sd_send_case &= ~REMOTE_DST; + sd->sd_send_case |= LOCAL_DST; + return lnet_handle_send(sd); + } + + CERROR("Internal Error. Expected to have a best_lpni: " + "%s -> %s\n", + libcfs_nid2str(sd->sd_src_nid), + libcfs_nid2str(sd->sd_dst_nid)); + + return -EFAULT; + } + + /* + * Peer doesn't have a local network. Let's see if there is + * a remote network we can reach it on. + */ + return PASS_THROUGH; +} + +/* + * Case 1: + * Source NID not specified + * Local destination + * MR peer + * + * Case 2: + * Source NID not speified + * Remote destination + * MR peer + * + * In both of these cases if we're sending a response, ACK or REPLY, then + * we need to send to the destination NID provided. + * + * In the remote case let's deal with MR routers. + * + */ + +static int +lnet_handle_any_mr_dst(struct lnet_send_data *sd) +{ + int rc = 0; + struct lnet_peer *gw_peer = NULL; + struct lnet_peer_ni *gw_lpni = NULL; + + /* + * handle sending a response to a remote peer here so we don't + * have to worry about it if we hit lnet_handle_any_mr_dsta() + */ + if (sd->sd_send_case & REMOTE_DST && + sd->sd_send_case & SND_RESP) { + struct lnet_peer_ni *gw; + struct lnet_peer *gw_peer; + + rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw, + &gw_peer); + if (rc < 0) { + CERROR("Can't send response to %s. " + "No route available\n", + libcfs_nid2str(sd->sd_dst_nid)); + return -EHOSTUNREACH; + } + + sd->sd_best_lpni = gw; + sd->sd_peer = gw_peer; + + return lnet_handle_send(sd); + } + + /* + * Even though the NID for the peer might not be on a local network, + * since the peer is MR there could be other interfaces on the + * local network. In that case we'd still like to prefer the local + * network over the routed network. If we're unable to do that + * then we select the best router among the different routed networks, + * and if the router is MR then we can deal with it as such. + */ + rc = lnet_handle_any_mr_dsta(sd); + if (rc != PASS_THROUGH) + return rc; + + /* + * TODO; One possible enhancement is to run the selection + * algorithm on the peer. However for remote peers the credits are + * not decremented, so we'll be basically going over the peer NIs + * in round robin. An MR router will run the selection algorithm + * on the next-hop interfaces. + */ + rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni, + &gw_peer); + if (rc < 0) + return rc; + + sd->sd_send_case &= ~LOCAL_DST; + sd->sd_send_case |= REMOTE_DST; + + sd->sd_peer = gw_peer; + sd->sd_best_lpni = gw_lpni; + + return lnet_handle_send(sd); +} + +/* + * Source not specified + * Remote destination + * Non-MR peer + * + * Must send to the specified peer NID using the same source NID that + * we've used before. If it's the first time to talk to that peer then + * find the source NI and assign it as preferred to that peer + */ +static int +lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd) +{ + int rc; + struct lnet_peer_ni *gw_lpni = NULL; + struct lnet_peer *gw_peer = NULL; + + /* + * Let's set if we have a preferred NI to talk to this NMR peer + */ + sd->sd_best_ni = lnet_find_existing_preferred_best_ni(sd); + + /* + * find the router and that'll find the best NI if we didn't find + * it already. + */ + rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni, + &gw_peer); + if (rc < 0) + return rc; + + /* + * set the best_ni we've chosen as the preferred one for + * this peer + */ + lnet_set_non_mr_pref_nid(sd); + + /* we'll be sending to the gw */ + sd->sd_best_lpni = gw_lpni; + sd->sd_peer = gw_peer; + + return lnet_handle_send(sd); +} + +static int +lnet_handle_send_case_locked(struct lnet_send_data *sd) +{ + /* + * turn off the SND_RESP bit. + * It will be checked in the case handling + */ + __u32 send_case = sd->sd_send_case &= ~SND_RESP ; + + CDEBUG(D_NET, "Source %s%s to %s %s %s destination\n", + (send_case & SRC_SPEC) ? "Specified: " : "ANY", + (send_case & SRC_SPEC) ? libcfs_nid2str(sd->sd_src_nid) : "", + (send_case & MR_DST) ? "MR: " : "NMR: ", + libcfs_nid2str(sd->sd_dst_nid), + (send_case & LOCAL_DST) ? "local" : "routed"); + + switch (send_case) { + /* + * For all cases where the source is specified, we should always + * use the destination NID, whether it's an MR destination or not, + * since we're continuing a series of related messages for the + * same RPC + */ + case SRC_SPEC_LOCAL_NMR_DST: + return lnet_handle_spec_local_nmr_dst(sd); + case SRC_SPEC_LOCAL_MR_DST: + return lnet_handle_spec_local_mr_dst(sd); + case SRC_SPEC_ROUTER_NMR_DST: + case SRC_SPEC_ROUTER_MR_DST: + return lnet_handle_spec_router_dst(sd); + case SRC_ANY_LOCAL_NMR_DST: + return lnet_handle_any_local_nmr_dst(sd); + case SRC_ANY_LOCAL_MR_DST: + case SRC_ANY_ROUTER_MR_DST: + return lnet_handle_any_mr_dst(sd); + case SRC_ANY_ROUTER_NMR_DST: + return lnet_handle_any_router_nmr_dst(sd); + default: + CERROR("Unknown send case\n"); + return -1; + } +} + +static int +lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid, + struct lnet_msg *msg, lnet_nid_t rtr_nid) +{ + struct lnet_peer_ni *lpni; + struct lnet_peer *peer; + struct lnet_send_data send_data; + int cpt, rc; + int md_cpt; + __u32 send_case = 0; + + memset(&send_data, 0, sizeof(send_data)); + + /* + * get an initial CPT to use for locking. The idea here is not to + * serialize the calls to select_pathway, so that as many + * operations can run concurrently as possible. To do that we use + * the CPT where this call is being executed. Later on when we + * determine the CPT to use in lnet_message_commit, we switch the + * lock and check if there was any configuration change. If none, + * then we proceed, if there is, then we restart the operation. + */ + cpt = lnet_net_lock_current(); + + md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset); + if (md_cpt == CFS_CPT_ANY) + md_cpt = cpt; + +again: + + /* + * If we're being asked to send to the loopback interface, there + * is no need to go through any selection. We can just shortcut + * the entire process and send over lolnd */ - best_lpni->lpni_seq++; + if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) { + /* No send credit hassles with LOLND */ + lnet_ni_addref_locked(the_lnet.ln_loni, cpt); + msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid); + if (!msg->msg_routing) + msg->msg_hdr.src_nid = + cpu_to_le64(the_lnet.ln_loni->ni_nid); + msg->msg_target.nid = the_lnet.ln_loni->ni_nid; + lnet_msg_commit(msg, cpt); + msg->msg_txni = the_lnet.ln_loni; + lnet_net_unlock(cpt); + + return LNET_CREDIT_OK; + } + + /* + * find an existing peer_ni, or create one and mark it as having been + * created due to network traffic. This call will create the + * peer->peer_net->peer_ni tree. + */ + lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt); + if (IS_ERR(lpni)) { + lnet_net_unlock(cpt); + return PTR_ERR(lpni); + } + + /* + * Cache the original src_nid. If we need to resend the message + * then we'll need to know whether the src_nid was originally + * specified for this message. If it was originally specified, + * then we need to keep using the same src_nid since it's + * continuing the same sequence of messages. + */ + msg->msg_src_nid_param = src_nid; + + /* + * Now that we have a peer_ni, check if we want to discover + * the peer. Traffic to the LNET_RESERVED_PORTAL should not + * trigger discovery. + */ + peer = lpni->lpni_peer_net->lpn_peer; + if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) { + lnet_nid_t primary_nid; + rc = lnet_discover_peer_locked(lpni, cpt, false); + if (rc) { + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(cpt); + return rc; + } + /* The peer may have changed. */ + peer = lpni->lpni_peer_net->lpn_peer; + /* queue message and return */ + msg->msg_rtr_nid_param = rtr_nid; + msg->msg_sending = 0; + list_add_tail(&msg->msg_list, &peer->lp_dc_pendq); + lnet_peer_ni_decref_locked(lpni); + primary_nid = peer->lp_primary_nid; + lnet_net_unlock(cpt); + + CDEBUG(D_NET, "%s pending discovery\n", + libcfs_nid2str(primary_nid)); + + return LNET_DC_WAIT; + } + lnet_peer_ni_decref_locked(lpni); + + /* + * Identify the different send cases + */ + if (src_nid == LNET_NID_ANY) + send_case |= SRC_ANY; + else + send_case |= SRC_SPEC; + + if (lnet_get_net_locked(LNET_NIDNET(dst_nid))) + send_case |= LOCAL_DST; + else + send_case |= REMOTE_DST; + + /* + * if this is a non-MR peer or if we're recovering a peer ni then + * let's consider this an NMR case so we can hit the destination + * NID. + */ + if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery) + send_case |= NMR_DST; + else + send_case |= MR_DST; + + if (msg->msg_type == LNET_MSG_REPLY || + msg->msg_type == LNET_MSG_ACK) + send_case |= SND_RESP; + + /* assign parameters to the send_data */ + send_data.sd_msg = msg; + send_data.sd_rtr_nid = rtr_nid; + send_data.sd_src_nid = src_nid; + send_data.sd_dst_nid = dst_nid; + send_data.sd_best_lpni = lpni; + /* + * keep a pointer to the final destination in case we're going to + * route, so we'll need to access it later + */ + send_data.sd_final_dst_lpni = lpni; + send_data.sd_peer = peer; + send_data.sd_md_cpt = md_cpt; + send_data.sd_cpt = cpt; + send_data.sd_send_case = send_case; + + rc = lnet_handle_send_case_locked(&send_data); + + if (rc == REPEAT_SEND) + goto again; + + lnet_net_unlock(send_data.sd_cpt); + + return rc; +} + +int +lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) +{ + lnet_nid_t dst_nid = msg->msg_target.nid; + int rc; + + /* + * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases, + * but we might want to use pre-determined router for ACK/REPLY + * in the future + */ + /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */ + LASSERT(msg->msg_txpeer == NULL); + LASSERT(msg->msg_txni == NULL); + LASSERT(!msg->msg_sending); + LASSERT(!msg->msg_target_is_router); + LASSERT(!msg->msg_receiving); + + msg->msg_sending = 1; + + LASSERT(!msg->msg_tx_committed); + + rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid); + if (rc < 0) + return rc; + + if (rc == LNET_CREDIT_OK) + lnet_ni_send(msg->msg_txni, msg); + + /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */ + return 0; +} + +enum lnet_mt_event_type { + MT_TYPE_LOCAL_NI = 0, + MT_TYPE_PEER_NI +}; + +struct lnet_mt_event_info { + enum lnet_mt_event_type mt_type; + lnet_nid_t mt_nid; +}; + +void +lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt) +{ + struct lnet_rsp_tracker *rspt; + + /* + * msg has a refcount on the MD so the MD is not going away. + * The rspt queue for the cpt is protected by + * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie. + */ + lnet_res_lock(cpt); + if (!md->md_rspt_ptr) { + lnet_res_unlock(cpt); + return; + } + rspt = md->md_rspt_ptr; + md->md_rspt_ptr = NULL; + + /* debug code */ + LASSERT(rspt->rspt_cpt == cpt); + + /* + * invalidate the handle to indicate that a response has been + * received, which will then lead the monitor thread to clean up + * the rspt block. + */ + LNetInvalidateMDHandle(&rspt->rspt_mdh); + lnet_res_unlock(cpt); +} + +static void +lnet_finalize_expired_responses(bool force) +{ + struct lnet_libmd *md; + struct list_head local_queue; + struct lnet_rsp_tracker *rspt, *tmp; + int i; + + if (the_lnet.ln_mt_rstq == NULL) + return; + + cfs_cpt_for_each(i, lnet_cpt_table()) { + INIT_LIST_HEAD(&local_queue); + + lnet_net_lock(i); + if (!the_lnet.ln_mt_rstq[i]) { + lnet_net_unlock(i); + continue; + } + list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue); + lnet_net_unlock(i); + + list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) { + /* + * The rspt mdh will be invalidated when a response + * is received or whenever we want to discard the + * block the monitor thread will walk the queue + * and clean up any rsts with an invalid mdh. + * The monitor thread will walk the queue until + * the first unexpired rspt block. This means that + * some rspt blocks which received their + * corresponding responses will linger in the + * queue until they are cleaned up eventually. + */ + lnet_res_lock(i); + if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) { + lnet_res_unlock(i); + list_del_init(&rspt->rspt_on_list); + lnet_rspt_free(rspt, i); + continue; + } + + if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 || + force) { + md = lnet_handle2md(&rspt->rspt_mdh); + if (!md) { + LNetInvalidateMDHandle(&rspt->rspt_mdh); + lnet_res_unlock(i); + list_del_init(&rspt->rspt_on_list); + lnet_rspt_free(rspt, i); + continue; + } + LASSERT(md->md_rspt_ptr == rspt); + md->md_rspt_ptr = NULL; + lnet_res_unlock(i); + + lnet_net_lock(i); + the_lnet.ln_counters[i]->response_timeout_count++; + lnet_net_unlock(i); + + list_del_init(&rspt->rspt_on_list); + + CDEBUG(D_NET, "Response timed out: md = %p\n", md); + LNetMDUnlink(rspt->rspt_mdh); + lnet_rspt_free(rspt, i); + } else { + lnet_res_unlock(i); + break; + } + } + + lnet_net_lock(i); + if (!list_empty(&local_queue)) + list_splice(&local_queue, the_lnet.ln_mt_rstq[i]); + lnet_net_unlock(i); + } +} + +static void +lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt) +{ + struct lnet_msg *msg; + + while (!list_empty(resendq)) { + struct lnet_peer_ni *lpni; + + msg = list_entry(resendq->next, struct lnet_msg, + msg_list); + + list_del_init(&msg->msg_list); + + lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid); + if (!lpni) { + lnet_net_unlock(cpt); + CERROR("Expected that a peer is already created for %s\n", + libcfs_nid2str(msg->msg_hdr.dest_nid)); + msg->msg_no_resend = true; + lnet_finalize(msg, -EFAULT); + lnet_net_lock(cpt); + } else { + struct lnet_peer *peer; + int rc; + lnet_nid_t src_nid = LNET_NID_ANY; + + /* + * if this message is not being routed and the + * peer is non-MR then we must use the same + * src_nid that was used in the original send. + * Otherwise if we're routing the message (IE + * we're a router) then we can use any of our + * local interfaces. It doesn't matter to the + * final destination. + */ + peer = lpni->lpni_peer_net->lpn_peer; + if (!msg->msg_routing && + !lnet_peer_is_multi_rail(peer)) + src_nid = le64_to_cpu(msg->msg_hdr.src_nid); + + /* + * If we originally specified a src NID, then we + * must attempt to reuse it in the resend as well. + */ + if (msg->msg_src_nid_param != LNET_NID_ANY) + src_nid = msg->msg_src_nid_param; + lnet_peer_ni_decref_locked(lpni); + + lnet_net_unlock(cpt); + CDEBUG(D_NET, "resending %s->%s: %s recovery %d\n", + libcfs_nid2str(src_nid), + libcfs_id2str(msg->msg_target), + lnet_msgtyp2str(msg->msg_type), + msg->msg_recovery); + rc = lnet_send(src_nid, msg, LNET_NID_ANY); + if (rc) { + CERROR("Error sending %s to %s: %d\n", + lnet_msgtyp2str(msg->msg_type), + libcfs_id2str(msg->msg_target), rc); + msg->msg_no_resend = true; + lnet_finalize(msg, rc); + } + lnet_net_lock(cpt); + if (!rc) + the_lnet.ln_counters[cpt]->resend_count++; + } + } +} + +static void +lnet_resend_pending_msgs(void) +{ + int i; + + cfs_cpt_for_each(i, lnet_cpt_table()) { + lnet_net_lock(i); + lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i); + lnet_net_unlock(i); + } +} + +/* called with cpt and ni_lock held */ +static void +lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt) +{ + struct lnet_handle_md recovery_mdh; + + LNetInvalidateMDHandle(&recovery_mdh); + + if (ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING) { + recovery_mdh = ni->ni_ping_mdh; + LNetInvalidateMDHandle(&ni->ni_ping_mdh); + } + lnet_ni_unlock(ni); + lnet_net_unlock(cpt); + if (!LNetMDHandleIsInvalid(recovery_mdh)) + LNetMDUnlink(recovery_mdh); + lnet_net_lock(cpt); + lnet_ni_lock(ni); +} + +static void +lnet_recover_local_nis(void) +{ + struct lnet_mt_event_info *ev_info; + struct list_head processed_list; + struct list_head local_queue; + struct lnet_handle_md mdh; + struct lnet_ni *tmp; + struct lnet_ni *ni; + lnet_nid_t nid; + int healthv; + int rc; + + INIT_LIST_HEAD(&local_queue); + INIT_LIST_HEAD(&processed_list); + + /* + * splice the recovery queue on a local queue. We will iterate + * through the local queue and update it as needed. Once we're + * done with the traversal, we'll splice the local queue back on + * the head of the ln_mt_localNIRecovq. Any newly added local NIs + * will be traversed in the next iteration. + */ + lnet_net_lock(0); + list_splice_init(&the_lnet.ln_mt_localNIRecovq, + &local_queue); + lnet_net_unlock(0); + + list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) { + /* + * if an NI is being deleted or it is now healthy, there + * is no need to keep it around in the recovery queue. + * The monitor thread is the only thread responsible for + * removing the NI from the recovery queue. + * Multiple threads can be adding NIs to the recovery + * queue. + */ + healthv = atomic_read(&ni->ni_healthv); + + lnet_net_lock(0); + lnet_ni_lock(ni); + if (!(ni->ni_state & LNET_NI_STATE_ACTIVE) || + healthv == LNET_MAX_HEALTH_VALUE) { + list_del_init(&ni->ni_recovery); + lnet_unlink_ni_recovery_mdh_locked(ni, 0); + lnet_ni_unlock(ni); + lnet_ni_decref_locked(ni, 0); + lnet_net_unlock(0); + continue; + } + lnet_ni_unlock(ni); + lnet_net_unlock(0); + + + CDEBUG(D_NET, "attempting to recover local ni: %s\n", + libcfs_nid2str(ni->ni_nid)); + + lnet_ni_lock(ni); + if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) { + ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING; + lnet_ni_unlock(ni); + + LIBCFS_ALLOC(ev_info, sizeof(*ev_info)); + if (!ev_info) { + CERROR("out of memory. Can't recover %s\n", + libcfs_nid2str(ni->ni_nid)); + lnet_ni_lock(ni); + ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING; + lnet_ni_unlock(ni); + continue; + } + + mdh = ni->ni_ping_mdh; + /* + * Invalidate the ni mdh in case it's deleted. + * We'll unlink the mdh in this case below. + */ + LNetInvalidateMDHandle(&ni->ni_ping_mdh); + nid = ni->ni_nid; + + /* + * remove the NI from the local queue and drop the + * reference count to it while we're recovering + * it. The reason for that, is that the NI could + * be deleted, and the way the code is structured + * is if we don't drop the NI, then the deletion + * code will enter a loop waiting for the + * reference count to be removed while holding the + * ln_mutex_lock(). When we look up the peer to + * send to in lnet_select_pathway() we will try to + * lock the ln_mutex_lock() as well, leading to + * a deadlock. By dropping the refcount and + * removing it from the list, we allow for the NI + * to be removed, then we use the cached NID to + * look it up again. If it's gone, then we just + * continue examining the rest of the queue. + */ + lnet_net_lock(0); + list_del_init(&ni->ni_recovery); + lnet_ni_decref_locked(ni, 0); + lnet_net_unlock(0); + + ev_info->mt_type = MT_TYPE_LOCAL_NI; + ev_info->mt_nid = nid; + rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN, + ev_info, the_lnet.ln_mt_eqh, true); + /* lookup the nid again */ + lnet_net_lock(0); + ni = lnet_nid2ni_locked(nid, 0); + if (!ni) { + /* + * the NI has been deleted when we dropped + * the ref count + */ + lnet_net_unlock(0); + LNetMDUnlink(mdh); + continue; + } + /* + * Same note as in lnet_recover_peer_nis(). When + * we're sending the ping, the NI is free to be + * deleted or manipulated. By this point it + * could've been added back on the recovery queue, + * and a refcount taken on it. + * So we can't just add it blindly again or we'll + * corrupt the queue. We must check under lock if + * it's not on any list and if not then add it + * to the processed list, which will eventually be + * spliced back on to the recovery queue. + */ + ni->ni_ping_mdh = mdh; + if (list_empty(&ni->ni_recovery)) { + list_add_tail(&ni->ni_recovery, &processed_list); + lnet_ni_addref_locked(ni, 0); + } + lnet_net_unlock(0); + + lnet_ni_lock(ni); + if (rc) + ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING; + } + lnet_ni_unlock(ni); + } + + /* + * put back the remaining NIs on the ln_mt_localNIRecovq to be + * reexamined in the next iteration. + */ + list_splice_init(&processed_list, &local_queue); + lnet_net_lock(0); + list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq); + lnet_net_unlock(0); +} + +static struct list_head ** +lnet_create_array_of_queues(void) +{ + struct list_head **qs; + struct list_head *q; + int i; + + qs = cfs_percpt_alloc(lnet_cpt_table(), + sizeof(struct list_head)); + if (!qs) { + CERROR("Failed to allocate queues\n"); + return NULL; + } + + cfs_percpt_for_each(q, i, qs) + INIT_LIST_HEAD(q); + + return qs; +} + +static int +lnet_resendqs_create(void) +{ + struct list_head **resendqs; + resendqs = lnet_create_array_of_queues(); + + if (!resendqs) + return -ENOMEM; + + lnet_net_lock(LNET_LOCK_EX); + the_lnet.ln_mt_resendqs = resendqs; + lnet_net_unlock(LNET_LOCK_EX); + + return 0; +} + +static void +lnet_clean_local_ni_recoveryq(void) +{ + struct lnet_ni *ni; + + /* This is only called when the monitor thread has stopped */ + lnet_net_lock(0); + + while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) { + ni = list_entry(the_lnet.ln_mt_localNIRecovq.next, + struct lnet_ni, ni_recovery); + list_del_init(&ni->ni_recovery); + lnet_ni_lock(ni); + lnet_unlink_ni_recovery_mdh_locked(ni, 0); + lnet_ni_unlock(ni); + lnet_ni_decref_locked(ni, 0); + } + + lnet_net_unlock(0); +} + +static void +lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt) +{ + struct lnet_handle_md recovery_mdh; + + LNetInvalidateMDHandle(&recovery_mdh); + + if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) { + recovery_mdh = lpni->lpni_recovery_ping_mdh; + LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh); + } + spin_unlock(&lpni->lpni_lock); + lnet_net_unlock(cpt); + if (!LNetMDHandleIsInvalid(recovery_mdh)) + LNetMDUnlink(recovery_mdh); + lnet_net_lock(cpt); + spin_lock(&lpni->lpni_lock); +} + +static void +lnet_clean_peer_ni_recoveryq(void) +{ + struct lnet_peer_ni *lpni, *tmp; - /* - * grab a reference on the peer_ni so it sticks around even if - * we need to drop and relock the lnet_net_lock below. - */ - lnet_peer_ni_addref_locked(best_lpni); + lnet_net_lock(LNET_LOCK_EX); - /* - * Use lnet_cpt_of_nid() to determine the CPT used to commit the - * message. This ensures that we get a CPT that is correct for - * the NI when the NI has been restricted to a subset of all CPTs. - * If the selected CPT differs from the one currently locked, we - * must unlock and relock the lnet_net_lock(), and then check whether - * the configuration has changed. We don't have a hold on the best_ni - * yet, and it may have vanished. - */ - cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni); - if (cpt != cpt2) { - lnet_net_unlock(cpt); - cpt = cpt2; - lnet_net_lock(cpt); - if (seq != lnet_get_dlc_seq_locked()) { - lnet_peer_ni_decref_locked(best_lpni); - goto again; + list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq, + lpni_recovery) { + list_del_init(&lpni->lpni_recovery); + spin_lock(&lpni->lpni_lock); + lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX); + spin_unlock(&lpni->lpni_lock); + lnet_peer_ni_decref_locked(lpni); + } + + lnet_net_unlock(LNET_LOCK_EX); +} + +static void +lnet_clean_resendqs(void) +{ + struct lnet_msg *msg, *tmp; + struct list_head msgs; + int i; + + INIT_LIST_HEAD(&msgs); + + cfs_cpt_for_each(i, lnet_cpt_table()) { + lnet_net_lock(i); + list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs); + lnet_net_unlock(i); + list_for_each_entry_safe(msg, tmp, &msgs, msg_list) { + list_del_init(&msg->msg_list); + msg->msg_no_resend = true; + lnet_finalize(msg, -ESHUTDOWN); } } - /* - * store the best_lpni in the message right away to avoid having - * to do the same operation under different conditions - */ - msg->msg_txpeer = best_lpni; - msg->msg_txni = best_ni; + cfs_percpt_free(the_lnet.ln_mt_resendqs); +} - /* - * grab a reference for the best_ni since now it's in use in this - * send. the reference will need to be dropped when the message is - * finished in lnet_finalize() - */ - lnet_ni_addref_locked(msg->msg_txni, cpt); +static void +lnet_recover_peer_nis(void) +{ + struct lnet_mt_event_info *ev_info; + struct list_head processed_list; + struct list_head local_queue; + struct lnet_handle_md mdh; + struct lnet_peer_ni *lpni; + struct lnet_peer_ni *tmp; + lnet_nid_t nid; + int healthv; + int rc; - /* - * Always set the target.nid to the best peer picked. Either the - * nid will be one of the preconfigured NIDs, or the same NID as - * what was originally set in the target or it will be the NID of - * a router if this message should be routed - */ - msg->msg_target.nid = msg->msg_txpeer->lpni_nid; + INIT_LIST_HEAD(&local_queue); + INIT_LIST_HEAD(&processed_list); /* - * lnet_msg_commit assigns the correct cpt to the message, which - * is used to decrement the correct refcount on the ni when it's - * time to return the credits + * Always use cpt 0 for locking across all interactions with + * ln_mt_peerNIRecovq */ - lnet_msg_commit(msg, cpt); + lnet_net_lock(0); + list_splice_init(&the_lnet.ln_mt_peerNIRecovq, + &local_queue); + lnet_net_unlock(0); + + list_for_each_entry_safe(lpni, tmp, &local_queue, + lpni_recovery) { + /* + * The same protection strategy is used here as is in the + * local recovery case. + */ + lnet_net_lock(0); + healthv = atomic_read(&lpni->lpni_healthv); + spin_lock(&lpni->lpni_lock); + if (lpni->lpni_state & LNET_PEER_NI_DELETING || + healthv == LNET_MAX_HEALTH_VALUE) { + list_del_init(&lpni->lpni_recovery); + lnet_unlink_lpni_recovery_mdh_locked(lpni, 0); + spin_unlock(&lpni->lpni_lock); + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(0); + continue; + } + spin_unlock(&lpni->lpni_lock); + lnet_net_unlock(0); + + /* + * NOTE: we're racing with peer deletion from user space. + * It's possible that a peer is deleted after we check its + * state. In this case the recovery can create a new peer + */ + spin_lock(&lpni->lpni_lock); + if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) && + !(lpni->lpni_state & LNET_PEER_NI_DELETING)) { + lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING; + spin_unlock(&lpni->lpni_lock); + + LIBCFS_ALLOC(ev_info, sizeof(*ev_info)); + if (!ev_info) { + CERROR("out of memory. Can't recover %s\n", + libcfs_nid2str(lpni->lpni_nid)); + spin_lock(&lpni->lpni_lock); + lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING; + spin_unlock(&lpni->lpni_lock); + continue; + } + + /* look at the comments in lnet_recover_local_nis() */ + mdh = lpni->lpni_recovery_ping_mdh; + LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh); + nid = lpni->lpni_nid; + lnet_net_lock(0); + list_del_init(&lpni->lpni_recovery); + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(0); + + ev_info->mt_type = MT_TYPE_PEER_NI; + ev_info->mt_nid = nid; + rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN, + ev_info, the_lnet.ln_mt_eqh, true); + lnet_net_lock(0); + /* + * lnet_find_peer_ni_locked() grabs a refcount for + * us. No need to take it explicitly. + */ + lpni = lnet_find_peer_ni_locked(nid); + if (!lpni) { + lnet_net_unlock(0); + LNetMDUnlink(mdh); + continue; + } + + lpni->lpni_recovery_ping_mdh = mdh; + /* + * While we're unlocked the lpni could've been + * readded on the recovery queue. In this case we + * don't need to add it to the local queue, since + * it's already on there and the thread that added + * it would've incremented the refcount on the + * peer, which means we need to decref the refcount + * that was implicitly grabbed by find_peer_ni_locked. + * Otherwise, if the lpni is still not on + * the recovery queue, then we'll add it to the + * processed list. + */ + if (list_empty(&lpni->lpni_recovery)) + list_add_tail(&lpni->lpni_recovery, &processed_list); + else + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(0); + + spin_lock(&lpni->lpni_lock); + if (rc) + lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING; + } + spin_unlock(&lpni->lpni_lock); + } + + list_splice_init(&processed_list, &local_queue); + lnet_net_lock(0); + list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq); + lnet_net_unlock(0); +} + +static int +lnet_monitor_thread(void *arg) +{ + int wakeup_counter = 0; /* - * If we are routing the message then we don't need to overwrite - * the src_nid since it would've been set at the origin. Otherwise - * we are the originator so we need to set it. + * The monitor thread takes care of the following: + * 1. Checks the aliveness of routers + * 2. Checks if there are messages on the resend queue to resend + * them. + * 3. Check if there are any NIs on the local recovery queue and + * pings them + * 4. Checks if there are any NIs on the remote recovery queue + * and pings them. */ - if (!msg->msg_routing) - msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid); + cfs_block_allsigs(); + + while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) { + if (lnet_router_checker_active()) + lnet_check_routers(); + + lnet_resend_pending_msgs(); + + wakeup_counter++; + if (wakeup_counter >= lnet_transaction_timeout / 2) { + lnet_finalize_expired_responses(false); + wakeup_counter = 0; + } + + lnet_recover_local_nis(); + + lnet_recover_peer_nis(); - if (routing) { - msg->msg_target_is_router = 1; - msg->msg_target.pid = LNET_PID_LUSTRE; /* - * since we're routing we want to ensure that the - * msg_hdr.dest_nid is set to the final destination. When - * the router receives this message it knows how to route - * it. + * TODO do we need to check if we should sleep without + * timeout? Technically, an active system will always + * have messages in flight so this check will always + * evaluate to false. And on an idle system do we care + * if we wake up every 1 second? Although, we've seen + * cases where we get a complaint that an idle thread + * is waking up unnecessarily. */ - msg->msg_hdr.dest_nid = - cpu_to_le64(final_dst ? final_dst->lpni_nid : dst_nid); - } else { + wait_event_interruptible_timeout(the_lnet.ln_mt_waitq, + false, + cfs_time_seconds(1)); + } + + /* clean up the router checker */ + lnet_prune_rc_data(1); + + /* Shutting down */ + the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; + + /* signal that the monitor thread is exiting */ + up(&the_lnet.ln_mt_signal); + + return 0; +} + +/* + * lnet_send_ping + * Sends a ping. + * Returns == 0 if success + * Returns > 0 if LNetMDBind or prior fails + * Returns < 0 if LNetGet fails + */ +int +lnet_send_ping(lnet_nid_t dest_nid, + struct lnet_handle_md *mdh, int nnis, + void *user_data, struct lnet_handle_eq eqh, bool recovery) +{ + struct lnet_md md = { NULL }; + struct lnet_process_id id; + struct lnet_ping_buffer *pbuf; + int rc; + + if (dest_nid == LNET_NID_ANY) { + rc = -EHOSTUNREACH; + goto fail_error; + } + + pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS); + if (!pbuf) { + rc = ENOMEM; + goto fail_error; + } + + /* initialize md content */ + md.start = &pbuf->pb_info; + md.length = LNET_PING_INFO_SIZE(nnis); + md.threshold = 2; /* GET/REPLY */ + md.max_size = 0; + md.options = LNET_MD_TRUNCATE; + md.user_ptr = user_data; + md.eq_handle = eqh; + + rc = LNetMDBind(md, LNET_UNLINK, mdh); + if (rc) { + lnet_ping_buffer_decref(pbuf); + CERROR("Can't bind MD: %d\n", rc); + rc = -rc; /* change the rc to positive */ + goto fail_error; + } + id.pid = LNET_PID_LUSTRE; + id.nid = dest_nid; + + rc = LNetGet(LNET_NID_ANY, *mdh, id, + LNET_RESERVED_PORTAL, + LNET_PROTO_PING_MATCHBITS, 0, recovery); + + if (rc) + goto fail_unlink_md; + + return 0; + +fail_unlink_md: + LNetMDUnlink(*mdh); + LNetInvalidateMDHandle(mdh); +fail_error: + return rc; +} + +static void +lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info, + int status) +{ + lnet_nid_t nid = ev_info->mt_nid; + + if (ev_info->mt_type == MT_TYPE_LOCAL_NI) { + struct lnet_ni *ni; + + lnet_net_lock(0); + ni = lnet_nid2ni_locked(nid, 0); + if (!ni) { + lnet_net_unlock(0); + return; + } + lnet_ni_lock(ni); + ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING; + lnet_ni_unlock(ni); + lnet_net_unlock(0); + + if (status != 0) { + CERROR("local NI recovery failed with %d\n", status); + return; + } /* - * if we're not routing set the dest_nid to the best peer - * ni that we picked earlier in the algorithm. + * need to increment healthv for the ni here, because in + * the lnet_finalize() path we don't have access to this + * NI. And in order to get access to it, we'll need to + * carry forward too much information. + * In the peer case, it'll naturally be incremented */ - msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid); + lnet_inc_healthv(&ni->ni_healthv); + } else { + struct lnet_peer_ni *lpni; + int cpt; + + cpt = lnet_net_lock_current(); + lpni = lnet_find_peer_ni_locked(nid); + if (!lpni) { + lnet_net_unlock(cpt); + return; + } + spin_lock(&lpni->lpni_lock); + lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING; + spin_unlock(&lpni->lpni_lock); + lnet_peer_ni_decref_locked(lpni); + lnet_net_unlock(cpt); + + if (status != 0) + CERROR("peer NI recovery failed with %d\n", status); } +} - rc = lnet_post_send_locked(msg, 0); +static void +lnet_mt_event_handler(struct lnet_event *event) +{ + struct lnet_mt_event_info *ev_info = event->md.user_ptr; + struct lnet_ping_buffer *pbuf; + + /* TODO: remove assert */ + LASSERT(event->type == LNET_EVENT_REPLY || + event->type == LNET_EVENT_SEND || + event->type == LNET_EVENT_UNLINK); + + CDEBUG(D_NET, "Received event: %d status: %d\n", event->type, + event->status); + + switch (event->type) { + case LNET_EVENT_UNLINK: + CDEBUG(D_NET, "%s recovery ping unlinked\n", + libcfs_nid2str(ev_info->mt_nid)); + case LNET_EVENT_REPLY: + lnet_handle_recovery_reply(ev_info, event->status); + break; + case LNET_EVENT_SEND: + CDEBUG(D_NET, "%s recovery message sent %s:%d\n", + libcfs_nid2str(ev_info->mt_nid), + (event->status) ? "unsuccessfully" : + "successfully", event->status); + break; + default: + CERROR("Unexpected event: %d\n", event->type); + break; + } + if (event->unlinked) { + LIBCFS_FREE(ev_info, sizeof(*ev_info)); + pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start); + lnet_ping_buffer_decref(pbuf); + } +} - lnet_net_unlock(cpt); +static int +lnet_rsp_tracker_create(void) +{ + struct list_head **rstqs; + rstqs = lnet_create_array_of_queues(); - return rc; + if (!rstqs) + return -ENOMEM; + + the_lnet.ln_mt_rstq = rstqs; + + return 0; } -int -lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) +static void +lnet_rsp_tracker_clean(void) { - lnet_nid_t dst_nid = msg->msg_target.nid; - int rc; + lnet_finalize_expired_responses(true); - /* - * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases, - * but we might want to use pre-determined router for ACK/REPLY - * in the future - */ - /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */ - LASSERT (msg->msg_txpeer == NULL); - LASSERT (!msg->msg_sending); - LASSERT (!msg->msg_target_is_router); - LASSERT (!msg->msg_receiving); + cfs_percpt_free(the_lnet.ln_mt_rstq); + the_lnet.ln_mt_rstq = NULL; +} - msg->msg_sending = 1; +int lnet_monitor_thr_start(void) +{ + int rc = 0; + struct task_struct *task; - LASSERT(!msg->msg_tx_committed); + if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN) + return -EALREADY; - rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid); - if (rc < 0) + rc = lnet_resendqs_create(); + if (rc) return rc; - if (rc == LNET_CREDIT_OK) - lnet_ni_send(msg->msg_txni, msg); + rc = lnet_rsp_tracker_create(); + if (rc) + goto clean_queues; + + rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh); + if (rc != 0) { + CERROR("Can't allocate monitor thread EQ: %d\n", rc); + goto clean_queues; + } + + /* Pre monitor thread start processing */ + rc = lnet_router_pre_mt_start(); + if (rc) + goto free_mem; + + sema_init(&the_lnet.ln_mt_signal, 0); + + the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING; + task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("Can't start monitor thread: %d\n", rc); + goto clean_thread; + } + + /* post monitor thread start processing */ + lnet_router_post_mt_start(); - /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT */ return 0; + +clean_thread: + the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING; + /* block until event callback signals exit */ + down(&the_lnet.ln_mt_signal); + /* clean up */ + lnet_router_cleanup(); +free_mem: + the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; + lnet_rsp_tracker_clean(); + lnet_clean_local_ni_recoveryq(); + lnet_clean_peer_ni_recoveryq(); + lnet_clean_resendqs(); + LNetEQFree(the_lnet.ln_mt_eqh); + LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh); + return rc; +clean_queues: + lnet_rsp_tracker_clean(); + lnet_clean_local_ni_recoveryq(); + lnet_clean_peer_ni_recoveryq(); + lnet_clean_resendqs(); + return rc; +} + +void lnet_monitor_thr_stop(void) +{ + int rc; + + if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) + return; + + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING); + the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING; + + /* tell the monitor thread that we're shutting down */ + wake_up(&the_lnet.ln_mt_waitq); + + /* block until monitor thread signals that it's done */ + down(&the_lnet.ln_mt_signal); + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN); + + /* perform cleanup tasks */ + lnet_router_cleanup(); + lnet_rsp_tracker_clean(); + lnet_clean_local_ni_recoveryq(); + lnet_clean_peer_ni_recoveryq(); + lnet_clean_resendqs(); + rc = LNetEQFree(the_lnet.ln_mt_eqh); + LASSERT(rc == 0); + return; } void -lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob) +lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob, + __u32 msg_type) { lnet_net_lock(cpt); + lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP); the_lnet.ln_counters[cpt]->drop_count++; the_lnet.ln_counters[cpt]->drop_length += nob; lnet_net_unlock(cpt); @@ -2102,13 +3749,13 @@ lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get) static int lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg) { - void *private = msg->msg_private; - struct lnet_hdr *hdr = &msg->msg_hdr; + void *private = msg->msg_private; + struct lnet_hdr *hdr = &msg->msg_hdr; struct lnet_process_id src = {0}; - struct lnet_libmd *md; - int rlength; - int mlength; - int cpt; + struct lnet_libmd *md; + int rlength; + int mlength; + int cpt; cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie); lnet_res_lock(cpt); @@ -2169,10 +3816,10 @@ lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg) static int lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg) { - struct lnet_hdr *hdr = &msg->msg_hdr; + struct lnet_hdr *hdr = &msg->msg_hdr; struct lnet_process_id src = {0}; - struct lnet_libmd *md; - int cpt; + struct lnet_libmd *md; + int cpt; src.nid = hdr->src_nid; src.pid = hdr->src_pid; @@ -2379,6 +4026,13 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid, for_me = (ni->ni_nid == dest_nid); cpt = lnet_cpt_of_nid(from_nid, ni); + CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n", + libcfs_nid2str(dest_nid), + libcfs_nid2str(ni->ni_nid), + libcfs_nid2str(src_nid), + lnet_msgtyp2str(type), + (for_me) ? "for me" : "routed"); + switch (type) { case LNET_MSG_ACK: case LNET_MSG_GET: @@ -2481,7 +4135,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid, } if (!list_empty(&the_lnet.ln_drop_rules) && - lnet_drop_rule_match(hdr)) { + lnet_drop_rule_match(hdr, NULL)) { CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate" "silent message loss\n", libcfs_nid2str(from_nid), libcfs_nid2str(src_nid), @@ -2524,11 +4178,9 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid, msg->msg_hdr.dest_pid = dest_pid; msg->msg_hdr.payload_length = payload_length; } - /* Multi-Rail: Primary NID of source. */ - msg->msg_initiator = lnet_peer_primary_nid(src_nid); lnet_net_lock(cpt); - lpni = lnet_nid2peerni_locked(from_nid, cpt); + lpni = lnet_nid2peerni_locked(from_nid, ni->ni_nid, cpt); if (IS_ERR(lpni)) { lnet_net_unlock(cpt); CERROR("%s, src %s: Dropping %s " @@ -2544,6 +4196,8 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid, msg->msg_rxpeer = lpni; msg->msg_rxni = ni; lnet_ni_addref_locked(ni, cpt); + /* Multi-Rail: Primary NID of source. */ + msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid); if (lnet_isrouter(msg->msg_rxpeer)) { lnet_peer_set_alive(msg->msg_rxpeer); @@ -2593,7 +4247,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid, lnet_finalize(msg, rc); drop: - lnet_drop_message(ni, cpt, private, payload_length); + lnet_drop_message(ni, cpt, private, payload_length, type); return 0; } EXPORT_SYMBOL(lnet_parse); @@ -2629,7 +4283,10 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason) * until that's done */ lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt, - msg->msg_private, msg->msg_len); + msg->msg_private, msg->msg_len, + msg->msg_type); + + msg->msg_no_resend = true; /* * NB: message will not generate event because w/o attached MD, * but we still should give error code so lnet_msg_decommit() @@ -2672,6 +4329,43 @@ lnet_recv_delayed_msg_list(struct list_head *head) } } +static void +lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt, + struct lnet_libmd *md, struct lnet_handle_md mdh) +{ + s64 timeout_ns; + + /* + * MD has a refcount taken by message so it's not going away. + * The MD however can be looked up. We need to secure the access + * to the md_rspt_ptr by taking the res_lock. + * The rspt can be accessed without protection up to when it gets + * added to the list. + */ + + /* debug code */ + LASSERT(md->md_rspt_ptr == NULL); + + /* we'll use that same event in case we never get a response */ + rspt->rspt_mdh = mdh; + rspt->rspt_cpt = cpt; + timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC; + rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns); + + lnet_res_lock(cpt); + /* store the rspt so we can access it when we get the REPLY */ + md->md_rspt_ptr = rspt; + lnet_res_unlock(cpt); + + /* + * add to the list of tracked responses. It's added to tail of the + * list in order to expire all the older entries first. + */ + lnet_net_lock(cpt); + list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]); + lnet_net_unlock(cpt); +} + /** * Initiate an asynchronous PUT operation. * @@ -2722,10 +4416,11 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, __u64 match_bits, unsigned int offset, __u64 hdr_data) { - struct lnet_msg *msg; - struct lnet_libmd *md; - int cpt; - int rc; + struct lnet_msg *msg; + struct lnet_libmd *md; + int cpt; + int rc; + struct lnet_rsp_tracker *rspt = NULL; LASSERT(the_lnet.ln_refcount > 0); @@ -2745,6 +4440,17 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, msg->msg_vmflush = !!memory_pressure_get(); cpt = lnet_cpt_of_cookie(mdh.cookie); + + if (ack == LNET_ACK_REQ) { + rspt = lnet_rspt_alloc(cpt); + if (!rspt) { + CERROR("Dropping PUT to %s: ENOMEM on response tracker\n", + libcfs_id2str(target)); + return -ENOMEM; + } + INIT_LIST_HEAD(&rspt->rspt_on_list); + } + lnet_res_lock(cpt); md = lnet_handle2md(&mdh); @@ -2757,6 +4463,7 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, md->md_me->me_portal); lnet_res_unlock(cpt); + LIBCFS_FREE(rspt, sizeof(*rspt)); lnet_msg_free(msg); return -ENOENT; } @@ -2789,10 +4496,15 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack, lnet_build_msg_event(msg, LNET_EVENT_SEND); + if (ack == LNET_ACK_REQ) + lnet_attach_rsp_tracker(rspt, cpt, md, mdh); + rc = lnet_send(self, msg, LNET_NID_ANY); if (rc != 0) { CNETERR("Error sending PUT to %s: %d\n", libcfs_id2str(target), rc); + msg->msg_no_resend = true; + lnet_detach_rsp_tracker(msg->msg_md, cpt); lnet_finalize(msg, rc); } @@ -2845,8 +4557,7 @@ lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg) libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd); /* setup information for lnet_build_msg_event */ - msg->msg_initiator = lnet_peer_primary_nid(peer_id.nid); - /* Cheaper: msg->msg_initiator = getmsg->msg_txpeer->lp_nid; */ + msg->msg_initiator = getmsg->msg_txpeer->lpni_peer_net->lpn_peer->lp_primary_nid; msg->msg_from = peer_id.nid; msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */ msg->msg_hdr.src_nid = peer_id.nid; @@ -2870,6 +4581,7 @@ lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg) cpt = lnet_cpt_of_nid(peer_id.nid, ni); lnet_net_lock(cpt); + lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP); the_lnet.ln_counters[cpt]->drop_count++; the_lnet.ln_counters[cpt]->drop_length += getmd->md_length; lnet_net_unlock(cpt); @@ -2922,12 +4634,13 @@ EXPORT_SYMBOL(lnet_set_reply_msg_len); int LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, struct lnet_process_id target, unsigned int portal, - __u64 match_bits, unsigned int offset) + __u64 match_bits, unsigned int offset, bool recovery) { - struct lnet_msg *msg; - struct lnet_libmd *md; - int cpt; - int rc; + struct lnet_msg *msg; + struct lnet_libmd *md; + struct lnet_rsp_tracker *rspt; + int cpt; + int rc; LASSERT(the_lnet.ln_refcount > 0); @@ -2940,13 +4653,24 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, } msg = lnet_msg_alloc(); - if (msg == NULL) { + if (!msg) { CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n", libcfs_id2str(target)); return -ENOMEM; } cpt = lnet_cpt_of_cookie(mdh.cookie); + + rspt = lnet_rspt_alloc(cpt); + if (!rspt) { + CERROR("Dropping GET to %s: ENOMEM on response tracker\n", + libcfs_id2str(target)); + return -ENOMEM; + } + INIT_LIST_HEAD(&rspt->rspt_on_list); + + msg->msg_recovery = recovery; + lnet_res_lock(cpt); md = lnet_handle2md(&mdh); @@ -2961,6 +4685,7 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, lnet_res_unlock(cpt); lnet_msg_free(msg); + LIBCFS_FREE(rspt, sizeof(*rspt)); return -ENOENT; } @@ -2985,10 +4710,14 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh, lnet_build_msg_event(msg, LNET_EVENT_SEND); + lnet_attach_rsp_tracker(rspt, cpt, md, mdh); + rc = lnet_send(self, msg, LNET_NID_ANY); if (rc < 0) { CNETERR("Error sending GET to %s: %d\n", libcfs_id2str(target), rc); + msg->msg_no_resend = true; + lnet_detach_rsp_tracker(msg->msg_md, cpt); lnet_finalize(msg, rc); }