LU-11297 lnet: MR Routing Feature 83/34983/3
author Amir Shehata <ashehata@whamcloud.com>
Fri, 7 Jun 2019 18:35:09 +0000 (14:35 -0400)
committer Oleg Drokin <green@whamcloud.com>
Fri, 7 Jun 2019 18:38:06 +0000 (14:38 -0400)
This is a merge commit from the multi-rail branch. It brings in
the MR Routing feature, which aligns the LNET Multi-Rail behavior
with routing. A gateway is now viewed as a Multi-Rail capable node.
When a route is added, only one entry per gateway should be used,
and that route entry should use the primary NID of the gateway.
The Multi-Rail selection algorithm is then run when sending to the
gateway, to select the best interface to send on.

Furthermore, gateway aliveness is now tracked via the health
mechanism, and the gateway pinger now uses discovery instead of
maintaining its own ping handler.
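
As an editorial illustration (not part of the commit), the sketch below
shows how a caller would add such a route against the reworked kernel
API, using the lnet_add_route() prototype introduced by this patch; the
wrapper function, network name, and NID value are hypothetical.

/*
 * Hypothetical sketch: one route entry per gateway, keyed on the
 * gateway's primary NID. The Multi-Rail selection algorithm later
 * picks the actual interface used to reach the gateway.
 */
static int example_add_mr_route(void)
{
	__u32 remote_net = libcfs_str2net("o2ib1");	     /* example remote net */
	lnet_nid_t gw_nid = libcfs_str2nid("10.0.0.1@tcp");  /* gateway primary NID */
	__u32 hops = 1;
	__u32 priority = 0;
	__u32 sensitivity = 1;	/* default per-route health sensitivity */

	return lnet_add_route(remote_net, hops, gw_nid,
			      priority, sensitivity);
}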

Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: Ie2d8c6449f84860511b322ff2db3ed656a163e74

23 files changed:
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/include/uapi/linux/lnet/lnet-dlc.h
lnet/include/uapi/linux/lnet/lnetctl.h
lnet/klnds/gnilnd/gnilnd.c
lnet/klnds/gnilnd/gnilnd_conn.c
lnet/klnds/o2iblnd/o2iblnd_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/lnet/api-ni.c
lnet/lnet/config.c
lnet/lnet/lib-eq.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/net_fault.c
lnet/lnet/peer.c
lnet/lnet/router.c
lnet/lnet/router_proc.c
lnet/utils/lnetconfig/liblnetconfig.c
lnet/utils/lnetconfig/liblnetconfig.h
lnet/utils/lnetctl.c
lustre/tests/sanity.sh
lustre/utils/portals.c

diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h
index cd2b608..52aeb92 100644
@@ -92,16 +92,8 @@ extern struct lnet the_lnet;                 /* THE network */
                kernel_getsockname(sock, addr, addrlen)
 #endif
 
-static inline int lnet_is_route_alive(struct lnet_route *route)
-{
-       if (!route->lr_gateway->lpni_alive)
-               return 0; /* gateway is down */
-       if ((route->lr_gateway->lpni_ping_feats &
-            LNET_PING_FEAT_NI_STATUS) == 0)
-               return 1; /* no NI status, assume it's alive */
-       /* has NI status, check # down NIs */
-       return route->lr_downis == 0;
-}
+bool lnet_is_route_alive(struct lnet_route *route);
+bool lnet_is_gateway_alive(struct lnet_peer *gw);
 
 static inline int lnet_is_wire_handle_none(struct lnet_handle_wire *wh)
 {
@@ -448,9 +440,9 @@ lnet_peer_ni_decref_locked(struct lnet_peer_ni *lp)
 }
 
 static inline int
-lnet_isrouter(struct lnet_peer_ni *lp)
+lnet_isrouter(struct lnet_peer_ni *lpni)
 {
-       return lp->lpni_rtr_refcount != 0;
+       return lpni->lpni_peer_net->lpn_peer->lp_rtr_refcount != 0;
 }
 
 static inline void
@@ -574,19 +566,23 @@ extern unsigned int lnet_health_sensitivity;
 extern unsigned int lnet_recovery_interval;
 extern unsigned int lnet_peer_discovery_disabled;
 extern unsigned int lnet_drop_asym_route;
+extern unsigned int router_sensitivity_percentage;
+extern int alive_router_check_interval;
 extern int portal_rotor;
 
-int lnet_notify(struct lnet_ni *ni, lnet_nid_t peer, int alive,
+void lnet_mt_event_handler(struct lnet_event *event);
+
+int lnet_notify(struct lnet_ni *ni, lnet_nid_t peer, bool alive, bool reset,
                time64_t when);
 void lnet_notify_locked(struct lnet_peer_ni *lp, int notifylnd, int alive,
                        time64_t when);
 int lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway_nid,
-                  unsigned int priority);
-int lnet_check_routes(void);
+                  __u32 priority, __u32 sensitivity);
 int lnet_del_route(__u32 net, lnet_nid_t gw_nid);
 void lnet_destroy_routes(void);
 int lnet_get_route(int idx, __u32 *net, __u32 *hops,
-                  lnet_nid_t *gateway, __u32 *alive, __u32 *priority);
+                  lnet_nid_t *gateway, __u32 *alive, __u32 *priority,
+                  __u32 *sensitivity);
 int lnet_get_rtr_pool_cfg(int idx, struct lnet_ioctl_pool_cfg *pool_cfg);
 struct lnet_ni *lnet_get_next_ni_locked(struct lnet_net *mynet,
                                        struct lnet_ni *prev);
@@ -607,6 +603,8 @@ int  lnet_rtrpools_adjust(int tiny, int small, int large);
 int lnet_rtrpools_enable(void);
 void lnet_rtrpools_disable(void);
 void lnet_rtrpools_free(int keep_pools);
+void lnet_rtr_transfer_to_peer(struct lnet_peer *src,
+                              struct lnet_peer *target);
 struct lnet_remotenet *lnet_find_rnet_locked(__u32 net);
 int lnet_dyn_add_net(struct lnet_ioctl_config_data *conf);
 int lnet_dyn_del_net(__u32 net);
@@ -617,6 +615,7 @@ struct lnet_net *lnet_get_net_locked(__u32 net_id);
 
 int lnet_islocalnid(lnet_nid_t nid);
 int lnet_islocalnet(__u32 net);
+int lnet_islocalnet_locked(__u32 net);
 
 void lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
                        unsigned int offset, unsigned int mlen);
@@ -734,7 +733,8 @@ int lnet_fault_ctl(int cmd, struct libcfs_ioctl_data *data);
 int lnet_fault_init(void);
 void lnet_fault_fini(void);
 
-bool lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus);
+bool lnet_drop_rule_match(struct lnet_hdr *hdr, lnet_nid_t local_nid,
+                         enum lnet_msg_hstatus *hstatus);
 
 int lnet_delay_rule_add(struct lnet_fault_attr *attr);
 int lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown);
@@ -854,17 +854,16 @@ int lnet_sock_connect(struct socket **sockp, int *fatal,
 
 int lnet_peers_start_down(void);
 int lnet_peer_buffer_credits(struct lnet_net *net);
+void lnet_consolidate_routes_locked(struct lnet_peer *orig_lp,
+                                   struct lnet_peer *new_lp);
+void lnet_router_discovery_complete(struct lnet_peer *lp);
 
 int lnet_monitor_thr_start(void);
 void lnet_monitor_thr_stop(void);
 
 bool lnet_router_checker_active(void);
 void lnet_check_routers(void);
-int lnet_router_pre_mt_start(void);
-void lnet_router_post_mt_start(void);
-void lnet_prune_rc_data(int wait_unlink);
-void lnet_router_cleanup(void);
-void lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net);
+void lnet_wait_router_start(void);
 void lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf);
 
 int lnet_ping_info_validate(struct lnet_ping_info *pinfo);
@@ -904,13 +903,18 @@ bool lnet_net_unique(__u32 net_id, struct list_head *nilist,
 bool lnet_ni_unique_net(struct list_head *nilist, char *iface);
 void lnet_incr_dlc_seq(void);
 __u32 lnet_get_dlc_seq_locked(void);
+int lnet_get_net_count(void);
 
+struct lnet_peer_net *lnet_get_next_peer_net_locked(struct lnet_peer *lp,
+                                                   __u32 prev_lpn_id);
 struct lnet_peer_ni *lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
                                                  struct lnet_peer_net *peer_net,
                                                  struct lnet_peer_ni *prev);
 struct lnet_peer_ni *lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref,
                                        int cpt);
 struct lnet_peer_ni *lnet_nid2peerni_ex(lnet_nid_t nid, int cpt);
+struct lnet_peer_ni *lnet_peer_get_ni_locked(struct lnet_peer *lp,
+                                            lnet_nid_t nid);
 struct lnet_peer_ni *lnet_find_peer_ni_locked(lnet_nid_t nid);
 struct lnet_peer *lnet_find_peer(lnet_nid_t nid);
 void lnet_peer_net_added(struct lnet_net *net);
@@ -951,15 +955,6 @@ lnet_find_peer_net_locked(struct lnet_peer *peer, __u32 net_id)
        return NULL;
 }
 
-static inline void
-lnet_peer_set_alive(struct lnet_peer_ni *lp)
-{
-       lp->lpni_last_alive = ktime_get_seconds();
-       lp->lpni_last_query = lp->lpni_last_alive;
-       if (!lp->lpni_alive)
-               lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
-}
-
 static inline bool
 lnet_peer_is_multi_rail(struct lnet_peer *lp)
 {
@@ -983,6 +978,8 @@ lnet_peer_ni_is_primary(struct lnet_peer_ni *lpni)
 }
 
 bool lnet_peer_is_uptodate(struct lnet_peer *lp);
+bool lnet_is_discovery_disabled(struct lnet_peer *lp);
+bool lnet_peer_gw_discovery(struct lnet_peer *lp);
 
 static inline bool
 lnet_peer_needs_push(struct lnet_peer *lp)
@@ -993,11 +990,36 @@ lnet_peer_needs_push(struct lnet_peer *lp)
                return true;
        if (lp->lp_state & LNET_PEER_NO_DISCOVERY)
                return false;
+       /* if discovery is not enabled then no need to push */
+       if (lnet_peer_discovery_disabled)
+               return false;
        if (lp->lp_node_seqno < atomic_read(&the_lnet.ln_ping_target_seqno))
                return true;
        return false;
 }
 
+/*
+ * A peer is alive if it satisfies the following two conditions:
+ *  1. peer health >= LNET_MAX_HEALTH_VALUE * router_sensitivity_percentage / 100
+ *  2. the cached NI status received when we discover the peer is UP
+ */
+static inline bool
+lnet_is_peer_ni_alive(struct lnet_peer_ni *lpni)
+{
+       bool halive = false;
+
+       halive = (atomic_read(&lpni->lpni_healthv) >=
+                (LNET_MAX_HEALTH_VALUE * router_sensitivity_percentage / 100));
+
+       return halive && lpni->lpni_ns_status == LNET_NI_STATUS_UP;
+}
+
+static inline void
+lnet_set_healthv(atomic_t *healthv, int value)
+{
+       atomic_set(healthv, value);
+}
+
 static inline void
 lnet_inc_healthv(atomic_t *healthv)
 {
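
Editorial note: the inline route-aliveness check removed at the top of this
file is replaced by the lnet_is_route_alive()/lnet_is_gateway_alive()
declarations, with the per-NI test supplied by the new
lnet_is_peer_ni_alive() helper above. Below is a minimal sketch of one
plausible shape for those helpers; it is an assumption for illustration
only, not the committed router.c implementation, and it ignores the locking
and discovery-state checks the real code performs.

/*
 * Illustrative sketch only: treat a gateway as alive if every network
 * it sits on has at least one peer NI passing lnet_is_peer_ni_alive(),
 * and a route as alive if its gateway is.
 */
bool lnet_is_gateway_alive(struct lnet_peer *gw)
{
	struct lnet_peer_net *lpn;
	struct lnet_peer_ni *lpni;
	bool net_alive;

	list_for_each_entry(lpn, &gw->lp_peer_nets, lpn_peer_nets) {
		net_alive = false;
		list_for_each_entry(lpni, &lpn->lpn_peer_nis,
				    lpni_peer_nis) {
			if (lnet_is_peer_ni_alive(lpni)) {
				net_alive = true;
				break;
			}
		}
		if (!net_alive)
			return false;
	}

	return true;
}

bool lnet_is_route_alive(struct lnet_route *route)
{
	return lnet_is_gateway_alive(route->lr_gateway);
}
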
diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h
index bd1df1e..0550090 100644
@@ -297,8 +297,8 @@ struct lnet_lnd {
        int (*lnd_eager_recv)(struct lnet_ni *ni, void *private,
                              struct lnet_msg *msg, void **new_privatep);
 
-       /* notification of peer health */
-       void (*lnd_notify)(struct lnet_ni *ni, lnet_nid_t peer, int alive);
+       /* notification of peer down */
+       void (*lnd_notify_peer_down)(lnet_nid_t peer);
 
        /* query of peer aliveness */
        void (*lnd_query)(struct lnet_ni *ni, lnet_nid_t peer, time64_t *when);
@@ -415,6 +415,12 @@ struct lnet_net {
 
        /* network state */
        enum lnet_net_state     net_state;
+
+       /* when I was last alive */
+       time64_t                net_last_alive;
+
+       /* protects access to net_last_alive */
+       spinlock_t              net_lock;
 };
 
 struct lnet_ni {
@@ -450,9 +456,6 @@ struct lnet_ni {
        /* percpt reference count */
        int                     **ni_refs;
 
-       /* when I was last alive */
-       time64_t                ni_last_alive;
-
        /* pointer to parent network */
        struct lnet_net         *ni_net;
 
@@ -527,16 +530,6 @@ struct lnet_ping_buffer {
 #define LNET_PING_INFO_TO_BUFFER(PINFO)        \
        container_of((PINFO), struct lnet_ping_buffer, pb_info)
 
-/* router checker data, per router */
-struct lnet_rc_data {
-       /* chain on the_lnet.ln_zombie_rcd or ln_deathrow_rcd */
-       struct list_head        rcd_list;
-       struct lnet_handle_md   rcd_mdh;        /* ping buffer MD */
-       struct lnet_peer_ni     *rcd_gateway;   /* reference to gateway */
-       struct lnet_ping_buffer *rcd_pingbuffer;/* ping buffer */
-       int                     rcd_nnis;       /* desired size of buffer */
-};
-
 struct lnet_peer_ni {
        /* chain on lpn_peer_nis */
        struct list_head        lpni_peer_nis;
@@ -548,49 +541,27 @@ struct lnet_peer_ni {
        struct list_head        lpni_hashlist;
        /* messages blocking for tx credits */
        struct list_head        lpni_txq;
-       /* messages blocking for router credits */
-       struct list_head        lpni_rtrq;
-       /* chain on router list */
-       struct list_head        lpni_rtr_list;
        /* pointer to peer net I'm part of */
        struct lnet_peer_net    *lpni_peer_net;
        /* statistics kept on each peer NI */
        struct lnet_element_stats lpni_stats;
        struct lnet_health_remote_stats lpni_hstats;
-       /* spin lock protecting credits and lpni_txq / lpni_rtrq */
+       /* spin lock protecting credits and lpni_txq */
        spinlock_t              lpni_lock;
        /* # tx credits available */
        int                     lpni_txcredits;
        /* low water mark */
        int                     lpni_mintxcredits;
+       /*
+        * Each peer_ni in a gateway maintains its own credits. This
+        * allows more traffic to gateways that have multiple interfaces.
+        */
        /* # router credits */
        int                     lpni_rtrcredits;
        /* low water mark */
        int                     lpni_minrtrcredits;
        /* bytes queued for sending */
        long                    lpni_txqnob;
-       /* alive/dead? */
-       bool                    lpni_alive;
-       /* notification outstanding? */
-       bool                    lpni_notify;
-       /* outstanding notification for LND? */
-       bool                    lpni_notifylnd;
-       /* some thread is handling notification */
-       bool                    lpni_notifying;
-       /* SEND event outstanding from ping */
-       bool                    lpni_ping_notsent;
-       /* # times router went dead<->alive. Protected with lpni_lock */
-       int                     lpni_alive_count;
-       /* time of last aliveness news */
-       time64_t                lpni_timestamp;
-       /* time of last ping attempt */
-       time64_t                lpni_ping_timestamp;
-       /* != 0 if ping reply expected */
-       time64_t                lpni_ping_deadline;
-       /* when I was last alive */
-       time64_t                lpni_last_alive;
-       /* when lpni_ni was queried last time */
-       time64_t                lpni_last_query;
        /* network peer is on */
        struct lnet_net         *lpni_net;
        /* peer's NID */
@@ -605,18 +576,16 @@ struct lnet_peer_ni {
        int                     lpni_cpt;
        /* state flags -- protected by lpni_lock */
        unsigned                lpni_state;
-       /* # refs from lnet_route_t::lr_gateway */
-       int                     lpni_rtr_refcount;
+       /* status of the peer NI as reported by the peer */
+       __u32                   lpni_ns_status;
        /* sequence number used to round robin over peer nis within a net */
        __u32                   lpni_seq;
        /* sequence number used to round robin over gateways */
        __u32                   lpni_gw_seq;
-       /* health flag */
-       bool                    lpni_healthy;
        /* returned RC ping features. Protected with lpni_lock */
        unsigned int            lpni_ping_feats;
-       /* routes on this peer */
-       struct list_head        lpni_routes;
+       /* time last message was received from the peer */
+       time64_t                lpni_last_alive;
        /* preferred local nids: if only one, use lpni_pref.nid */
        union lpni_pref {
                lnet_nid_t      nid;
@@ -624,8 +593,6 @@ struct lnet_peer_ni {
        } lpni_pref;
        /* number of preferred NIDs in lnpi_pref_nids */
        __u32                   lpni_pref_nnids;
-       /* router checker state */
-       struct lnet_rc_data     *lpni_rcd;
 };
 
 /* Preferred path added due to traffic on non-MR peer_ni */
@@ -647,19 +614,40 @@ struct lnet_peer {
        /* list of messages pending discovery*/
        struct list_head        lp_dc_pendq;
 
+       /* chain on router list */
+       struct list_head        lp_rtr_list;
+
        /* primary NID of the peer */
        lnet_nid_t              lp_primary_nid;
 
+       /* net to perform discovery on */
+       __u32                   lp_disc_net_id;
+
        /* CPT of peer_table */
        int                     lp_cpt;
 
        /* number of NIDs on this peer */
        int                     lp_nnis;
 
+       /* # refs from lnet_route_t::lr_gateway */
+       int                     lp_rtr_refcount;
+
+       /*
+        * peer-specific health sensitivity value used to decrement the
+        * health of peer NIs in this peer, if set to something other than 0
+        */
+       __u32                   lp_health_sensitivity;
+
+       /* messages blocking for router credits */
+       struct list_head        lp_rtrq;
+
+       /* routes on this peer */
+       struct list_head        lp_routes;
+
        /* reference count */
        atomic_t                lp_refcount;
 
-       /* lock protecting peer state flags */
+       /* lock protecting peer state flags and lp_rtrq */
        spinlock_t              lp_lock;
 
        /* peer state flags */
@@ -714,9 +702,13 @@ struct lnet_peer {
  *
  * A peer is marked NO_DISCOVERY if the LNET_PING_FEAT_DISCOVERY bit was
  * NOT set when the peer was pinged by discovery.
+ *
+ * A peer is marked ROUTER if it indicates so in the feature bit.
  */
 #define LNET_PEER_MULTI_RAIL   (1 << 0)        /* Multi-rail aware */
 #define LNET_PEER_NO_DISCOVERY (1 << 1)        /* Peer disabled discovery */
+#define LNET_PEER_ROUTER_ENABLED (1 << 2)      /* router feature enabled */
+
 /*
  * A peer is marked CONFIGURED if it was configured by DLC.
  *
@@ -730,28 +722,34 @@ struct lnet_peer {
  * A peer that was created as the result of inbound traffic will not
  * be marked at all.
  */
-#define LNET_PEER_CONFIGURED   (1 << 2)        /* Configured via DLC */
-#define LNET_PEER_DISCOVERED   (1 << 3)        /* Peer was discovered */
-#define LNET_PEER_REDISCOVER   (1 << 4)        /* Discovery was disabled */
+#define LNET_PEER_CONFIGURED   (1 << 3)        /* Configured via DLC */
+#define LNET_PEER_DISCOVERED   (1 << 4)        /* Peer was discovered */
+#define LNET_PEER_REDISCOVER   (1 << 5)        /* Discovery was disabled */
 /*
  * A peer is marked DISCOVERING when discovery is in progress.
  * The other flags below correspond to stages of discovery.
  */
-#define LNET_PEER_DISCOVERING  (1 << 5)        /* Discovering */
-#define LNET_PEER_DATA_PRESENT (1 << 6)        /* Remote peer data present */
-#define LNET_PEER_NIDS_UPTODATE        (1 << 7)        /* Remote peer info uptodate */
-#define LNET_PEER_PING_SENT    (1 << 8)        /* Waiting for REPLY to Ping */
-#define LNET_PEER_PUSH_SENT    (1 << 9)        /* Waiting for ACK of Push */
-#define LNET_PEER_PING_FAILED  (1 << 10)       /* Ping send failure */
-#define LNET_PEER_PUSH_FAILED  (1 << 11)       /* Push send failure */
+#define LNET_PEER_DISCOVERING  (1 << 6)        /* Discovering */
+#define LNET_PEER_DATA_PRESENT (1 << 7)        /* Remote peer data present */
+#define LNET_PEER_NIDS_UPTODATE        (1 << 8)        /* Remote peer info uptodate */
+#define LNET_PEER_PING_SENT    (1 << 9)        /* Waiting for REPLY to Ping */
+#define LNET_PEER_PUSH_SENT    (1 << 10)       /* Waiting for ACK of Push */
+#define LNET_PEER_PING_FAILED  (1 << 11)       /* Ping send failure */
+#define LNET_PEER_PUSH_FAILED  (1 << 12)       /* Push send failure */
 /*
  * A ping can be forced as a way to fix up state, or as a manual
  * intervention by an admin.
  * A push can be forced in circumstances that would normally not
  * allow for one to happen.
  */
-#define LNET_PEER_FORCE_PING   (1 << 12)       /* Forced Ping */
-#define LNET_PEER_FORCE_PUSH   (1 << 13)       /* Forced Push */
+#define LNET_PEER_FORCE_PING   (1 << 13)       /* Forced Ping */
+#define LNET_PEER_FORCE_PUSH   (1 << 14)       /* Forced Push */
+
+/* force delete even if router */
+#define LNET_PEER_RTR_NI_FORCE_DEL (1 << 15)
+
+/* gw undergoing alive discovery */
+#define LNET_PEER_RTR_DISCOVERY (1 << 16)
 
 struct lnet_peer_net {
        /* chain on lp_peer_nets */
@@ -766,6 +764,12 @@ struct lnet_peer_net {
        /* Net ID */
        __u32                   lpn_net_id;
 
+       /* time of last router net check attempt */
+       time64_t                lpn_rtrcheck_timestamp;
+
+       /* selection sequence number */
+       __u32                   lpn_seq;
+
        /* reference count */
        atomic_t                lpn_refcount;
 };
@@ -810,10 +814,11 @@ struct lnet_peer_table {
 struct lnet_route {
        struct list_head        lr_list;        /* chain on net */
        struct list_head        lr_gwlist;      /* chain on gateway */
-       struct lnet_peer_ni     *lr_gateway;    /* router node */
+       struct lnet_peer        *lr_gateway;    /* router node */
+       lnet_nid_t              lr_nid;         /* NID used to add route */
        __u32                   lr_net;         /* remote network number */
+       __u32                   lr_lnet;        /* local network number */
        int                     lr_seq;         /* sequence for round-robin */
-       unsigned int            lr_downis;      /* number of down NIs */
        __u32                   lr_hops;        /* how far I am */
        unsigned int            lr_priority;    /* route priority */
 };
@@ -1086,12 +1091,6 @@ struct lnet {
 
        /* monitor thread startup/shutdown state */
        int                             ln_mt_state;
-       /* router checker's event queue */
-       struct lnet_handle_eq           ln_rc_eqh;
-       /* rcd still pending on net */
-       struct list_head                ln_rcd_deathrow;
-       /* rcd ready for free */
-       struct list_head                ln_rcd_zombie;
        /* serialise startup/shutdown */
        struct semaphore                ln_mt_signal;
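
Editorial note: with struct lnet_route now pointing at a struct lnet_peer
gateway and chained on the peer via lp_routes/lr_gwlist, a gateway's routes
are walked from the peer object. Below is a small illustrative sketch; the
helper is hypothetical and only uses the fields shown in the hunks above.

/* illustrative only: dump the routes configured through one gateway */
static void example_dump_gw_routes(struct lnet_peer *gw)
{
	struct lnet_route *route;

	list_for_each_entry(route, &gw->lp_routes, lr_gwlist) {
		CDEBUG(D_NET, "route to %s via %s (hops %u, prio %u)\n",
		       libcfs_net2str(route->lr_net),
		       libcfs_nid2str(route->lr_nid),
		       route->lr_hops, route->lr_priority);
	}
}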
 
diff --git a/lnet/include/uapi/linux/lnet/lnet-dlc.h b/lnet/include/uapi/linux/lnet/lnet-dlc.h
index f10cbc3..a454a65 100644
@@ -139,6 +139,7 @@ struct lnet_ioctl_config_data {
                        __u32 rtr_hop;
                        __u32 rtr_priority;
                        __u32 rtr_flags;
+                       __u32 rtr_sensitivity;
                } cfg_route;
                struct {
                        char net_intf[LNET_MAX_STR_LEN];
diff --git a/lnet/include/uapi/linux/lnet/lnetctl.h b/lnet/include/uapi/linux/lnet/lnetctl.h
index cb4f153..cdf5849 100644
@@ -77,6 +77,10 @@ struct lnet_fault_attr {
        lnet_nid_t                      fa_src;
        /** destination NID of drop rule, see \a dr_src for details */
        lnet_nid_t                      fa_dst;
+       /** local NID. In case of a router this is the NID we're receiving
+        * messages on
+        */
+       lnet_nid_t                      fa_local_nid;
        /**
         * Portal mask to drop, -1 means all portals, for example:
         * fa_ptl_mask = (1 << _LDLM_CB_REQUEST_PORTAL ) |
@@ -108,6 +112,8 @@ struct lnet_fault_attr {
                        __u32                   da_health_error_mask;
                        /** randomize error generation */
                        bool                    da_random;
+                       /** drop all messages if flag is set */
+                       bool                    da_drop_all;
                } drop;
                /** message latency simulation */
                struct {
diff --git a/lnet/klnds/gnilnd/gnilnd.c b/lnet/klnds/gnilnd/gnilnd.c
index 1fd80be..cf1b263 100644
@@ -608,6 +608,7 @@ kgnilnd_peer_notify(kgn_peer_t *peer, int error, int alive)
                                ktime_get_seconds() - peer->gnp_last_alive);
 
                        lnet_notify(net->gnn_ni, peer_nid, alive,
+                                   (alive) ? true : false,
                                    peer->gnp_last_alive);
 
                        kgnilnd_net_decref(net);
diff --git a/lnet/klnds/gnilnd/gnilnd_conn.c b/lnet/klnds/gnilnd/gnilnd_conn.c
index a3f7c75..a225780 100644
@@ -1954,7 +1954,7 @@ kgnilnd_finish_connect(kgn_dgram_t *dgram)
        /* Notify LNET that we now have a working connection to this peer.
         * This is a Cray extension to the "standard" LND behavior.
         */
-       lnet_notify(peer->gnp_net->gnn_ni, peer->gnp_nid, 1,
+       lnet_notify(peer->gnp_net->gnn_ni, peer->gnp_nid, true, true,
                    ktime_get_seconds());
 
        /* drop our 'hold' ref */
diff --git a/lnet/klnds/o2iblnd/o2iblnd_cb.c b/lnet/klnds/o2iblnd/o2iblnd_cb.c
index a0edce8..c6fb08f 100644
@@ -2019,24 +2019,24 @@ kiblnd_peer_alive(struct kib_peer_ni *peer_ni)
 static void
 kiblnd_peer_notify(struct kib_peer_ni *peer_ni)
 {
-        int           error = 0;
+       int           error = 0;
        time64_t last_alive = 0;
-        unsigned long flags;
+       unsigned long flags;
 
        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
 
        if (kiblnd_peer_idle(peer_ni) && peer_ni->ibp_error != 0) {
-                error = peer_ni->ibp_error;
-                peer_ni->ibp_error = 0;
+               error = peer_ni->ibp_error;
+               peer_ni->ibp_error = 0;
 
-                last_alive = peer_ni->ibp_last_alive;
-        }
+               last_alive = peer_ni->ibp_last_alive;
+       }
 
        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 
-        if (error != 0)
-                lnet_notify(peer_ni->ibp_ni,
-                            peer_ni->ibp_nid, 0, last_alive);
+       if (error != 0)
+               lnet_notify(peer_ni->ibp_ni,
+                           peer_ni->ibp_nid, false, false, last_alive);
 }
 
 void
diff --git a/lnet/klnds/socklnd/socklnd.c b/lnet/klnds/socklnd/socklnd.c
index 3409264..5d60a0a 100644
@@ -1521,8 +1521,8 @@ ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
        read_unlock(&ksocknal_data.ksnd_global_lock);
 
        if (notify)
-               lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid, 0,
-                           last_alive);
+               lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
+                           false, false, last_alive);
 }
 
 void
@@ -1781,7 +1781,7 @@ ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
 }
 
 void
-ksocknal_notify(struct lnet_ni *ni, lnet_nid_t gw_nid, int alive)
+ksocknal_notify_gw_down(lnet_nid_t gw_nid)
 {
        /* The router is telling me she's been notified of a change in
         * gateway state....
@@ -1791,17 +1791,14 @@ ksocknal_notify(struct lnet_ni *ni, lnet_nid_t gw_nid, int alive)
                .pid    = LNET_PID_ANY,
        };
 
-        CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
-                alive ? "up" : "down");
+       CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
 
-        if (!alive) {
-                /* If the gateway crashed, close all open connections... */
-                ksocknal_close_matching_conns (id, 0);
-                return;
-        }
+       /* If the gateway crashed, close all open connections... */
+       ksocknal_close_matching_conns(id, 0);
+       return;
 
-        /* ...otherwise do nothing.  We can only establish new connections
-         * if we have autroutes, and these connect on demand. */
+       /* We can only establish new connections
+        * if we have autoroutes, and these connect on demand. */
 }
 
 void
@@ -2882,7 +2879,7 @@ static int __init ksocklnd_init(void)
        the_ksocklnd.lnd_ctl      = ksocknal_ctl;
        the_ksocklnd.lnd_send     = ksocknal_send;
        the_ksocklnd.lnd_recv     = ksocknal_recv;
-       the_ksocklnd.lnd_notify   = ksocknal_notify;
+       the_ksocklnd.lnd_notify_peer_down   = ksocknal_notify_gw_down;
        the_ksocklnd.lnd_query    = ksocknal_query;
        the_ksocklnd.lnd_accept   = ksocknal_accept;
 
diff --git a/lnet/klnds/socklnd/socklnd.h b/lnet/klnds/socklnd/socklnd.h
index 9f198dd..7a3b03e 100644
@@ -627,7 +627,7 @@ extern void ksocknal_next_tx_carrier(struct ksock_conn *conn);
 extern void ksocknal_queue_tx_locked(struct ksock_tx *tx, struct ksock_conn *conn);
 extern void ksocknal_txlist_done(struct lnet_ni *ni, struct list_head *txlist,
                                 int error);
-extern void ksocknal_notify(struct lnet_ni *ni, lnet_nid_t gw_nid, int alive);
+extern void ksocknal_notify_gw_down(lnet_nid_t gw_nid);
 extern void ksocknal_query(struct lnet_ni *ni, lnet_nid_t nid, time64_t *when);
 extern int ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name);
 extern void ksocknal_thread_fini(void);
diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c
index c08e929..8784577 100644
@@ -80,10 +80,10 @@ MODULE_PARM_DESC(lnet_numa_range,
 
 /*
  * lnet_health_sensitivity determines by how much we decrement the health
- * value on sending error. The value defaults to 0, which means health
- * checking is turned off by default.
+ * value on sending error. The value defaults to 100, which means the
+ * interface health is decremented by 100 points on every failure.
  */
-unsigned int lnet_health_sensitivity = 0;
+unsigned int lnet_health_sensitivity = 100;
 static int sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp);
 #ifdef HAVE_KERNEL_PARAM_OPS
 static struct kernel_param_ops param_ops_health_sensitivity = {
@@ -179,7 +179,10 @@ module_param_call(lnet_drop_asym_route, drop_asym_route_set, param_get_int,
 MODULE_PARM_DESC(lnet_drop_asym_route,
                 "Set to 1 to drop asymmetrical route messages.");
 
-unsigned lnet_transaction_timeout = 50;
+#define LNET_TRANSACTION_TIMEOUT_NO_HEALTH_DEFAULT 50
+#define LNET_TRANSACTION_TIMEOUT_HEALTH_DEFAULT 10
+
+unsigned lnet_transaction_timeout = LNET_TRANSACTION_TIMEOUT_HEALTH_DEFAULT;
 static int transaction_to_set(const char *val, cfs_kernel_param_arg_t *kp);
 #ifdef HAVE_KERNEL_PARAM_OPS
 static struct kernel_param_ops param_ops_transaction_timeout = {
@@ -197,7 +200,8 @@ module_param_call(lnet_transaction_timeout, transaction_to_set, param_get_int,
 MODULE_PARM_DESC(lnet_transaction_timeout,
                "Maximum number of seconds to wait for a peer response.");
 
-unsigned lnet_retry_count = 0;
+#define LNET_RETRY_COUNT_HEALTH_DEFAULT 3
+unsigned lnet_retry_count = LNET_RETRY_COUNT_HEALTH_DEFAULT;
 static int retry_count_set(const char *val, cfs_kernel_param_arg_t *kp);
 #ifdef HAVE_KERNEL_PARAM_OPS
 static struct kernel_param_ops param_ops_retry_count = {
@@ -217,6 +221,7 @@ MODULE_PARM_DESC(lnet_retry_count,
 
 
 unsigned lnet_lnd_timeout = LNET_LND_DEFAULT_TIMEOUT;
+unsigned int lnet_current_net_count;
 
 /*
  * This sequence number keeps track of how many times DLC was used to
@@ -252,11 +257,6 @@ sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
         */
        mutex_lock(&the_lnet.ln_api_mutex);
 
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               mutex_unlock(&the_lnet.ln_api_mutex);
-               return 0;
-       }
-
        if (value > LNET_MAX_HEALTH_VALUE) {
                mutex_unlock(&the_lnet.ln_api_mutex);
                CERROR("Invalid health value. Maximum: %d value = %lu\n",
@@ -264,6 +264,23 @@ sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
                return -EINVAL;
        }
 
+       /*
+        * if we're turning on health then use the health timeout
+        * defaults.
+        */
+       if (*sensitivity == 0 && value != 0) {
+               lnet_transaction_timeout = LNET_TRANSACTION_TIMEOUT_HEALTH_DEFAULT;
+               lnet_retry_count = LNET_RETRY_COUNT_HEALTH_DEFAULT;
+       /*
+        * if we're turning off health then use the no health timeout
+        * default.
+        */
+       } else if (*sensitivity != 0 && value == 0) {
+               lnet_transaction_timeout =
+                       LNET_TRANSACTION_TIMEOUT_NO_HEALTH_DEFAULT;
+               lnet_retry_count = 0;
+       }
+
        *sensitivity = value;
 
        mutex_unlock(&the_lnet.ln_api_mutex);
@@ -295,11 +312,6 @@ recovery_interval_set(const char *val, cfs_kernel_param_arg_t *kp)
         */
        mutex_lock(&the_lnet.ln_api_mutex);
 
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               mutex_unlock(&the_lnet.ln_api_mutex);
-               return 0;
-       }
-
        *interval = value;
 
        mutex_unlock(&the_lnet.ln_api_mutex);
@@ -408,11 +420,6 @@ transaction_to_set(const char *val, cfs_kernel_param_arg_t *kp)
         */
        mutex_lock(&the_lnet.ln_api_mutex);
 
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               mutex_unlock(&the_lnet.ln_api_mutex);
-               return 0;
-       }
-
        if (value < lnet_retry_count || value == 0) {
                mutex_unlock(&the_lnet.ln_api_mutex);
                CERROR("Invalid value for lnet_transaction_timeout (%lu). "
@@ -456,9 +463,10 @@ retry_count_set(const char *val, cfs_kernel_param_arg_t *kp)
         */
        mutex_lock(&the_lnet.ln_api_mutex);
 
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
+       if (lnet_health_sensitivity == 0) {
                mutex_unlock(&the_lnet.ln_api_mutex);
-               return 0;
+               CERROR("Can not set retry_count when health feature is turned off\n");
+               return -EINVAL;
        }
 
        if (value > lnet_transaction_timeout) {
@@ -469,11 +477,6 @@ retry_count_set(const char *val, cfs_kernel_param_arg_t *kp)
                return -EINVAL;
        }
 
-       if (value == *retry_count) {
-               mutex_unlock(&the_lnet.ln_api_mutex);
-               return 0;
-       }
-
        *retry_count = value;
 
        if (value == 0)
@@ -1130,6 +1133,7 @@ lnet_prepare(lnet_pid_t requested_pid)
        INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
        INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
        init_waitqueue_head(&the_lnet.ln_dc_waitq);
+       LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
 
        rc = lnet_descriptor_setup();
        if (rc != 0)
@@ -1198,6 +1202,8 @@ lnet_prepare(lnet_pid_t requested_pid)
 static int
 lnet_unprepare (void)
 {
+       int rc;
+
        /* NB no LNET_LOCK since this is the last reference.  All LND instances
         * have shut down already, so it is safe to unlink and free all
         * descriptors, even those that appear committed to a network op (eg MD
@@ -1209,6 +1215,12 @@ lnet_unprepare (void)
        LASSERT(list_empty(&the_lnet.ln_test_peers));
        LASSERT(list_empty(&the_lnet.ln_nets));
 
+       if (!LNetEQHandleIsInvalid(the_lnet.ln_mt_eqh)) {
+               rc = LNetEQFree(the_lnet.ln_mt_eqh);
+               LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+               LASSERT(rc == 0);
+       }
+
        lnet_portals_destroy();
 
        if (the_lnet.ln_md_containers != NULL) {
@@ -1357,18 +1369,28 @@ lnet_cpt_of_nid(lnet_nid_t nid, struct lnet_ni *ni)
 EXPORT_SYMBOL(lnet_cpt_of_nid);
 
 int
-lnet_islocalnet(__u32 net_id)
+lnet_islocalnet_locked(__u32 net_id)
 {
        struct lnet_net *net;
-       int             cpt;
-       bool            local;
-
-       cpt = lnet_net_lock_current();
+       bool local;
 
        net = lnet_get_net_locked(net_id);
 
        local = net != NULL;
 
+       return local;
+}
+
+int
+lnet_islocalnet(__u32 net_id)
+{
+       int cpt;
+       bool local;
+
+       cpt = lnet_net_lock_current();
+
+       local = lnet_islocalnet_locked(net_id);
+
        lnet_net_unlock(cpt);
 
        return local;
@@ -1526,6 +1548,45 @@ lnet_get_ni_count(void)
 }
 
 int
+lnet_get_net_count(void)
+{
+       struct lnet_net *net;
+       int count = 0;
+
+       lnet_net_lock(0);
+
+       list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
+               count++;
+       }
+
+       lnet_net_unlock(0);
+
+       return count;
+}
+
+void
+lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf)
+{
+       struct lnet_ni_status *stat;
+       int nnis;
+       int i;
+
+       __swab32s(&pbuf->pb_info.pi_magic);
+       __swab32s(&pbuf->pb_info.pi_features);
+       __swab32s(&pbuf->pb_info.pi_pid);
+       __swab32s(&pbuf->pb_info.pi_nnis);
+       nnis = pbuf->pb_info.pi_nnis;
+       if (nnis > pbuf->pb_nnis)
+               nnis = pbuf->pb_nnis;
+       for (i = 0; i < nnis; i++) {
+               stat = &pbuf->pb_info.pi_ni[i];
+               __swab64s(&stat->ns_nid);
+               __swab32s(&stat->ns_status);
+       }
+       return;
+}
+
+int
 lnet_ping_info_validate(struct lnet_ping_info *pinfo)
 {
        if (!pinfo)
@@ -2349,6 +2410,9 @@ lnet_startup_lndnet(struct lnet_net *net, struct lnet_lnd_tunables *tun)
                lnet_net_unlock(LNET_LOCK_EX);
        }
 
+       /* update net count */
+       lnet_current_net_count = lnet_get_net_count();
+
        return ni_count;
 
 failed1:
@@ -2443,12 +2507,9 @@ int lnet_lib_init(void)
        }
 
        the_lnet.ln_refcount = 0;
-       LNetInvalidateEQHandle(&the_lnet.ln_rc_eqh);
        INIT_LIST_HEAD(&the_lnet.ln_lnds);
        INIT_LIST_HEAD(&the_lnet.ln_net_zombie);
-       INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
        INIT_LIST_HEAD(&the_lnet.ln_msg_resend);
-       INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
 
        /* The hash table size is the number of bits it takes to express the set
         * ln_num_routes, minus 1 (better to under estimate than over so we
@@ -2564,10 +2625,6 @@ LNetNIInit(lnet_pid_t requested_pid)
                if (rc != 0)
                        goto err_shutdown_lndnis;
 
-               rc = lnet_check_routes();
-               if (rc != 0)
-                       goto err_destroy_routes;
-
                rc = lnet_rtrpools_alloc(im_a_router);
                if (rc != 0)
                        goto err_destroy_routes;
@@ -2586,29 +2643,38 @@ LNetNIInit(lnet_pid_t requested_pid)
 
        lnet_ping_target_update(pbuf, ping_mdh);
 
-       rc = lnet_monitor_thr_start();
-       if (rc != 0)
+       rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
+       if (rc != 0) {
+               CERROR("Can't allocate monitor thread EQ: %d\n", rc);
                goto err_stop_ping;
+       }
 
        rc = lnet_push_target_init();
        if (rc != 0)
-               goto err_stop_monitor_thr;
+               goto err_stop_ping;
 
        rc = lnet_peer_discovery_start();
        if (rc != 0)
                goto err_destroy_push_target;
 
+       rc = lnet_monitor_thr_start();
+       if (rc != 0)
+               goto err_stop_discovery_thr;
+
        lnet_fault_init();
        lnet_router_debugfs_init();
 
        mutex_unlock(&the_lnet.ln_api_mutex);
 
+       /* wait for all routers to start */
+       lnet_wait_router_start();
+
        return 0;
 
+err_stop_discovery_thr:
+       lnet_peer_discovery_stop();
 err_destroy_push_target:
        lnet_push_target_fini();
-err_stop_monitor_thr:
-       lnet_monitor_thr_stop();
 err_stop_ping:
        lnet_ping_target_fini();
 err_acceptor_stop:
@@ -2658,9 +2724,9 @@ LNetNIFini()
                lnet_fault_fini();
 
                lnet_router_debugfs_fini();
+               lnet_monitor_thr_stop();
                lnet_peer_discovery_stop();
                lnet_push_target_fini();
-               lnet_monitor_thr_stop();
                lnet_ping_target_fini();
 
                /* Teardown fns that use my own API functions BEFORE here */
@@ -3498,26 +3564,28 @@ LNetCtl(unsigned int cmd, void *arg)
        case IOC_LIBCFS_FAIL_NID:
                return lnet_fail_nid(data->ioc_nid, data->ioc_count);
 
-       case IOC_LIBCFS_ADD_ROUTE:
+       case IOC_LIBCFS_ADD_ROUTE: {
+               /* default router sensitivity to 1 */
+               unsigned int sensitivity = 1;
                config = arg;
 
                if (config->cfg_hdr.ioc_len < sizeof(*config))
                        return -EINVAL;
 
+               if (config->cfg_config_u.cfg_route.rtr_sensitivity) {
+                       sensitivity =
+                         config->cfg_config_u.cfg_route.rtr_sensitivity;
+               }
+
                mutex_lock(&the_lnet.ln_api_mutex);
                rc = lnet_add_route(config->cfg_net,
                                    config->cfg_config_u.cfg_route.rtr_hop,
                                    config->cfg_nid,
                                    config->cfg_config_u.cfg_route.
-                                       rtr_priority);
-               if (rc == 0) {
-                       rc = lnet_check_routes();
-                       if (rc != 0)
-                               lnet_del_route(config->cfg_net,
-                                              config->cfg_nid);
-               }
+                                       rtr_priority, sensitivity);
                mutex_unlock(&the_lnet.ln_api_mutex);
                return rc;
+       }
 
        case IOC_LIBCFS_DEL_ROUTE:
                config = arg;
@@ -3543,7 +3611,9 @@ LNetCtl(unsigned int cmd, void *arg)
                                    &config->cfg_nid,
                                    &config->cfg_config_u.cfg_route.rtr_flags,
                                    &config->cfg_config_u.cfg_route.
-                                       rtr_priority);
+                                       rtr_priority,
+                                   &config->cfg_config_u.cfg_route.
+                                       rtr_sensitivity);
                mutex_unlock(&the_lnet.ln_api_mutex);
                return rc;
 
@@ -3817,7 +3887,7 @@ LNetCtl(unsigned int cmd, void *arg)
                 * that deadline to the wall clock.
                 */
                deadline += ktime_get_seconds();
-               return lnet_notify(NULL, data->ioc_nid, data->ioc_flags,
+               return lnet_notify(NULL, data->ioc_nid, data->ioc_flags, false,
                                   deadline);
        }
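
Editorial note: the LNetCtl() handling above reads the new rtr_sensitivity
field out of struct lnet_ioctl_config_data, defaulting it to 1 when unset.
As a rough, hypothetical sketch of the userspace side, assuming the
existing liblnetconfig conventions (LIBCFS_IOC_INIT_V2(), l_ioctl() and
LNET_DEV_ID) are available in the build environment:

/* hypothetical sketch of adding an MR route from userspace */
static int example_route_add(__u32 net, lnet_nid_t gw_primary_nid,
			     __u32 hops, __u32 prio, __u32 sensitivity)
{
	struct lnet_ioctl_config_data data;

	memset(&data, 0, sizeof(data));
	LIBCFS_IOC_INIT_V2(data, cfg_hdr);
	data.cfg_net = net;			/* remote network */
	data.cfg_nid = gw_primary_nid;		/* gateway primary NID */
	data.cfg_config_u.cfg_route.rtr_hop = hops;
	data.cfg_config_u.cfg_route.rtr_priority = prio;
	/* new with this patch: per-route health sensitivity (0 -> default) */
	data.cfg_config_u.cfg_route.rtr_sensitivity = sensitivity;

	return l_ioctl(LNET_DEV_ID, IOC_LIBCFS_ADD_ROUTE, &data);
}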
 
diff --git a/lnet/lnet/config.c b/lnet/lnet/config.c
index bcd780d..15f80bd 100644
@@ -377,8 +377,10 @@ lnet_net_alloc(__u32 net_id, struct list_head *net_list)
        INIT_LIST_HEAD(&net->net_ni_list);
        INIT_LIST_HEAD(&net->net_ni_added);
        INIT_LIST_HEAD(&net->net_ni_zombie);
+       spin_lock_init(&net->net_lock);
 
        net->net_id = net_id;
+       net->net_last_alive = ktime_get_real_seconds();
        net->net_state = LNET_NET_STATE_INIT;
 
        /* initialize global paramters to undefiend */
@@ -459,6 +461,7 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface)
        spin_lock_init(&ni->ni_lock);
        INIT_LIST_HEAD(&ni->ni_netlist);
        INIT_LIST_HEAD(&ni->ni_recovery);
+       LNetInvalidateMDHandle(&ni->ni_ping_mdh);
        ni->ni_refs = cfs_percpt_alloc(lnet_cpt_table(),
                                       sizeof(*ni->ni_refs[0]));
        if (ni->ni_refs == NULL)
@@ -482,7 +485,6 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface)
        else
                ni->ni_net_ns = NULL;
 
-       ni->ni_last_alive = ktime_get_real_seconds();
        ni->ni_state = LNET_NI_STATE_INIT;
        list_add_tail(&ni->ni_netlist, &net->net_ni_added);
 
@@ -1245,7 +1247,7 @@ lnet_parse_route (char *str, int *im_a_router)
                                continue;
                        }
 
-                       rc = lnet_add_route(net, hops, nid, priority);
+                       rc = lnet_add_route(net, hops, nid, priority, 1);
                        if (rc != 0 && rc != -EEXIST && rc != -EHOSTUNREACH) {
                                CERROR("Can't create route "
                                       "to %s via %s\n",
diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c
index 3bca6b7..354c976 100644
@@ -159,8 +159,6 @@ LNetEQFree(struct lnet_handle_eq eqh)
        int             size = 0;
        int             i;
 
-       LASSERT(the_lnet.ln_refcount > 0);
-
        lnet_res_lock(LNET_LOCK_EX);
        /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
         * both EQ lookup and poll event with only lnet_eq_wait_lock */
diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c
index 41ab43a..d3cf14b 100644
@@ -42,6 +42,8 @@
 #include <linux/nsproxy.h>
 #include <net/net_namespace.h>
 
+extern unsigned int lnet_current_net_count;
+
 static int local_nid_dist_zero = 1;
 module_param(local_nid_dist_zero, int, 0444);
 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
@@ -794,88 +796,35 @@ lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg)
        return rc;
 }
 
-/*
- * This function can be called from two paths:
- *     1. when sending a message
- *     2. when decommiting a message (lnet_msg_decommit_tx())
- * In both these cases the peer_ni should have it's reference count
- * acquired by the caller and therefore it is safe to drop the spin
- * lock before calling lnd_query()
- */
-static void
-lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
-{
-       time64_t last_alive = 0;
-       int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
-
-       LASSERT(lnet_peer_aliveness_enabled(lp));
-       LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
-
-       lnet_net_unlock(cpt);
-       (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
-       lnet_net_lock(cpt);
-
-       lp->lpni_last_query = ktime_get_seconds();
-
-       if (last_alive != 0) /* NI has updated timestamp */
-               lp->lpni_last_alive = last_alive;
-}
-
-/* NB: always called with lnet_net_lock held */
-static inline int
-lnet_peer_is_alive(struct lnet_peer_ni *lp, time64_t now)
+static bool
+lnet_is_peer_deadline_passed(struct lnet_peer_ni *lpni, time64_t now)
 {
-       int alive;
        time64_t deadline;
 
-       LASSERT (lnet_peer_aliveness_enabled(lp));
+       deadline = lpni->lpni_last_alive +
+                  lpni->lpni_net->net_tunables.lct_peer_timeout;
 
        /*
-        * Trust lnet_notify() if it has more recent aliveness news, but
-        * ignore the initial assumed death (see lnet_peers_start_down()).
+        * assume peer_ni is alive as long as we're within the configured
+        * peer timeout
         */
-       spin_lock(&lp->lpni_lock);
-       if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
-           lp->lpni_timestamp >= lp->lpni_last_alive) {
-               spin_unlock(&lp->lpni_lock);
-               return 0;
-       }
-
-       deadline = lp->lpni_last_alive +
-                  lp->lpni_net->net_tunables.lct_peer_timeout;
-       alive = deadline > now;
-
-       /*
-        * Update obsolete lp_alive except for routers assumed to be dead
-        * initially, because router checker would update aliveness in this
-        * case, and moreover lpni_last_alive at peer creation is assumed.
-        */
-       if (alive && !lp->lpni_alive &&
-           !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
-               spin_unlock(&lp->lpni_lock);
-               lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
-       } else {
-               spin_unlock(&lp->lpni_lock);
-       }
+       if (deadline > now)
+               return false;
 
-       return alive;
+       return true;
 }
 
-
 /* NB: returns 1 when alive, 0 when dead, negative when error;
  *     may drop the lnet_net_lock */
 static int
-lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp,
+lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
                       struct lnet_msg *msg)
 {
        time64_t now = ktime_get_seconds();
 
-       if (!lnet_peer_aliveness_enabled(lp))
+       if (!lnet_peer_aliveness_enabled(lpni))
                return -ENODEV;
 
-       if (lnet_peer_is_alive(lp, now))
-               return 1;
-
        /*
         * If we're resending a message, let's attempt to send it even if
         * the peer is down to fulfill our resend quota on the message
@@ -883,36 +832,19 @@ lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp,
        if (msg->msg_retry_count > 0)
                return 1;
 
-       /*
-        * Peer appears dead, but we should avoid frequent NI queries (at
-        * most once per lnet_queryinterval seconds).
-        */
-       if (lp->lpni_last_query != 0) {
-               static const int lnet_queryinterval = 1;
-               time64_t next_query;
-
-               next_query = lp->lpni_last_query + lnet_queryinterval;
-
-               if (now < next_query) {
-                       if (lp->lpni_alive)
-                               CWARN("Unexpected aliveness of peer %s: "
-                                     "%lld < %lld (%d/%d)\n",
-                                     libcfs_nid2str(lp->lpni_nid),
-                                     now, next_query,
-                                     lnet_queryinterval,
-                                     lp->lpni_net->net_tunables.lct_peer_timeout);
-                       return 0;
-               }
-       }
-
-       /* query NI for latest aliveness news */
-       lnet_ni_query_locked(ni, lp);
+       /* try and send recovery messages regardless */
+       if (msg->msg_recovery)
+               return 1;
 
-       if (lnet_peer_is_alive(lp, now))
+       /* always send any responses */
+       if (msg->msg_type == LNET_MSG_ACK ||
+           msg->msg_type == LNET_MSG_REPLY)
                return 1;
 
-       lnet_notify_locked(lp, 0, 0, lp->lpni_last_alive);
-       return 0;
+       if (!lnet_is_peer_deadline_passed(lpni, now))
+               return true;
+
+       return lnet_is_peer_ni_alive(lpni);
 }
 
 /**
@@ -938,6 +870,8 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
        LASSERT(msg->msg_tx_committed);
+       /* can't get here if we're sending to the loopback interface */
+       LASSERT(lp->lpni_nid != the_lnet.ln_loni->ni_nid);
 
        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
@@ -957,7 +891,7 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
 
                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
-               msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED;
+               msg->msg_health_status = LNET_MSG_STATUS_REMOTE_DROPPED;
                if (do_send)
                        lnet_finalize(msg, -EHOSTUNREACH);
 
@@ -974,6 +908,8 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                        libcfs_id2str(msg->msg_target));
                if (do_send) {
                        msg->msg_no_resend = true;
+                       CDEBUG(D_NET, "msg %p to %s canceled and will not be resent\n",
+                              msg, libcfs_id2str(msg->msg_target));
                        lnet_finalize(msg, -ECANCELED);
                }
 
@@ -1059,39 +995,47 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
         * sets do_recv FALSE and I don't do the unlock/send/lock bit.
         * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
         * received or OK to receive */
-       struct lnet_peer_ni *lp = msg->msg_rxpeer;
+       struct lnet_peer_ni *lpni = msg->msg_rxpeer;
+       struct lnet_peer *lp;
        struct lnet_rtrbufpool *rbp;
        struct lnet_rtrbuf *rb;
 
-       LASSERT (msg->msg_iov == NULL);
-       LASSERT (msg->msg_kiov == NULL);
-       LASSERT (msg->msg_niov == 0);
-       LASSERT (msg->msg_routing);
-       LASSERT (msg->msg_receiving);
-       LASSERT (!msg->msg_sending);
+       LASSERT(msg->msg_iov == NULL);
+       LASSERT(msg->msg_kiov == NULL);
+       LASSERT(msg->msg_niov == 0);
+       LASSERT(msg->msg_routing);
+       LASSERT(msg->msg_receiving);
+       LASSERT(!msg->msg_sending);
+       LASSERT(lpni->lpni_peer_net);
+       LASSERT(lpni->lpni_peer_net->lpn_peer);
+
+       lp = lpni->lpni_peer_net->lpn_peer;
 
        /* non-lnet_parse callers only receive delayed messages */
        LASSERT(!do_recv || msg->msg_rx_delayed);
 
        if (!msg->msg_peerrtrcredit) {
-               spin_lock(&lp->lpni_lock);
-               LASSERT((lp->lpni_rtrcredits < 0) ==
-                       !list_empty(&lp->lpni_rtrq));
+               /* lpni_lock protects the credit manipulation */
+               spin_lock(&lpni->lpni_lock);
+               /* lp_lock protects the lp_rtrq */
+               spin_lock(&lp->lp_lock);
 
                msg->msg_peerrtrcredit = 1;
-               lp->lpni_rtrcredits--;
-               if (lp->lpni_rtrcredits < lp->lpni_minrtrcredits)
-                       lp->lpni_minrtrcredits = lp->lpni_rtrcredits;
+               lpni->lpni_rtrcredits--;
+               if (lpni->lpni_rtrcredits < lpni->lpni_minrtrcredits)
+                       lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;
 
-               if (lp->lpni_rtrcredits < 0) {
+               if (lpni->lpni_rtrcredits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
-                       list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
-                       spin_unlock(&lp->lpni_lock);
+                       list_add_tail(&msg->msg_list, &lp->lp_rtrq);
+                       spin_unlock(&lp->lp_lock);
+                       spin_unlock(&lpni->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
-               spin_unlock(&lp->lpni_lock);
+               spin_unlock(&lp->lp_lock);
+               spin_unlock(&lpni->lpni_lock);
        }
 
        rbp = lnet_msg2bufpool(msg);
@@ -1252,6 +1196,7 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
                msg->msg_no_resend = true;
+               msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
                lnet_finalize(msg, -ECANCELED);
        }
 
@@ -1261,7 +1206,8 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
 void
 lnet_return_rx_credits_locked(struct lnet_msg *msg)
 {
-       struct lnet_peer_ni *rxpeer = msg->msg_rxpeer;
+       struct lnet_peer_ni *rxpeerni = msg->msg_rxpeer;
+       struct lnet_peer *lp;
        struct lnet_ni *rxni = msg->msg_rxni;
        struct lnet_msg *msg2;
 
@@ -1311,40 +1257,65 @@ lnet_return_rx_credits_locked(struct lnet_msg *msg)
 
 routing_off:
        if (msg->msg_peerrtrcredit) {
+               LASSERT(rxpeerni);
+               LASSERT(rxpeerni->lpni_peer_net);
+               LASSERT(rxpeerni->lpni_peer_net->lpn_peer);
+
+               lp = rxpeerni->lpni_peer_net->lpn_peer;
+
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
 
-               spin_lock(&rxpeer->lpni_lock);
-               LASSERT((rxpeer->lpni_rtrcredits < 0) ==
-                       !list_empty(&rxpeer->lpni_rtrq));
+               spin_lock(&rxpeerni->lpni_lock);
+               spin_lock(&lp->lp_lock);
 
-               rxpeer->lpni_rtrcredits++;
+               rxpeerni->lpni_rtrcredits++;
 
                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
                        struct list_head drop;
                        INIT_LIST_HEAD(&drop);
-                       list_splice_init(&rxpeer->lpni_rtrq, &drop);
-                       spin_unlock(&rxpeer->lpni_lock);
+                       list_splice_init(&lp->lp_rtrq, &drop);
+                       spin_unlock(&lp->lp_lock);
+                       spin_unlock(&rxpeerni->lpni_lock);
                        lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
-               } else if (rxpeer->lpni_rtrcredits <= 0) {
-                       msg2 = list_entry(rxpeer->lpni_rtrq.next,
+               } else if (!list_empty(&lp->lp_rtrq)) {
+                       int msg2_cpt;
+
+                       msg2 = list_entry(lp->lp_rtrq.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
-                       spin_unlock(&rxpeer->lpni_lock);
+                       msg2_cpt = msg2->msg_rx_cpt;
+                       spin_unlock(&lp->lp_lock);
+                       spin_unlock(&rxpeerni->lpni_lock);
+                       /*
+                        * messages on the lp_rtrq can be from any NID in
+                        * the peer, which means they might have different
+                        * cpts. We need to make sure we lock the right
+                        * one.
+                        */
+                       if (msg2_cpt != msg->msg_rx_cpt) {
+                               lnet_net_unlock(msg->msg_rx_cpt);
+                               lnet_net_lock(msg2_cpt);
+                       }
                        (void) lnet_post_routed_recv_locked(msg2, 1);
+                       if (msg2_cpt != msg->msg_rx_cpt) {
+                               lnet_net_unlock(msg2_cpt);
+                               lnet_net_lock(msg->msg_rx_cpt);
+                       }
                } else {
-                       spin_unlock(&rxpeer->lpni_lock);
+                       spin_unlock(&lp->lp_lock);
+                       spin_unlock(&rxpeerni->lpni_lock);
                }
        }
        if (rxni != NULL) {
                msg->msg_rxni = NULL;
                lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
        }
-       if (rxpeer != NULL) {
+       if (rxpeerni != NULL) {
                msg->msg_rxpeer = NULL;
-               lnet_peer_ni_decref_locked(rxpeer);
+               lnet_peer_ni_decref_locked(rxpeerni);
        }
 }
 
@@ -1366,48 +1337,198 @@ lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
        return 0;
 }
 
+static struct lnet_peer_ni *
+lnet_select_peer_ni(struct lnet_ni *best_ni, lnet_nid_t dst_nid,
+                   struct lnet_peer *peer,
+                   struct lnet_peer_net *peer_net)
+{
+       /*
+        * Look at the peer NIs for the destination peer that connect
+        * to the chosen net. If a peer_ni is preferred when using the
+        * best_ni to communicate, we use that one. If there is no
+        * preferred peer_ni, or there are multiple preferred peer_ni,
+        * the available transmit credits are used. If the transmit
+        * credits are equal, we round-robin over the peer_ni.
+        */
+       struct lnet_peer_ni *lpni = NULL;
+       struct lnet_peer_ni *best_lpni = NULL;
+       int best_lpni_credits = INT_MIN;
+       bool preferred = false;
+       bool ni_is_pref;
+       int best_lpni_healthv = 0;
+       int lpni_healthv;
+
+       while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
+               /*
+                * if the best_ni we've chosen already has this lpni
+                * preferred, then let's use it
+                */
+               if (best_ni) {
+                       ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
+                                                               best_ni->ni_nid);
+                       CDEBUG(D_NET, "%s ni_is_pref = %d\n",
+                              libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
+               } else {
+                       ni_is_pref = false;
+               }
+
+               lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
+               if (best_lpni)
+                       CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
+                               libcfs_nid2str(lpni->lpni_nid),
+                               lpni->lpni_txcredits, best_lpni_credits,
+                               lpni->lpni_seq, best_lpni->lpni_seq);
+
+               /* pick the healthiest peer ni */
+               if (lpni_healthv < best_lpni_healthv) {
+                       continue;
+               } else if (lpni_healthv > best_lpni_healthv) {
+                       best_lpni_healthv = lpni_healthv;
+               /* if this is a preferred peer use it */
+               } else if (!preferred && ni_is_pref) {
+                       preferred = true;
+               } else if (preferred && !ni_is_pref) {
+                       /*
+                        * this is not the preferred peer so let's ignore
+                        * it.
+                        */
+                       continue;
+               } else if (lpni->lpni_txcredits < best_lpni_credits) {
+                       /*
+                        * We already have a peer that has more credits
+                        * available than this one. No need to consider
+                        * this peer further.
+                        */
+                       continue;
+               } else if (lpni->lpni_txcredits == best_lpni_credits) {
+                       /*
+                        * The best peer found so far and the current peer
+                        * have the same number of available credits, let's
+                        * make sure to select between them using Round
+                        * Robin.
+                        */
+                       if (best_lpni) {
+                               if (best_lpni->lpni_seq <= lpni->lpni_seq)
+                                       continue;
+                       }
+               }
+
+               best_lpni = lpni;
+               best_lpni_credits = lpni->lpni_txcredits;
+       }
+
+       /* if we still can't find a peer ni then we can't reach it */
+       if (!best_lpni) {
+               __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
+                       LNET_NIDNET(dst_nid);
+               CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
+                               libcfs_net2str(net_id));
+               return NULL;
+       }
+
+       CDEBUG(D_NET, "sd_best_lpni = %s\n",
+              libcfs_nid2str(best_lpni->lpni_nid));
+
+       return best_lpni;
+}
+
+/*
+ * Prerequisite: the best_ni should already be set in the sd
+ */
+static inline struct lnet_peer_ni *
+lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
+                          __u32 net_id)
+{
+       struct lnet_peer_net *peer_net;
+
+       /*
+        * The gateway is Multi-Rail capable so now we must select the
+        * proper peer_ni
+        */
+       peer_net = lnet_peer_get_net_locked(peer, net_id);
+
+       if (!peer_net) {
+               CERROR("gateway peer %s has no NI on net %s\n",
+                      libcfs_nid2str(peer->lp_primary_nid),
+                      libcfs_net2str(net_id));
+               return NULL;
+       }
+
+       return lnet_select_peer_ni(sd->sd_best_ni, sd->sd_dst_nid,
+                                  peer, peer_net);
+}
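
lnet_select_peer_ni() breaks ties in a fixed order: health value first, then whether the candidate is a preferred NID for the chosen local NI, then available transmit credits, and finally lpni_seq for round-robin. The following self-contained sketch shows that ordering with a simplified stand-in struct rather than the real lnet_peer_ni; the NIDs and field names are made up for illustration.

#include <limits.h>
#include <stdbool.h>
#include <stdio.h>

struct lpni_sketch {
        const char *nid;
        int healthv;            /* higher is healthier */
        bool is_pref;           /* preferred for the chosen local NI */
        int txcredits;          /* higher means less loaded */
        unsigned seq;           /* lower means used less recently */
};

static const struct lpni_sketch *
pick_lpni(const struct lpni_sketch *cand, int n)
{
        const struct lpni_sketch *best = NULL;
        int best_healthv = 0, best_credits = INT_MIN;
        bool preferred = false;

        for (int i = 0; i < n; i++) {
                const struct lpni_sketch *l = &cand[i];

                if (l->healthv < best_healthv)
                        continue;
                else if (l->healthv > best_healthv)
                        best_healthv = l->healthv;
                else if (!preferred && l->is_pref)
                        preferred = true;
                else if (preferred && !l->is_pref)
                        continue;
                else if (l->txcredits < best_credits)
                        continue;
                else if (l->txcredits == best_credits &&
                         best && best->seq <= l->seq)
                        continue;       /* round-robin on equal credits */

                best = l;
                best_credits = l->txcredits;
        }
        return best;
}

int main(void)
{
        struct lpni_sketch c[] = {
                { "10.0.0.1@tcp", 1000, false, 3, 7 },
                { "10.0.0.2@tcp", 1000, true,  2, 5 },
        };
        printf("selected %s\n", pick_lpni(c, 2)->nid);
        return 0;
}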
+
 static int
-lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2)
+lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2,
+                   struct lnet_peer_ni **best_lpni)
 {
-       struct lnet_peer_ni *p1 = r1->lr_gateway;
-       struct lnet_peer_ni *p2 = r2->lr_gateway;
        int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
        int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
+       struct lnet_peer *lp1 = r1->lr_gateway;
+       struct lnet_peer *lp2 = r2->lr_gateway;
+       struct lnet_peer_ni *lpni1;
+       struct lnet_peer_ni *lpni2;
+       struct lnet_send_data sd;
        int rc;
 
-       if (r1->lr_priority < r2->lr_priority)
+       sd.sd_best_ni = NULL;
+       sd.sd_dst_nid = LNET_NID_ANY;
+       lpni1 = lnet_find_best_lpni_on_net(&sd, lp1, r1->lr_lnet);
+       lpni2 = lnet_find_best_lpni_on_net(&sd, lp2, r2->lr_lnet);
+       LASSERT(lpni1 && lpni2);
+
+       if (r1->lr_priority < r2->lr_priority) {
+               *best_lpni = lpni1;
                return 1;
+       }
 
-       if (r1->lr_priority > r2->lr_priority)
+       if (r1->lr_priority > r2->lr_priority) {
+               *best_lpni = lpni2;
                return -1;
+       }
 
-       if (r1_hops < r2_hops)
+       if (r1_hops < r2_hops) {
+               *best_lpni = lpni1;
                return 1;
+       }
 
-       if (r1_hops > r2_hops)
+       if (r1_hops > r2_hops) {
+               *best_lpni = lpni2;
                return -1;
+       }
 
-       rc = lnet_compare_peers(p1, p2);
-       if (rc)
+       rc = lnet_compare_peers(lpni1, lpni2);
+       if (rc == 1) {
+               *best_lpni = lpni1;
                return rc;
+       } else if (rc == -1) {
+               *best_lpni = lpni2;
+               return rc;
+       }
 
-       if (r1->lr_seq - r2->lr_seq <= 0)
+       if (r1->lr_seq - r2->lr_seq <= 0) {
+               *best_lpni = lpni1;
                return 1;
+       }
 
+       *best_lpni = lpni2;
        return -1;
 }
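
lnet_compare_routes() now also reports which gateway NI won, but the ordering itself is unchanged: route priority first, then hop count (with LNET_UNDEFINED_HOPS treated as 1), then the gateway comparison, then lr_seq for round-robin. Here is a compact sketch of that precedence; it omits the lnet_compare_peers() step and uses plain ints instead of the real route structure.

#include <stdio.h>

struct route_sketch {
        int priority;   /* lower wins */
        int hops;       /* lower wins */
        unsigned seq;   /* lower wins on a full tie (round-robin) */
};

/* Return 1 if r1 is preferable, -1 if r2 is, mirroring the ordering above. */
static int compare_routes(const struct route_sketch *r1,
                          const struct route_sketch *r2)
{
        if (r1->priority != r2->priority)
                return r1->priority < r2->priority ? 1 : -1;
        if (r1->hops != r2->hops)
                return r1->hops < r2->hops ? 1 : -1;
        return r1->seq <= r2->seq ? 1 : -1;
}

int main(void)
{
        struct route_sketch a = { 0, 1, 10 }, b = { 0, 2, 3 };
        printf("%s wins\n", compare_routes(&a, &b) == 1 ? "a" : "b");
        return 0;
}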
 
-static struct lnet_peer_ni *
+static struct lnet_route *
 lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
-                      lnet_nid_t rtr_nid)
+                      lnet_nid_t rtr_nid, struct lnet_route **prev_route,
+                      struct lnet_peer_ni **gwni)
 {
-       struct lnet_remotenet   *rnet;
-       struct lnet_route               *route;
-       struct lnet_route               *best_route;
-       struct lnet_route               *last_route;
-       struct lnet_peer_ni     *lpni_best;
-       struct lnet_peer_ni     *lp;
-       int                     rc;
+       struct lnet_peer_ni *best_gw_ni = NULL;
+       struct lnet_route *best_route;
+       struct lnet_route *last_route;
+       struct lnet_remotenet *rnet;
+       struct lnet_peer *lp_best;
+       struct lnet_route *route;
+       struct lnet_peer *lp;
+       int rc;
 
        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
         * rtr_nid nid, otherwise find the best gateway I can use */
@@ -1416,7 +1537,7 @@ lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
        if (rnet == NULL)
                return NULL;
 
-       lpni_best = NULL;
+       lp_best = NULL;
        best_route = last_route = NULL;
        list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
                lp = route->lr_gateway;
@@ -1424,36 +1545,27 @@ lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
                if (!lnet_is_route_alive(route))
                        continue;
 
-               if (net != NULL && lp->lpni_net != net)
-                       continue;
-
-               if (lp->lpni_nid == rtr_nid) /* it's pre-determined router */
-                       return lp;
-
-               if (lpni_best == NULL) {
+               if (lp_best == NULL) {
                        best_route = last_route = route;
-                       lpni_best = lp;
-                       continue;
+                       lp_best = lp;
                }
 
                /* no protection on below fields, but it's harmless */
                if (last_route->lr_seq - route->lr_seq < 0)
                        last_route = route;
 
-               rc = lnet_compare_routes(route, best_route);
+               rc = lnet_compare_routes(route, best_route, &best_gw_ni);
                if (rc < 0)
                        continue;
 
                best_route = route;
-               lpni_best = lp;
+               lp_best = lp;
        }
 
-       /* set sequence number on the best router to the latest sequence + 1
-        * so we can round-robin all routers, it's race and inaccurate but
-        * harmless and functional  */
-       if (best_route != NULL)
-               best_route->lr_seq = last_route->lr_seq + 1;
-       return lpni_best;
+       *prev_route = last_route;
+       *gwni = best_gw_ni;
+
+       return best_route;
 }
 
 static struct lnet_ni *
@@ -1599,6 +1711,25 @@ lnet_msg_discovery(struct lnet_msg *msg)
 #define SRC_ANY_ROUTER_NMR_DST (SRC_ANY | REMOTE_DST | NMR_DST)
 
 static int
+lnet_handle_lo_send(struct lnet_send_data *sd)
+{
+       struct lnet_msg *msg = sd->sd_msg;
+       int cpt = sd->sd_cpt;
+
+       /* No send credit hassles with LOLND */
+       lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
+       msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
+       if (!msg->msg_routing)
+               msg->msg_hdr.src_nid =
+                       cpu_to_le64(the_lnet.ln_loni->ni_nid);
+       msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
+       lnet_msg_commit(msg, cpt);
+       msg->msg_txni = the_lnet.ln_loni;
+
+       return LNET_CREDIT_OK;
+}
+
+static int
 lnet_handle_send(struct lnet_send_data *sd)
 {
        struct lnet_ni *best_ni = sd->sd_best_ni;
@@ -1730,125 +1861,6 @@ lnet_handle_send(struct lnet_send_data *sd)
        return rc;
 }
 
-static struct lnet_peer_ni *
-lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
-                   struct lnet_peer_net *peer_net)
-{
-       /*
-        * Look at the peer NIs for the destination peer that connect
-        * to the chosen net. If a peer_ni is preferred when using the
-        * best_ni to communicate, we use that one. If there is no
-        * preferred peer_ni, or there are multiple preferred peer_ni,
-        * the available transmit credits are used. If the transmit
-        * credits are equal, we round-robin over the peer_ni.
-        */
-       struct lnet_peer_ni *lpni = NULL;
-       struct lnet_peer_ni *best_lpni = NULL;
-       struct lnet_ni *best_ni = sd->sd_best_ni;
-       lnet_nid_t dst_nid = sd->sd_dst_nid;
-       int best_lpni_credits = INT_MIN;
-       bool preferred = false;
-       bool ni_is_pref;
-       int best_lpni_healthv = 0;
-       int lpni_healthv;
-
-       while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
-               /*
-                * if the best_ni we've chosen aleady has this lpni
-                * preferred, then let's use it
-                */
-               ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
-                                                         best_ni->ni_nid);
-
-               lpni_healthv = atomic_read(&lpni->lpni_healthv);
-
-               CDEBUG(D_NET, "%s ni_is_pref = %d\n",
-                      libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
-
-               if (best_lpni)
-                       CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
-                               libcfs_nid2str(lpni->lpni_nid),
-                               lpni->lpni_txcredits, best_lpni_credits,
-                               lpni->lpni_seq, best_lpni->lpni_seq);
-
-               /* pick the healthiest peer ni */
-               if (lpni_healthv < best_lpni_healthv) {
-                       continue;
-               } else if (lpni_healthv > best_lpni_healthv) {
-                       best_lpni_healthv = lpni_healthv;
-               /* if this is a preferred peer use it */
-               } else if (!preferred && ni_is_pref) {
-                       preferred = true;
-               } else if (preferred && !ni_is_pref) {
-                       /*
-                        * this is not the preferred peer so let's ignore
-                        * it.
-                        */
-                       continue;
-               } else if (lpni->lpni_txcredits < best_lpni_credits) {
-                       /*
-                        * We already have a peer that has more credits
-                        * available than this one. No need to consider
-                        * this peer further.
-                        */
-                       continue;
-               } else if (lpni->lpni_txcredits == best_lpni_credits) {
-                       /*
-                        * The best peer found so far and the current peer
-                        * have the same number of available credits let's
-                        * make sure to select between them using Round
-                        * Robin
-                        */
-                       if (best_lpni) {
-                               if (best_lpni->lpni_seq <= lpni->lpni_seq)
-                                       continue;
-                       }
-               }
-
-               best_lpni = lpni;
-               best_lpni_credits = lpni->lpni_txcredits;
-       }
-
-       /* if we still can't find a peer ni then we can't reach it */
-       if (!best_lpni) {
-               __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
-                       LNET_NIDNET(dst_nid);
-               CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
-                               libcfs_net2str(net_id));
-               return NULL;
-       }
-
-       CDEBUG(D_NET, "sd_best_lpni = %s\n",
-              libcfs_nid2str(best_lpni->lpni_nid));
-
-       return best_lpni;
-}
-
-/*
- * Prerequisite: the best_ni should already be set in the sd
- */
-static inline struct lnet_peer_ni *
-lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
-                          __u32 net_id)
-{
-       struct lnet_peer_net *peer_net;
-
-       /*
-        * The gateway is Multi-Rail capable so now we must select the
-        * proper peer_ni
-        */
-       peer_net = lnet_peer_get_net_locked(peer, net_id);
-
-       if (!peer_net) {
-               CERROR("gateway peer %s has no NI on net %s\n",
-                      libcfs_nid2str(peer->lp_primary_nid),
-                      libcfs_net2str(net_id));
-               return NULL;
-       }
-
-       return lnet_select_peer_ni(sd, peer, peer_net);
-}
-
 static inline void
 lnet_set_non_mr_pref_nid(struct lnet_send_data *sd)
 {
@@ -1923,7 +1935,10 @@ lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
                                             sd->sd_best_ni->ni_net->net_id);
        }
 
-       if (sd->sd_best_lpni)
+       if (sd->sd_best_lpni &&
+           sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
+               return lnet_handle_lo_send(sd);
+       else if (sd->sd_best_lpni)
                return lnet_handle_send(sd);
 
        CERROR("can't send to %s. no NI on %s\n",
@@ -1965,62 +1980,157 @@ lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni,
 }
 
 static int
+lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
+                            struct lnet_msg *msg, lnet_nid_t rtr_nid,
+                            int cpt)
+{
+       struct lnet_peer *peer;
+       lnet_nid_t primary_nid;
+       int rc;
+
+       lnet_peer_ni_addref_locked(lpni);
+
+       peer = lpni->lpni_peer_net->lpn_peer;
+
+       if (lnet_peer_gw_discovery(peer)) {
+               lnet_peer_ni_decref_locked(lpni);
+               return 0;
+       }
+
+       if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer)) {
+               lnet_peer_ni_decref_locked(lpni);
+               return 0;
+       }
+
+       rc = lnet_discover_peer_locked(lpni, cpt, false);
+       if (rc) {
+               lnet_peer_ni_decref_locked(lpni);
+               return rc;
+       }
+       /* The peer may have changed. */
+       peer = lpni->lpni_peer_net->lpn_peer;
+       /* queue message and return */
+       msg->msg_rtr_nid_param = rtr_nid;
+       msg->msg_sending = 0;
+       msg->msg_txpeer = NULL;
+       spin_lock(&peer->lp_lock);
+       list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+       spin_unlock(&peer->lp_lock);
+       lnet_peer_ni_decref_locked(lpni);
+       primary_nid = peer->lp_primary_nid;
+
+       CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
+               msg, libcfs_nid2str(primary_nid));
+
+       return LNET_DC_WAIT;
+}
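
lnet_initiate_peer_discovery() implements a "park until discovered" pattern: if the gateway still needs discovery the message is queued on lp_dc_pendq and LNET_DC_WAIT is returned so the caller backs off instead of sending. A toy model of that control flow follows, with invented types and return codes standing in for the LNet ones.

#include <stdbool.h>
#include <stdio.h>

enum { SEND_NOW = 0, DC_WAIT = 1 };

struct peer_sketch {
        bool discovered;
        int pending;            /* messages queued awaiting discovery */
};

static int maybe_defer(struct peer_sketch *p)
{
        if (p->discovered)
                return SEND_NOW;
        p->pending++;           /* queue the message on the peer */
        return DC_WAIT;         /* caller must not send yet */
}

int main(void)
{
        struct peer_sketch gw = { .discovered = false };

        if (maybe_defer(&gw) == DC_WAIT)
                printf("message parked, %d pending discovery\n", gw.pending);
        gw.discovered = true;   /* discovery completed; pending queue drains */
        return 0;
}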
+
+static int
 lnet_handle_find_routed_path(struct lnet_send_data *sd,
                             lnet_nid_t dst_nid,
                             struct lnet_peer_ni **gw_lpni,
                             struct lnet_peer **gw_peer)
 {
-       struct lnet_peer_ni *gw;
+       int rc;
+       struct lnet_peer *gw;
+       struct lnet_peer *lp;
+       struct lnet_peer_net *lpn;
+       struct lnet_peer_net *best_lpn = NULL;
+       struct lnet_remotenet *rnet;
+       struct lnet_route *best_route;
+       struct lnet_route *last_route;
+       struct lnet_peer_ni *lpni = NULL;
+       struct lnet_peer_ni *gwni = NULL;
        lnet_nid_t src_nid = sd->sd_src_nid;
 
-       gw = lnet_find_route_locked(NULL, LNET_NIDNET(dst_nid),
-                                   sd->sd_rtr_nid);
-       if (!gw) {
+       /* we've already looked up the initial lpni using dst_nid */
+       lpni = sd->sd_best_lpni;
+       /* the peer tree must be in existence */
+       LASSERT(lpni && lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
+       lp = lpni->lpni_peer_net->lpn_peer;
+
+       list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
+               /* is this remote network reachable?  */
+               rnet = lnet_find_rnet_locked(lpn->lpn_net_id);
+               if (!rnet)
+                       continue;
+
+               if (!best_lpn)
+                       best_lpn = lpn;
+
+               if (best_lpn->lpn_seq <= lpn->lpn_seq)
+                       continue;
+
+               best_lpn = lpn;
+       }
+
+       if (!best_lpn) {
+               CERROR("peer %s has no available nets\n",
+                      libcfs_nid2str(sd->sd_dst_nid));
+               return -EHOSTUNREACH;
+       }
+
+       sd->sd_best_lpni = lnet_find_best_lpni_on_net(sd, lp, best_lpn->lpn_net_id);
+       if (!sd->sd_best_lpni) {
+               CERROR("peer %s down\n", libcfs_nid2str(sd->sd_dst_nid));
+               return -EHOSTUNREACH;
+       }
+
+       best_route = lnet_find_route_locked(NULL, best_lpn->lpn_net_id,
+                                           sd->sd_rtr_nid, &last_route,
+                                           &gwni);
+       if (!best_route) {
                CERROR("no route to %s from %s\n",
                       libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
                return -EHOSTUNREACH;
        }
 
-       /* get the peer of the gw_ni */
-       LASSERT(gw->lpni_peer_net);
-       LASSERT(gw->lpni_peer_net->lpn_peer);
+       if (!gwni) {
+               CERROR("Internal Error. Route expected to %s from %s\n",
+                       libcfs_nid2str(dst_nid),
+                       libcfs_nid2str(src_nid));
+               return -EFAULT;
+       }
+
+       gw = best_route->lr_gateway;
+       LASSERT(gw == gwni->lpni_peer_net->lpn_peer);
 
-       *gw_peer = gw->lpni_peer_net->lpn_peer;
+       /*
+        * Discover this gateway if it hasn't already been discovered.
+        * This means we might delay the message until discovery has
+        * completed
+        */
+       sd->sd_msg->msg_src_nid_param = sd->sd_src_nid;
+       rc = lnet_initiate_peer_discovery(gwni, sd->sd_msg, sd->sd_rtr_nid,
+                                         sd->sd_cpt);
+       if (rc)
+               return rc;
 
        if (!sd->sd_best_ni)
-               sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, *gw_peer,
-                                       gw->lpni_peer_net,
+               sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, gw,
+                                       lnet_peer_get_net_locked(gw,
+                                               best_route->lr_lnet),
                                        sd->sd_md_cpt,
                                        true);
 
        if (!sd->sd_best_ni) {
                CERROR("Internal Error. Expected local ni on %s "
                       "but non found :%s\n",
-                      libcfs_net2str(gw->lpni_peer_net->lpn_net_id),
+                      libcfs_net2str(best_route->lr_lnet),
                       libcfs_nid2str(sd->sd_src_nid));
                return -EFAULT;
        }
 
+       *gw_lpni = gwni;
+       *gw_peer = gw;
+
        /*
-        * if gw is MR let's find its best peer_ni
+        * increment the sequence numbers since now we're sure we're
+        * going to use this path
         */
-       if (lnet_peer_is_multi_rail(*gw_peer)) {
-               gw = lnet_find_best_lpni_on_net(sd, *gw_peer,
-                                               sd->sd_best_ni->ni_net->net_id);
-               /*
-                * We've already verified that the gw has an NI on that
-                * desired net, but we're not finding it. Something is
-                * wrong.
-                */
-               if (!gw) {
-                       CERROR("Internal Error. Route expected to %s from %s\n",
-                               libcfs_nid2str(dst_nid),
-                               libcfs_nid2str(src_nid));
-                       return -EFAULT;
-               }
-       }
-
-       *gw_lpni = gw;
+       LASSERT(best_route && last_route);
+       best_route->lr_seq = last_route->lr_seq + 1;
+       best_lpn->lpn_seq++;
 
        return 0;
 }
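
For a Multi-Rail destination behind routers, the path is now picked in two stages: first the destination's remote net with the lowest lpn_seq among those we actually have a route to, then the best route and gateway NI on that net, with lr_seq and lpn_seq bumped once the path is committed. The sketch below covers only the first stage; the struct and net names are placeholders, not LNet types.

#include <stdio.h>

struct lpn_sketch {
        const char *net;
        int routed;             /* do we have a route to this net? */
        unsigned seq;           /* bumped each time the net is used */
};

/* Pick the reachable remote net used least recently (lowest seq),
 * mirroring the best_lpn loop above, then bump its sequence. */
static struct lpn_sketch *pick_net(struct lpn_sketch *nets, int n)
{
        struct lpn_sketch *best = NULL;

        for (int i = 0; i < n; i++) {
                if (!nets[i].routed)
                        continue;
                if (!best || nets[i].seq < best->seq)
                        best = &nets[i];
        }
        if (best)
                best->seq++;
        return best;
}

int main(void)
{
        struct lpn_sketch nets[] = {
                { "o2ib1", 1, 4 }, { "tcp2", 0, 0 }, { "o2ib3", 1, 2 },
        };
        printf("route via %s\n", pick_net(nets, 3)->net);
        return 0;
}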
@@ -2059,7 +2169,7 @@ lnet_handle_spec_router_dst(struct lnet_send_data *sd)
 
        rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
                                     &gw_peer);
-       if (rc < 0)
+       if (rc)
                return rc;
 
        if (sd->sd_send_case & NMR_DST)
@@ -2080,7 +2190,8 @@ lnet_handle_spec_router_dst(struct lnet_send_data *sd)
 }
 
 struct lnet_ni *
-lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt)
+lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt,
+                              bool discovery)
 {
        struct lnet_peer_net *peer_net = NULL;
        struct lnet_ni *best_ni = NULL;
@@ -2102,6 +2213,14 @@ lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt)
                        continue;
                best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
                                                   peer_net, md_cpt, false);
+
+               /*
+                * if this is a discovery message and lp_disc_net_id is
+                * specified then use that net to send the discovery on.
+                */
+               if (peer->lp_disc_net_id == peer_net->lpn_net_id &&
+                   discovery)
+                       break;
        }
 
        if (best_ni)
@@ -2271,7 +2390,8 @@ lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
         * networks.
         */
        sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
-                                                       sd->sd_md_cpt);
+                                       sd->sd_md_cpt,
+                                       lnet_msg_discovery(sd->sd_msg));
        if (sd->sd_best_ni) {
                sd->sd_best_lpni =
                  lnet_find_best_lpni_on_net(sd, sd->sd_peer,
@@ -2283,7 +2403,16 @@ lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
                 * try and see if we can reach it over another routed
                 * network
                 */
-               if (sd->sd_best_lpni) {
+               if (sd->sd_best_lpni &&
+                   sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid) {
+                       /*
+                        * in case we initially started with a routed
+                        * destination, let's reset to local
+                        */
+                       sd->sd_send_case &= ~REMOTE_DST;
+                       sd->sd_send_case |= LOCAL_DST;
+                       return lnet_handle_lo_send(sd);
+               } else if (sd->sd_best_lpni) {
                        /*
                         * in case we initially started with a routed
                         * destination, let's reset to local
@@ -2349,6 +2478,8 @@ lnet_handle_any_mr_dst(struct lnet_send_data *sd)
                               "No route available\n",
                                libcfs_nid2str(sd->sd_dst_nid));
                        return -EHOSTUNREACH;
+               } else if (rc > 0) {
+                       return rc;
                }
 
                sd->sd_best_lpni = gw;
@@ -2370,15 +2501,15 @@ lnet_handle_any_mr_dst(struct lnet_send_data *sd)
                return rc;
 
        /*
-        * TODO; One possible enhancement is to run the selection
-        * algorithm on the peer. However for remote peers the credits are
-        * not decremented, so we'll be basically going over the peer NIs
-        * in round robin. An MR router will run the selection algorithm
-        * on the next-hop interfaces.
+        * Now that we must route to the destination, we must consider the
+        * MR case, where the destination has multiple interfaces, some of
+        * which we can route to and others we do not. For this reason we
+        * need to select the destination which we can route to and if
+        * there are multiple, we need to round robin.
         */
        rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
                                          &gw_peer);
-       if (rc < 0)
+       if (rc)
                return rc;
 
        sd->sd_send_case &= ~LOCAL_DST;
@@ -2417,7 +2548,7 @@ lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd)
         */
        rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
                                          &gw_peer);
-       if (rc < 0)
+       if (rc)
                return rc;
 
        /*
@@ -2511,19 +2642,12 @@ again:
         * is no need to go through any selection. We can just shortcut
         * the entire process and send over lolnd
         */
+       send_data.sd_msg = msg;
+       send_data.sd_cpt = cpt;
        if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
-               /* No send credit hassles with LOLND */
-               lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
-               msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
-               if (!msg->msg_routing)
-                       msg->msg_hdr.src_nid =
-                               cpu_to_le64(the_lnet.ln_loni->ni_nid);
-               msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
-               lnet_msg_commit(msg, cpt);
-               msg->msg_txni = the_lnet.ln_loni;
+               rc = lnet_handle_lo_send(&send_data);
                lnet_net_unlock(cpt);
-
-               return LNET_CREDIT_OK;
+               return rc;
        }
 
        /*
@@ -2552,28 +2676,11 @@ again:
         * trigger discovery.
         */
        peer = lpni->lpni_peer_net->lpn_peer;
-       if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
-               lnet_nid_t primary_nid;
-               rc = lnet_discover_peer_locked(lpni, cpt, false);
-               if (rc) {
-                       lnet_peer_ni_decref_locked(lpni);
-                       lnet_net_unlock(cpt);
-                       return rc;
-               }
-               /* The peer may have changed. */
-               peer = lpni->lpni_peer_net->lpn_peer;
-               /* queue message and return */
-               msg->msg_rtr_nid_param = rtr_nid;
-               msg->msg_sending = 0;
-               list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+       rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
+       if (rc) {
                lnet_peer_ni_decref_locked(lpni);
-               primary_nid = peer->lp_primary_nid;
                lnet_net_unlock(cpt);
-
-               CDEBUG(D_NET, "%s pending discovery\n",
-                      libcfs_nid2str(primary_nid));
-
-               return LNET_DC_WAIT;
+               return rc;
        }
        lnet_peer_ni_decref_locked(lpni);
 
@@ -2605,7 +2712,6 @@ again:
                send_case |= SND_RESP;
 
        /* assign parameters to the send_data */
-       send_data.sd_msg = msg;
        send_data.sd_rtr_nid = rtr_nid;
        send_data.sd_src_nid = src_nid;
        send_data.sd_dst_nid = dst_nid;
@@ -2617,15 +2723,20 @@ again:
        send_data.sd_final_dst_lpni = lpni;
        send_data.sd_peer = peer;
        send_data.sd_md_cpt = md_cpt;
-       send_data.sd_cpt = cpt;
        send_data.sd_send_case = send_case;
 
        rc = lnet_handle_send_case_locked(&send_data);
 
+       /*
+        * Update the local cpt since send_data.sd_cpt might've been
+        * updated as a result of calling lnet_handle_send_case_locked().
+        */
+       cpt = send_data.sd_cpt;
+
        if (rc == REPEAT_SEND)
                goto again;
 
-       lnet_net_unlock(send_data.sd_cpt);
+       lnet_net_unlock(cpt);
 
        return rc;
 }
@@ -2653,8 +2764,13 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
        LASSERT(!msg->msg_tx_committed);
 
        rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
-       if (rc < 0)
+       if (rc < 0) {
+               if (rc == -EHOSTUNREACH)
+                       msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
+               else
+                       msg->msg_health_status = LNET_MSG_STATUS_LOCAL_ERROR;
                return rc;
+       }
 
        if (rc == LNET_CREDIT_OK)
                lnet_ni_send(msg->msg_txni, msg);
@@ -3368,19 +3484,24 @@ lnet_monitor_thread(void *arg)
                 * if we wake up every 1 second? Although, we've seen
                 * cases where we get a complaint that an idle thread
                 * is waking up unnecessarily.
+                *
+                * Take the current net count into account when waking
+                * up for the alive router check, since we may need to
+                * check as many networks as are configured.
                 */
                interval = min(lnet_recovery_interval,
-                              lnet_transaction_timeout / 2);
+                              min((unsigned int) alive_router_check_interval /
+                                       lnet_current_net_count,
+                                  lnet_transaction_timeout / 2));
                wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
                                                false,
                                                cfs_time_seconds(interval));
        }
 
-       /* clean up the router checker */
-       lnet_prune_rc_data(1);
-
        /* Shutting down */
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+       lnet_net_unlock(LNET_LOCK_EX);
 
        /* signal that the monitor thread is exiting */
        up(&the_lnet.ln_mt_signal);
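
The monitor thread's sleep interval computed above is the minimum of lnet_recovery_interval, alive_router_check_interval divided by the number of configured nets (lnet_current_net_count), and half of lnet_transaction_timeout, so router aliveness can be checked on every net in time. A standalone sketch of that computation, with made-up values in place of the module parameters and assuming at least one net is configured:

#include <stdio.h>

static unsigned min_u(unsigned a, unsigned b) { return a < b ? a : b; }

/* Wakeup interval for the monitor thread: short enough to service NI
 * recovery, router aliveness checks across every configured net, and
 * response tracking, mirroring the computation above. */
static unsigned monitor_interval(unsigned recovery_interval,
                                 unsigned router_check_interval,
                                 unsigned net_count,
                                 unsigned transaction_timeout)
{
        return min_u(recovery_interval,
                     min_u(router_check_interval / net_count,
                           transaction_timeout / 2));
}

int main(void)
{
        printf("sleep %u s\n", monitor_interval(1, 60, 4, 50));
        return 0;
}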
@@ -3453,7 +3574,7 @@ fail_error:
 
 static void
 lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
-                          int status)
+                          int status, bool unlink_event)
 {
        lnet_nid_t nid = ev_info->mt_nid;
 
@@ -3485,7 +3606,8 @@ lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
                 * carry forward too much information.
                 * In the peer case, it'll naturally be incremented
                 */
-               lnet_inc_healthv(&ni->ni_healthv);
+               if (!unlink_event)
+                       lnet_inc_healthv(&ni->ni_healthv);
        } else {
                struct lnet_peer_ni *lpni;
                int cpt;
@@ -3510,7 +3632,7 @@ lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
        }
 }
 
-static void
+void
 lnet_mt_event_handler(struct lnet_event *event)
 {
        struct lnet_mt_event_info *ev_info = event->md.user_ptr;
@@ -3529,14 +3651,14 @@ lnet_mt_event_handler(struct lnet_event *event)
                CDEBUG(D_NET, "%s recovery ping unlinked\n",
                       libcfs_nid2str(ev_info->mt_nid));
        case LNET_EVENT_REPLY:
-               lnet_handle_recovery_reply(ev_info, event->status);
+               lnet_handle_recovery_reply(ev_info, event->status,
+                                          event->type == LNET_EVENT_UNLINK);
                break;
        case LNET_EVENT_SEND:
                CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
                               libcfs_nid2str(ev_info->mt_nid),
                               (event->status) ? "unsuccessfully" :
                               "successfully", event->status);
-               lnet_handle_recovery_reply(ev_info, event->status);
                break;
        default:
                CERROR("Unexpected event: %d\n", event->type);
@@ -3588,20 +3710,11 @@ int lnet_monitor_thr_start(void)
        if (rc)
                goto clean_queues;
 
-       rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
-       if (rc != 0) {
-               CERROR("Can't allocate monitor thread EQ: %d\n", rc);
-               goto clean_queues;
-       }
-
-       /* Pre monitor thread start processing */
-       rc = lnet_router_pre_mt_start();
-       if (rc)
-               goto free_mem;
-
        sema_init(&the_lnet.ln_mt_signal, 0);
 
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
+       lnet_net_unlock(LNET_LOCK_EX);
        task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
@@ -3609,24 +3722,22 @@ int lnet_monitor_thr_start(void)
                goto clean_thread;
        }
 
-       /* post monitor thread start processing */
-       lnet_router_post_mt_start();
-
        return 0;
 
 clean_thread:
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+       lnet_net_unlock(LNET_LOCK_EX);
        /* block until event callback signals exit */
        down(&the_lnet.ln_mt_signal);
        /* clean up */
-       lnet_router_cleanup();
-free_mem:
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+       lnet_net_unlock(LNET_LOCK_EX);
        lnet_rsp_tracker_clean();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
-       LNetEQFree(the_lnet.ln_mt_eqh);
        LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
        return rc;
 clean_queues:
@@ -3639,13 +3750,13 @@ clean_queues:
 
 void lnet_monitor_thr_stop(void)
 {
-       int rc;
-
        if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
                return;
 
        LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
+       lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+       lnet_net_unlock(LNET_LOCK_EX);
 
        /* tell the monitor thread that we're shutting down */
        wake_up(&the_lnet.ln_mt_waitq);
@@ -3655,13 +3766,11 @@ void lnet_monitor_thr_stop(void)
        LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
 
        /* perform cleanup tasks */
-       lnet_router_cleanup();
        lnet_rsp_tracker_clean();
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
-       rc = LNetEQFree(the_lnet.ln_mt_eqh);
-       LASSERT(rc == 0);
+
        return;
 }
 
@@ -4084,16 +4193,17 @@ int
 lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
           void *private, int rdma_req)
 {
-       int             rc = 0;
-       int             cpt;
-       int             for_me;
-       struct lnet_msg *msg;
-       lnet_pid_t     dest_pid;
-       lnet_nid_t     dest_nid;
-       lnet_nid_t     src_nid;
        struct lnet_peer_ni *lpni;
-       __u32          payload_length;
-       __u32          type;
+       struct lnet_msg *msg;
+       __u32 payload_length;
+       lnet_pid_t dest_pid;
+       lnet_nid_t dest_nid;
+       lnet_nid_t src_nid;
+       bool push = false;
+       int for_me;
+       __u32 type;
+       int rc = 0;
+       int cpt;
 
        LASSERT (!in_interrupt ());
 
@@ -4148,16 +4258,22 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        }
 
        if (the_lnet.ln_routing &&
-           ni->ni_last_alive != ktime_get_real_seconds()) {
-               /* NB: so far here is the only place to set NI status to "up */
+           ni->ni_net->net_last_alive != ktime_get_real_seconds()) {
                lnet_ni_lock(ni);
-               ni->ni_last_alive = ktime_get_real_seconds();
+               spin_lock(&ni->ni_net->net_lock);
+               ni->ni_net->net_last_alive = ktime_get_real_seconds();
+               spin_unlock(&ni->ni_net->net_lock);
                if (ni->ni_status != NULL &&
-                   ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
+                   ni->ni_status->ns_status == LNET_NI_STATUS_DOWN) {
                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+                       push = true;
+               }
                lnet_ni_unlock(ni);
        }
 
+       if (push)
+               lnet_push_update_to_peers(1);
+
        /* Regard a bad destination NID as a protocol error.  Senders should
         * know what they're doing; if they don't they're misconfigured, buggy
         * or malicious so we chop them off at the knees :) */
@@ -4215,7 +4331,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        }
 
        if (!list_empty(&the_lnet.ln_drop_rules) &&
-           lnet_drop_rule_match(hdr, NULL)) {
+           lnet_drop_rule_match(hdr, ni->ni_nid, NULL)) {
                CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
                              "silent message loss\n",
                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
@@ -4243,17 +4359,25 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
 
                rnet = lnet_find_rnet_locked(LNET_NIDNET(src_nid));
                if (rnet) {
-                       struct lnet_peer_ni *gw = NULL;
+                       struct lnet_peer *gw = NULL;
+                       struct lnet_peer_ni *lpni = NULL;
                        struct lnet_route *route;
 
                        list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
                                found = false;
                                gw = route->lr_gateway;
-                               if (gw->lpni_net != net)
+                               if (route->lr_lnet != net->net_id)
                                        continue;
-                               if (gw->lpni_nid == from_nid) {
-                                       found = true;
-                                       break;
+                               /*
+                                * if the nid is one of the gateway's NIDs
+                                * then this is a valid gateway
+                                */
+                               while ((lpni = lnet_get_next_peer_ni_locked(gw,
+                                               NULL, lpni)) != NULL) {
+                                       if (lpni->lpni_nid == from_nid) {
+                                               found = true;
+                                               break;
+                                       }
                                }
                        }
                }
@@ -4319,24 +4443,22 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
                        return 0;
                goto drop;
        }
+
+       if (the_lnet.ln_routing)
+               lpni->lpni_last_alive = ktime_get_seconds();
+
        msg->msg_rxpeer = lpni;
        msg->msg_rxni = ni;
        lnet_ni_addref_locked(ni, cpt);
        /* Multi-Rail: Primary NID of source. */
        msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid);
 
-       if (lnet_isrouter(msg->msg_rxpeer)) {
-               lnet_peer_set_alive(msg->msg_rxpeer);
-               if (avoid_asym_router_failure &&
-                   LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
-                       /* received a remote message from router, update
-                        * remote NI status on this router.
-                        * NB: multi-hop routed message will be ignored.
-                        */
-                       lnet_router_ni_update_locked(msg->msg_rxpeer,
-                                                    LNET_NIDNET(src_nid));
-               }
-       }
+       /*
+        * mark the status of this lpni as UP since we received a message
+        * from it. The ping response reports back the ns_status which is
+        * marked on the remote as up or down and we cache it here.
+        */
+       msg->msg_rxpeer->lpni_ns_status = LNET_NI_STATUS_UP;
 
        lnet_msg_commit(msg, cpt);
 
@@ -4963,9 +5085,10 @@ LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                        LASSERT(shortest != NULL);
                        hops = shortest_hops;
                        if (srcnidp != NULL) {
-                               ni = lnet_get_next_ni_locked(
-                                       shortest->lr_gateway->lpni_net,
-                                       NULL);
+                               struct lnet_net *net;
+                               net = lnet_get_net_locked(shortest->lr_lnet);
+                               LASSERT(net);
+                               ni = lnet_get_next_ni_locked(net, NULL);
                                *srcnidp = ni->ni_nid;
                        }
                        if (orderp != NULL)
index 3497157..445d5a2 100644 (file)
@@ -440,25 +440,21 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
 }
 
 static void
-lnet_dec_healthv_locked(atomic_t *healthv)
+lnet_dec_healthv_locked(atomic_t *healthv, int sensitivity)
 {
        int h = atomic_read(healthv);
 
-       if (h < lnet_health_sensitivity) {
+       if (h < sensitivity) {
                atomic_set(healthv, 0);
        } else {
-               h -= lnet_health_sensitivity;
+               h -= sensitivity;
                atomic_set(healthv, h);
        }
 }
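
lnet_dec_healthv_locked() now takes the sensitivity as a parameter so callers can pass either the global lnet_health_sensitivity or, as the remote-failure path below does, a peer's own lp_health_sensitivity when it is set. A standalone sketch of the clamp-at-zero decrement, with a plain int in place of the atomic and illustrative values:

#include <stdio.h>

/* Decrement a health value by the given sensitivity, clamping at zero,
 * mirroring lnet_dec_healthv_locked() above (minus the atomics). */
static void dec_healthv(int *healthv, int sensitivity)
{
        if (*healthv < sensitivity)
                *healthv = 0;
        else
                *healthv -= sensitivity;
}

int main(void)
{
        int h = 1000;                   /* illustrative starting health */
        int global_sensitivity = 100;
        int peer_sensitivity = 250;     /* per-peer override when non-zero */
        int s = peer_sensitivity ? peer_sensitivity : global_sensitivity;

        dec_healthv(&h, s);
        printf("healthv now %d\n", h);  /* prints 750 */
        return 0;
}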
 
 static void
-lnet_handle_local_failure(struct lnet_msg *msg)
+lnet_handle_local_failure(struct lnet_ni *local_ni)
 {
-       struct lnet_ni *local_ni;
-
-       local_ni = msg->msg_txni;
-
        /*
         * the lnet_net_lock(0) is used to protect the addref on the ni
         * and the recovery queue.
@@ -470,7 +466,7 @@ lnet_handle_local_failure(struct lnet_msg *msg)
                return;
        }
 
-       lnet_dec_healthv_locked(&local_ni->ni_healthv);
+       lnet_dec_healthv_locked(&local_ni->ni_healthv, lnet_health_sensitivity);
        /*
         * add the NI to the recovery queue if it's not already there
         * and it's health value is actually below the maximum. It's
@@ -493,11 +489,22 @@ lnet_handle_local_failure(struct lnet_msg *msg)
 void
 lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni)
 {
+       __u32 sensitivity = lnet_health_sensitivity;
+       __u32 lp_sensitivity;
+
        /* lpni could be NULL if we're in the LOLND case */
        if (!lpni)
                return;
 
-       lnet_dec_healthv_locked(&lpni->lpni_healthv);
+       /*
+        * If there is a health sensitivity in the peer then use that
+        * instead of the globally set one.
+        */
+       lp_sensitivity = lpni->lpni_peer_net->lpn_peer->lp_health_sensitivity;
+       if (lp_sensitivity)
+               sensitivity = lp_sensitivity;
+
+       lnet_dec_healthv_locked(&lpni->lpni_healthv, sensitivity);
        /*
         * add the peer NI to the recovery queue if it's not already there
         * and it's health value is actually below the maximum. It's
@@ -516,6 +523,11 @@ lnet_handle_remote_failure(struct lnet_peer_ni *lpni)
                return;
 
        lnet_net_lock(0);
+       /* the monitor thread could've shut down and cleaned up the queues */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(0);
+               return;
+       }
        lnet_handle_remote_failure_locked(lpni);
        lnet_net_unlock(0);
 }
@@ -593,20 +605,24 @@ lnet_health_check(struct lnet_msg *msg)
 {
        enum lnet_msg_hstatus hstatus = msg->msg_health_status;
        bool lo = false;
+       struct lnet_ni *ni;
+       struct lnet_peer_ni *lpni;
 
        /* if we're shutting down no point in handling health. */
-       if (the_lnet.ln_state != LNET_STATE_RUNNING)
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
                return -1;
 
-       LASSERT(msg->msg_txni);
+       LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
 
        /*
         * if we're sending to the LOLND then the msg_txpeer will not be
         * set. So no need to sanity check it.
         */
-       if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+       if (msg->msg_tx_committed &&
+           LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
                LASSERT(msg->msg_txpeer);
-       else
+       else if (msg->msg_tx_committed &&
+                LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) == LOLND)
                lo = true;
 
        if (hstatus != LNET_MSG_STATUS_OK &&
@@ -623,21 +639,56 @@ lnet_health_check(struct lnet_msg *msg)
                lnet_net_unlock(0);
        }
 
+       /*
+        * always prefer txni/txpeer if the message is committed for both
+        * directions.
+        */
+       if (msg->msg_tx_committed) {
+               ni = msg->msg_txni;
+               lpni = msg->msg_txpeer;
+       } else {
+               ni = msg->msg_rxni;
+               lpni = msg->msg_rxpeer;
+       }
+
+       if (!lo)
+               LASSERT(ni && lpni);
+       else
+               LASSERT(ni);
+
        CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
-              libcfs_nid2str(msg->msg_txni->ni_nid),
-              (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+              libcfs_nid2str(ni->ni_nid),
+              (lo) ? "self" : libcfs_nid2str(lpni->lpni_nid),
               lnet_msgtyp2str(msg->msg_type),
               lnet_health_error2str(hstatus));
 
        switch (hstatus) {
        case LNET_MSG_STATUS_OK:
-               lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+               /*
+                * increment the local ni health whether we successfully
+                * received or sent a message on it.
+                */
+               lnet_inc_healthv(&ni->ni_healthv);
                /*
                 * It's possible msg_txpeer is NULL in the LOLND
-                * case.
+                * case. Only increment the peer's health if we're
+                * receiving a message from it. It's the only sure way to
+                * know that a remote interface is up.
+                * If this interface is part of a router, then take that
+                * as indication that the router is fully healthy.
                 */
-               if (msg->msg_txpeer)
-                       lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+               if (lpni && msg->msg_rx_committed) {
+                       /*
+                        * If we're receiving a message from the router or
+                        * If we're receiving a message from a router, or
+                        * we are a router ourselves, set the lpni's health
+                        * to maximum so we can commence communication.
+                       if (lnet_isrouter(lpni) || the_lnet.ln_routing)
+                               lnet_set_healthv(&lpni->lpni_healthv,
+                                                LNET_MAX_HEALTH_VALUE);
+                       else
+                               lnet_inc_healthv(&lpni->lpni_healthv);
+               }
 
                /* we can finalize this message */
                return -1;
@@ -646,16 +697,18 @@ lnet_health_check(struct lnet_msg *msg)
        case LNET_MSG_STATUS_LOCAL_ABORTED:
        case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
        case LNET_MSG_STATUS_LOCAL_TIMEOUT:
-               lnet_handle_local_failure(msg);
-               /* add to the re-send queue */
-               goto resend;
+               lnet_handle_local_failure(ni);
+               if (msg->msg_tx_committed)
+                       /* add to the re-send queue */
+                       goto resend;
+               break;
 
        /*
         * These errors will not trigger a resend so simply
         * finalize the message
         */
        case LNET_MSG_STATUS_LOCAL_ERROR:
-               lnet_handle_local_failure(msg);
+               lnet_handle_local_failure(ni);
                return -1;
 
        /*
@@ -663,19 +716,24 @@ lnet_health_check(struct lnet_msg *msg)
         * attempt a resend safely.
         */
        case LNET_MSG_STATUS_REMOTE_DROPPED:
-               lnet_handle_remote_failure(msg->msg_txpeer);
-               goto resend;
+               lnet_handle_remote_failure(lpni);
+               if (msg->msg_tx_committed)
+                       goto resend;
+               break;
 
        case LNET_MSG_STATUS_REMOTE_ERROR:
        case LNET_MSG_STATUS_REMOTE_TIMEOUT:
        case LNET_MSG_STATUS_NETWORK_TIMEOUT:
-               lnet_handle_remote_failure(msg->msg_txpeer);
+               lnet_handle_remote_failure(lpni);
                return -1;
        default:
                LBUG();
        }
 
 resend:
+       /* we can only resend tx_committed messages */
+       LASSERT(msg->msg_tx_committed);
+
        /* don't resend recovery messages */
        if (msg->msg_recovery) {
                CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
@@ -709,6 +767,12 @@ resend:
 
        lnet_net_lock(msg->msg_tx_cpt);
 
+       /* check again under lock */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(msg->msg_tx_cpt);
+               return -1;
+       }
+
        /*
         * remove message from the active list and reset it in preparation
         * for a resend. Two exception to this
@@ -769,46 +833,39 @@ lnet_msg_detach_md(struct lnet_msg *msg, int cpt, int status)
        }
 
        if (unlink) {
-               /*
-                * if this is an ACK or a REPLY then make sure to remove the
-                * response tracker.
-                */
-               if (msg->msg_ev.type == LNET_EVENT_REPLY ||
-                   msg->msg_ev.type == LNET_EVENT_ACK)
-                       lnet_detach_rsp_tracker(msg->msg_md, cpt);
+               lnet_detach_rsp_tracker(md, cpt);
                lnet_md_unlink(md);
        }
 
        msg->msg_md = NULL;
 }
 
-static void
-lnet_detach_md(struct lnet_msg *msg, int status)
-{
-       int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
-
-       lnet_res_lock(cpt);
-       lnet_msg_detach_md(msg, cpt, status);
-       lnet_res_unlock(cpt);
-}
-
 static bool
 lnet_is_health_check(struct lnet_msg *msg)
 {
-       bool hc;
+       bool hc = true;
        int status = msg->msg_ev.status;
 
-       /*
-        * perform a health check for any message committed for transmit
-        */
-       hc = msg->msg_tx_committed;
+       if ((!msg->msg_tx_committed && !msg->msg_rx_committed) ||
+           !msg->msg_onactivelist) {
+               CDEBUG(D_NET, "msg %p not committed for send or receive\n",
+                      msg);
+               return false;
+       }
+
+       if ((msg->msg_tx_committed && !msg->msg_txpeer) ||
+           (msg->msg_rx_committed && !msg->msg_rxpeer)) {
+               CDEBUG(D_NET, "msg %p failed too early to retry and send\n",
+                      msg);
+               return false;
+       }
 
        /* Check for status inconsistencies */
-       if (hc &&
-           ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
-            (status && msg->msg_health_status == LNET_MSG_STATUS_OK))) {
-               CERROR("Msg is in inconsistent state, don't perform health "
-                      "checking (%d, %d)\n", status, msg->msg_health_status);
+       if ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
+            (status && msg->msg_health_status == LNET_MSG_STATUS_OK)) {
+               CDEBUG(D_NET, "Msg %p is in inconsistent state, don't perform health "
+                      "checking (%d, %d)\n", msg, status,
+                      msg->msg_health_status);
                hc = false;
        }
 
@@ -860,11 +917,13 @@ lnet_send_error_simulation(struct lnet_msg *msg,
            return false;
 
        /* match only health rules */
-       if (!lnet_drop_rule_match(&msg->msg_hdr, hstatus))
+       if (!lnet_drop_rule_match(&msg->msg_hdr, LNET_NID_ANY,
+                                 hstatus))
                return false;
 
-       CDEBUG(D_NET, "src %sdst %s: %s simulate health error: %s\n",
+       CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n",
                libcfs_nid2str(msg->msg_hdr.src_nid),
+               libcfs_nid2str(msg->msg_txni->ni_nid),
                libcfs_nid2str(msg->msg_hdr.dest_nid),
                lnet_msgtyp2str(msg->msg_type),
                lnet_health_error2str(*hstatus));
@@ -881,7 +940,6 @@ lnet_finalize(struct lnet_msg *msg, int status)
        int cpt;
        int rc;
        int i;
-       bool hc;
 
        LASSERT(!in_interrupt());
 
@@ -890,37 +948,7 @@ lnet_finalize(struct lnet_msg *msg, int status)
 
        msg->msg_ev.status = status;
 
-       /* if the message is successfully sent, no need to keep the MD around */
-       if (msg->msg_md != NULL && !status)
-               lnet_detach_md(msg, status);
-
-again:
-       hc = lnet_is_health_check(msg);
-
-       /*
-        * the MD would've been detached from the message if it was
-        * successfully sent. However, if it wasn't successfully sent the
-        * MD would be around. And since we recalculate whether to
-        * health check or not, it's possible that we change our minds and
-        * we don't want to health check this message. In this case also
-        * free the MD.
-        *
-        * If the message is successful we're going to
-        * go through the lnet_health_check() function, but that'll just
-        * increment the appropriate health value and return.
-        */
-       if (msg->msg_md != NULL && !hc)
-               lnet_detach_md(msg, status);
-
-       rc = 0;
-       if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-               /* not committed to network yet */
-               LASSERT(!msg->msg_onactivelist);
-               lnet_msg_free(msg);
-               return;
-       }
-
-       if (hc) {
+       if (lnet_is_health_check(msg)) {
                /*
                 * Check the health status of the message. If it has one
                 * of the errors that we're supposed to handle, and it has
@@ -934,14 +962,27 @@ again:
                 * put on the resend queue.
                 */
                if (!lnet_health_check(msg))
+                       /* Message is queued for resend */
                        return;
+       }
 
-               /*
-                * if we get here then we need to clean up the md because we're
-                * finalizing the message.
-               */
-               if (msg->msg_md != NULL)
-                       lnet_detach_md(msg, status);
+       /*
+        * We're not going to resend this message so detach its MD and invoke
+        * the appropriate callbacks
+        */
+       if (msg->msg_md != NULL) {
+               cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+               lnet_res_lock(cpt);
+               lnet_msg_detach_md(msg, cpt, status);
+               lnet_res_unlock(cpt);
+       }
+
+again:
+       if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
+               /* not committed to network yet */
+               LASSERT(!msg->msg_onactivelist);
+               lnet_msg_free(msg);
+               return;
        }
 
        /*
@@ -974,6 +1015,7 @@ again:
 
        container->msc_finalizers[my_slot] = current;
 
+       rc = 0;
        while (!list_empty(&container->msc_finalizing)) {
                msg = list_entry(container->msc_finalizing.next,
                                 struct lnet_msg, msg_list);
index 04c98d5..7fdd8df 100644 (file)
@@ -79,10 +79,12 @@ lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
 
 static bool
 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
-                     lnet_nid_t dst, unsigned int type, unsigned int portal)
+                     lnet_nid_t local_nid, lnet_nid_t dst,
+                     unsigned int type, unsigned int portal)
 {
        if (!lnet_fault_nid_match(attr->fa_src, src) ||
-           !lnet_fault_nid_match(attr->fa_dst, dst))
+           !lnet_fault_nid_match(attr->fa_dst, dst) ||
+           !lnet_fault_nid_match(attr->fa_local_nid, local_nid))
                return false;
 
        if (!(attr->fa_msg_mask & (1 << type)))
@@ -344,15 +346,22 @@ lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
  */
 static bool
 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
-               lnet_nid_t dst, unsigned int type, unsigned int portal,
+               lnet_nid_t local_nid, lnet_nid_t dst,
+               unsigned int type, unsigned int portal,
                enum lnet_msg_hstatus *hstatus)
 {
        struct lnet_fault_attr  *attr = &rule->dr_attr;
        bool                     drop;
 
-       if (!lnet_fault_attr_match(attr, src, dst, type, portal))
+       if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
                return false;
 
+       if (attr->u.drop.da_drop_all) {
+               CDEBUG(D_NET, "set to drop all messages\n");
+               drop = true;
+               goto drop_matched;
+       }
+
        /*
         * if we're trying to match a health status error but it hasn't
         * been set in the rule, then don't match
@@ -402,6 +411,8 @@ drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
                }
        }
 
+drop_matched:
+
        if (drop) { /* drop this message, update counters */
                if (hstatus)
                        lnet_fault_match_health(hstatus,
@@ -418,7 +429,9 @@ drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
  * Check if message from \a src to \a dst can match any existed drop rule
  */
 bool
-lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus)
+lnet_drop_rule_match(struct lnet_hdr *hdr,
+                    lnet_nid_t local_nid,
+                    enum lnet_msg_hstatus *hstatus)
 {
        lnet_nid_t src = le64_to_cpu(hdr->src_nid);
        lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
@@ -437,7 +450,7 @@ lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus)
 
        cpt = lnet_net_lock_current();
        list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
-               drop = drop_rule_match(rule, src, dst, typ, ptl,
+               drop = drop_rule_match(rule, src, local_nid, dst, typ, ptl,
                                       hstatus);
                if (drop)
                        break;
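
For reference, a hedged sketch of how a receive-path caller might supply the local NI so that rules keyed on fa_local_nid can match; the call site and variable names are illustrative and not part of the hunks shown here:

	/* hypothetical receive path (names illustrative): hstatus is NULL
	 * because a receive-side drop is silent */
	if (lnet_drop_rule_match(hdr, ni->ni_nid, NULL))
		return;	/* message consumed by fault injection */
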
@@ -528,7 +541,8 @@ delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
        struct lnet_fault_attr  *attr = &rule->dl_attr;
        bool                     delay;
 
-       if (!lnet_fault_attr_match(attr, src, dst, type, portal))
+       if (!lnet_fault_attr_match(attr, src, LNET_NID_ANY,
+                                  dst, type, portal))
                return false;
 
        /* match this rule, check delay rate now */
index 989b1f4..f58e0e0 100644 (file)
@@ -162,17 +162,18 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
                return NULL;
 
        INIT_LIST_HEAD(&lpni->lpni_txq);
-       INIT_LIST_HEAD(&lpni->lpni_rtrq);
-       INIT_LIST_HEAD(&lpni->lpni_routes);
        INIT_LIST_HEAD(&lpni->lpni_hashlist);
        INIT_LIST_HEAD(&lpni->lpni_peer_nis);
        INIT_LIST_HEAD(&lpni->lpni_recovery);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
+       LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
 
        spin_lock_init(&lpni->lpni_lock);
 
-       lpni->lpni_alive = !lnet_peers_start_down(); /* 1 bit!! */
-       lpni->lpni_last_alive = ktime_get_seconds(); /* assumes alive */
+       if (lnet_peers_start_down())
+               lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;
+       else
+               lpni->lpni_ns_status = LNET_NI_STATUS_UP;
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
        lpni->lpni_nid = nid;
        lpni->lpni_cpt = cpt;
@@ -247,13 +248,24 @@ lnet_peer_alloc(lnet_nid_t nid)
        if (!lp)
                return NULL;
 
+       INIT_LIST_HEAD(&lp->lp_rtrq);
+       INIT_LIST_HEAD(&lp->lp_routes);
        INIT_LIST_HEAD(&lp->lp_peer_list);
        INIT_LIST_HEAD(&lp->lp_peer_nets);
        INIT_LIST_HEAD(&lp->lp_dc_list);
        INIT_LIST_HEAD(&lp->lp_dc_pendq);
+       INIT_LIST_HEAD(&lp->lp_rtr_list);
        init_waitqueue_head(&lp->lp_dc_waitq);
        spin_lock_init(&lp->lp_lock);
        lp->lp_primary_nid = nid;
+
+       /*
+        * All peers created on a router should have health checking
+        * enabled, even if it's not already enabled globally.
+        */
+       if (the_lnet.ln_routing && !lnet_health_sensitivity)
+               lp->lp_health_sensitivity = 1;
+
        /*
         * Turn off discovery for loopback peer. If you're creating a peer
         * for the loopback interface then that was initiated when we
@@ -276,6 +288,7 @@ lnet_destroy_peer_locked(struct lnet_peer *lp)
        CDEBUG(D_NET, "%p nid %s\n", lp, libcfs_nid2str(lp->lp_primary_nid));
 
        LASSERT(atomic_read(&lp->lp_refcount) == 0);
+       LASSERT(lp->lp_rtr_refcount == 0);
        LASSERT(list_empty(&lp->lp_peer_nets));
        LASSERT(list_empty(&lp->lp_peer_list));
        LASSERT(list_empty(&lp->lp_dc_list));
@@ -296,7 +309,9 @@ lnet_destroy_peer_locked(struct lnet_peer *lp)
         * Releasing the lock can cause an inconsistent state
         */
        spin_lock(&the_lnet.ln_msg_resend_lock);
+       spin_lock(&lp->lp_lock);
        list_splice(&lp->lp_dc_pendq, &the_lnet.ln_msg_resend);
+       spin_unlock(&lp->lp_lock);
        spin_unlock(&the_lnet.ln_msg_resend_lock);
        wake_up(&the_lnet.ln_dc_waitq);
 
@@ -359,12 +374,12 @@ lnet_peer_detach_peer_ni_locked(struct lnet_peer_ni *lpni)
 
 /* called with lnet_net_lock LNET_LOCK_EX held */
 static int
-lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
+lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni, bool force)
 {
        struct lnet_peer_table *ptable = NULL;
 
        /* don't remove a peer_ni if it's also a gateway */
-       if (lpni->lpni_rtr_refcount > 0) {
+       if (lnet_isrouter(lpni) && !force) {
                CERROR("Peer NI %s is a gateway. Can not delete it\n",
                       libcfs_nid2str(lpni->lpni_nid));
                return -EBUSY;
@@ -421,7 +436,7 @@ void lnet_peer_uninit(void)
        /* remove all peer_nis from the remote peer and the hash list */
        list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_remote_peer_ni_list,
                                 lpni_on_remote_peer_ni_list)
-               lnet_peer_ni_del_locked(lpni);
+               lnet_peer_ni_del_locked(lpni, false);
 
        lnet_peer_tables_destroy();
 
@@ -439,7 +454,7 @@ lnet_peer_del_locked(struct lnet_peer *peer)
        lpni = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
        while (lpni != NULL) {
                lpni2 = lnet_get_next_peer_ni_locked(peer, NULL, lpni);
-               rc = lnet_peer_ni_del_locked(lpni);
+               rc = lnet_peer_ni_del_locked(lpni, false);
                if (rc != 0)
                        rc2 = rc;
                lpni = lpni2;
@@ -473,6 +488,7 @@ lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
        struct lnet_peer_ni *lpni;
        lnet_nid_t primary_nid = lp->lp_primary_nid;
        int rc = 0;
+       bool force = (flags & LNET_PEER_RTR_NI_FORCE_DEL) ? true : false;
 
        if (!(flags & LNET_PEER_CONFIGURED)) {
                if (lp->lp_state & LNET_PEER_CONFIGURED) {
@@ -495,14 +511,21 @@ lnet_peer_del_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
         * This function only allows deletion of the primary NID if it
         * is the only NID.
         */
-       if (nid == lp->lp_primary_nid && lp->lp_nnis != 1) {
+       if (nid == lp->lp_primary_nid && lp->lp_nnis != 1 && !force) {
                rc = -EBUSY;
                goto out;
        }
 
        lnet_net_lock(LNET_LOCK_EX);
 
-       rc = lnet_peer_ni_del_locked(lpni);
+       if (nid == lp->lp_primary_nid && lp->lp_nnis != 1 && force) {
+               struct lnet_peer_ni *lpni2;
+               /* assign the next peer_ni to be the primary */
+               lpni2 = lnet_get_next_peer_ni_locked(lp, NULL, lpni);
+               LASSERT(lpni2);
+               lp->lp_primary_nid = lpni2->lpni_nid;
+       }
+       rc = lnet_peer_ni_del_locked(lpni, force);
 
        lnet_net_unlock(LNET_LOCK_EX);
 
@@ -530,7 +553,7 @@ lnet_peer_table_cleanup_locked(struct lnet_net *net,
 
                        peer = lpni->lpni_peer_net->lpn_peer;
                        if (peer->lp_primary_nid != lpni->lpni_nid) {
-                               lnet_peer_ni_del_locked(lpni);
+                               lnet_peer_ni_del_locked(lpni, false);
                                continue;
                        }
                        /*
@@ -575,7 +598,7 @@ lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
 {
        struct lnet_peer_ni     *lp;
        struct lnet_peer_ni     *tmp;
-       lnet_nid_t              lpni_nid;
+       lnet_nid_t              gw_nid;
        int                     i;
 
        for (i = 0; i < LNET_PEER_HASH_SIZE; i++) {
@@ -584,13 +607,13 @@ lnet_peer_table_del_rtrs_locked(struct lnet_net *net,
                        if (net != lp->lpni_net)
                                continue;
 
-                       if (lp->lpni_rtr_refcount == 0)
+                       if (!lnet_isrouter(lp))
                                continue;
 
-                       lpni_nid = lp->lpni_nid;
+                       gw_nid = lp->lpni_peer_net->lpn_peer->lp_primary_nid;
 
                        lnet_net_unlock(LNET_LOCK_EX);
-                       lnet_del_route(LNET_NIDNET(LNET_NID_ANY), lpni_nid);
+                       lnet_del_route(LNET_NIDNET(LNET_NID_ANY), gw_nid);
                        lnet_net_lock(LNET_LOCK_EX);
                }
        }
@@ -656,6 +679,24 @@ lnet_find_peer_ni_locked(lnet_nid_t nid)
        return lpni;
 }
 
+struct lnet_peer_ni *
+lnet_peer_get_ni_locked(struct lnet_peer *lp, lnet_nid_t nid)
+{
+       struct lnet_peer_net *lpn;
+       struct lnet_peer_ni *lpni;
+
+       lpn = lnet_peer_get_net_locked(lp, LNET_NIDNET(nid));
+       if (!lpn)
+               return NULL;
+
+       list_for_each_entry(lpni, &lpn->lpn_peer_nis, lpni_peer_nis) {
+               if (lpni->lpni_nid == nid)
+                       return lpni;
+       }
+
+       return NULL;
+}
+
 struct lnet_peer *
 lnet_find_peer(lnet_nid_t nid)
 {
@@ -675,6 +716,39 @@ lnet_find_peer(lnet_nid_t nid)
        return lp;
 }
 
+struct lnet_peer_net *
+lnet_get_next_peer_net_locked(struct lnet_peer *lp, __u32 prev_lpn_id)
+{
+       struct lnet_peer_net *net;
+
+       if (!prev_lpn_id) {
+               /* no net id provided return the first net */
+               net = list_first_entry_or_null(&lp->lp_peer_nets,
+                                              struct lnet_peer_net,
+                                              lpn_peer_nets);
+
+               return net;
+       }
+
+       /* find the net after the one provided */
+       list_for_each_entry(net, &lp->lp_peer_nets, lpn_peer_nets) {
+               if (net->lpn_net_id == prev_lpn_id) {
+                       /*
+                        * if we reached the end of the list loop to the
+                        * beginning.
+                        */
+                       if (net->lpn_peer_nets.next == &lp->lp_peer_nets)
+                               return list_first_entry_or_null(&lp->lp_peer_nets,
+                                                               struct lnet_peer_net,
+                                                               lpn_peer_nets);
+                       else
+                               return list_next_entry(net, lpn_peer_nets);
+               }
+       }
+
+       return NULL;
+}
+
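
A minimal usage sketch (not part of this patch) of the wrap-around iterator above: pass 0 to get the first peer net, then feed each returned lpn_net_id back in and stop once the walk returns to the starting net.

	/* Hypothetical caller (name illustrative): visit every peer net of
	 * lp exactly once using the wrap-around iterator above. */
	static void visit_peer_nets_locked(struct lnet_peer *lp)
	{
		struct lnet_peer_net *lpn;
		__u32 first_net;

		lpn = lnet_get_next_peer_net_locked(lp, 0);
		if (!lpn)
			return;
		first_net = lpn->lpn_net_id;
		do {
			/* ... inspect lpn here ... */
			lpn = lnet_get_next_peer_net_locked(lp, lpn->lpn_net_id);
		} while (lpn && lpn->lpn_net_id != first_net);
	}
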
 struct lnet_peer_ni *
 lnet_get_next_peer_ni_locked(struct lnet_peer *peer,
                             struct lnet_peer_net *peer_net,
@@ -1329,6 +1403,18 @@ lnet_peer_add_nid(struct lnet_peer *lp, lnet_nid_t nid, unsigned flags)
                }
                /* If this is the primary NID, destroy the peer. */
                if (lnet_peer_ni_is_primary(lpni)) {
+                       struct lnet_peer *rtr_lp =
+                         lpni->lpni_peer_net->lpn_peer;
+                       int rtr_refcount = rtr_lp->lp_rtr_refcount;
+                       /*
+                        * if we're trying to delete a router it means
+                        * we're moving this peer NI to a new peer, so we
+                        * must transfer the router properties to the new peer
+                        */
+                       if (rtr_refcount > 0) {
+                               flags |= LNET_PEER_RTR_NI_FORCE_DEL;
+                               lnet_rtr_transfer_to_peer(rtr_lp, lp);
+                       }
                        lnet_peer_del(lpni->lpni_peer_net->lpn_peer);
                        lpni = lnet_peer_ni_alloc(nid);
                        if (!lpni) {
@@ -1405,7 +1491,11 @@ lnet_peer_ni_traffic_add(lnet_nid_t nid, lnet_nid_t pref)
        struct lnet_peer *lp;
        struct lnet_peer_net *lpn;
        struct lnet_peer_ni *lpni;
-       unsigned flags = 0;
+       /*
+        * Assume peer is Multi-Rail capable and let discovery find out
+        * otherwise.
+        */
+       unsigned flags = LNET_PEER_MULTI_RAIL;
        int rc = 0;
 
        if (nid == LNET_NID_ANY) {
@@ -1551,6 +1641,15 @@ lnet_del_peer_ni(lnet_nid_t prim_nid, lnet_nid_t nid)
                return -ENODEV;
        }
 
+       lnet_net_lock(LNET_LOCK_EX);
+       if (lp->lp_rtr_refcount > 0) {
+               lnet_net_unlock(LNET_LOCK_EX);
+               CERROR("%s is a router. Can not be deleted\n",
+                      libcfs_nid2str(prim_nid));
+               return -EBUSY;
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+
        if (nid == LNET_NID_ANY || nid == lp->lp_primary_nid)
                return lnet_peer_del(lp);
 
@@ -1570,7 +1669,6 @@ lnet_destroy_peer_ni_locked(struct lnet_peer_ni *lpni)
        CDEBUG(D_NET, "%p nid %s\n", lpni, libcfs_nid2str(lpni->lpni_nid));
 
        LASSERT(atomic_read(&lpni->lpni_refcount) == 0);
-       LASSERT(lpni->lpni_rtr_refcount == 0);
        LASSERT(list_empty(&lpni->lpni_txq));
        LASSERT(lpni->lpni_txqnob == 0);
        LASSERT(list_empty(&lpni->lpni_peer_nis));
@@ -1697,9 +1795,47 @@ out_mutex_unlock:
        return lpni;
 }
 
+bool
+lnet_is_discovery_disabled_locked(struct lnet_peer *lp)
+{
+       if (lnet_peer_discovery_disabled)
+               return true;
+
+       if (!(lp->lp_state & LNET_PEER_MULTI_RAIL) ||
+           (lp->lp_state & LNET_PEER_NO_DISCOVERY)) {
+               return true;
+       }
+
+       return false;
+}
+
 /*
  * Peer Discovery
  */
+bool
+lnet_is_discovery_disabled(struct lnet_peer *lp)
+{
+       bool rc = false;
+
+       spin_lock(&lp->lp_lock);
+       rc = lnet_is_discovery_disabled_locked(lp);
+       spin_unlock(&lp->lp_lock);
+
+       return rc;
+}
+
+bool
+lnet_peer_gw_discovery(struct lnet_peer *lp)
+{
+       bool rc = false;
+
+       spin_lock(&lp->lp_lock);
+       if (lp->lp_state & LNET_PEER_RTR_DISCOVERY)
+               rc = true;
+       spin_unlock(&lp->lp_lock);
+
+       return rc;
+}
 
 /*
  * Is a peer uptodate from the point of view of discovery?
@@ -1719,13 +1855,8 @@ lnet_peer_is_uptodate(struct lnet_peer *lp)
                            LNET_PEER_FORCE_PING |
                            LNET_PEER_FORCE_PUSH)) {
                rc = false;
-       } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
-               rc = true;
        } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
-               if (lnet_peer_discovery_disabled)
-                       rc = true;
-               else
-                       rc = false;
+               rc = false;
        } else if (lnet_peer_needs_push(lp)) {
                rc = false;
        } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
@@ -1785,9 +1916,14 @@ static void lnet_peer_discovery_complete(struct lnet_peer *lp)
               libcfs_nid2str(lp->lp_primary_nid));
 
        list_del_init(&lp->lp_dc_list);
+       spin_lock(&lp->lp_lock);
        list_splice_init(&lp->lp_dc_pendq, &pending_msgs);
+       spin_unlock(&lp->lp_lock);
        wake_up_all(&lp->lp_dc_waitq);
 
+       if (lp->lp_rtr_refcount > 0)
+               lnet_router_discovery_complete(lp);
+
        lnet_net_unlock(LNET_LOCK_EX);
 
        /* iterate through all pending messages and send them again */
@@ -1923,38 +2059,9 @@ void lnet_peer_push_event(struct lnet_event *ev)
                goto out;
        }
 
-       /*
-        * Check whether the Put data is stale. Stale data can just be
-        * dropped.
-        */
-       if (pbuf->pb_info.pi_nnis > 1 &&
-           lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid &&
-           LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
-               CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
-                      libcfs_nid2str(lp->lp_primary_nid),
-                      LNET_PING_BUFFER_SEQNO(pbuf),
-                      lp->lp_peer_seqno);
-               goto out;
-       }
-
-       /*
-        * Check whether the Put data is new, in which case we clear
-        * the UPTODATE flag and prepare to process it.
-        *
-        * If the Put data is current, and the peer is UPTODATE then
-        * we assome everything is all right and drop the data as
-        * stale.
-        */
-       if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno) {
-               lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
-               lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
-       } else if (lp->lp_state & LNET_PEER_NIDS_UPTODATE) {
-               CDEBUG(D_NET, "Stale Push from %s: got %u have %u\n",
-                      libcfs_nid2str(lp->lp_primary_nid),
-                      LNET_PING_BUFFER_SEQNO(pbuf),
-                      lp->lp_peer_seqno);
-               goto out;
-       }
+       /* always assume new data */
+       lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
+       lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
 
        /*
         * If there is data present that hasn't been processed yet,
@@ -2060,6 +2167,9 @@ again:
                if (lnet_peer_is_uptodate(lp))
                        break;
                lnet_peer_queue_for_discovery(lp);
+
+               if (lnet_is_discovery_disabled(lp))
+                       break;
                /*
                 * if caller requested a non-blocking operation then
                 * return immediately. Once discovery is complete then the
@@ -2099,7 +2209,7 @@ again:
                rc = lp->lp_dc_error;
        else if (!block)
                CDEBUG(D_NET, "non-blocking discovery\n");
-       else if (!lnet_peer_is_uptodate(lp))
+       else if (!lnet_peer_is_uptodate(lp) && !lnet_is_discovery_disabled(lp))
                goto again;
 
        CDEBUG(D_NET, "peer %s NID %s: %d. %s\n",
@@ -2170,6 +2280,36 @@ lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
                goto out;
        }
 
+
+       /*
+        * Only enable the multi-rail feature on the peer if both sides of
+        * the connection have discovery on
+        */
+       if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL) {
+               CDEBUG(D_NET, "Peer %s has Multi-Rail feature enabled\n",
+                      libcfs_nid2str(lp->lp_primary_nid));
+               lp->lp_state |= LNET_PEER_MULTI_RAIL;
+       } else {
+               CDEBUG(D_NET, "Peer %s has Multi-Rail feature disabled\n",
+                      libcfs_nid2str(lp->lp_primary_nid));
+               lp->lp_state &= ~LNET_PEER_MULTI_RAIL;
+       }
+
+       /*
+        * The peer may have discovery disabled at its end. Set
+        * NO_DISCOVERY as appropriate.
+        */
+       if ((pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY) &&
+           !lnet_peer_discovery_disabled) {
+               CDEBUG(D_NET, "Peer %s has discovery enabled\n",
+                      libcfs_nid2str(lp->lp_primary_nid));
+               lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
+       } else {
+               CDEBUG(D_NET, "Peer %s has discovery disabled\n",
+                      libcfs_nid2str(lp->lp_primary_nid));
+               lp->lp_state |= LNET_PEER_NO_DISCOVERY;
+       }
+
        /*
         * Update the MULTI_RAIL flag based on the reply. If the peer
         * was configured with DLC then the setting should match what
@@ -2182,8 +2322,17 @@ lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
                        CWARN("Reply says %s is Multi-Rail, DLC says not\n",
                              libcfs_nid2str(lp->lp_primary_nid));
                } else {
-                       lp->lp_state |= LNET_PEER_MULTI_RAIL;
-                       lnet_peer_clr_non_mr_pref_nids(lp);
+                       /*
+                        * if discovery is disabled then we don't want to
+                        * update the state of the peer. All we'll do is
+                        * update the peer_nis which were reported back in
+                        * the initial ping
+                        */
+
+                       if (!lnet_is_discovery_disabled_locked(lp)) {
+                               lp->lp_state |= LNET_PEER_MULTI_RAIL;
+                               lnet_peer_clr_non_mr_pref_nids(lp);
+                       }
                }
        } else if (lp->lp_state & LNET_PEER_MULTI_RAIL) {
                if (lp->lp_state & LNET_PEER_CONFIGURED) {
@@ -2204,20 +2353,6 @@ lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
                lp->lp_data_nnis = pbuf->pb_info.pi_nnis;
 
        /*
-        * The peer may have discovery disabled at its end. Set
-        * NO_DISCOVERY as appropriate.
-        */
-       if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_DISCOVERY)) {
-               CDEBUG(D_NET, "Peer %s has discovery disabled\n",
-                      libcfs_nid2str(lp->lp_primary_nid));
-               lp->lp_state |= LNET_PEER_NO_DISCOVERY;
-       } else if (lp->lp_state & LNET_PEER_NO_DISCOVERY) {
-               CDEBUG(D_NET, "Peer %s has discovery enabled\n",
-                      libcfs_nid2str(lp->lp_primary_nid));
-               lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
-       }
-
-       /*
         * Check for truncation of the Reply. Clear PING_SENT and set
         * PING_FAILED to trigger a retry.
         */
@@ -2239,21 +2374,18 @@ lnet_discovery_event_reply(struct lnet_peer *lp, struct lnet_event *ev)
        if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL &&
            pbuf->pb_info.pi_nnis > 1 &&
            lp->lp_primary_nid == pbuf->pb_info.pi_ni[1].ns_nid) {
-               if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno) {
-                       CDEBUG(D_NET, "Stale Reply from %s: got %u have %u\n",
+               if (LNET_PING_BUFFER_SEQNO(pbuf) < lp->lp_peer_seqno)
+                       CDEBUG(D_NET, "peer %s: seq# got %u have %u. peer rebooted?\n",
                                libcfs_nid2str(lp->lp_primary_nid),
                                LNET_PING_BUFFER_SEQNO(pbuf),
                                lp->lp_peer_seqno);
-                       goto out;
-               }
 
-               if (LNET_PING_BUFFER_SEQNO(pbuf) > lp->lp_peer_seqno)
-                       lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
+               lp->lp_peer_seqno = LNET_PING_BUFFER_SEQNO(pbuf);
        }
 
        /* We're happy with the state of the data in the buffer. */
-       CDEBUG(D_NET, "peer %s data present %u\n",
-              libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno);
+       CDEBUG(D_NET, "peer %s data present %u. state = 0x%x\n",
+              libcfs_nid2str(lp->lp_primary_nid), lp->lp_peer_seqno, lp->lp_state);
        if (lp->lp_state & LNET_PEER_DATA_PRESENT)
                lnet_ping_buffer_decref(lp->lp_data);
        else
@@ -2402,7 +2534,7 @@ static int lnet_peer_merge_data(struct lnet_peer *lp,
 {
        struct lnet_peer_ni *lpni;
        lnet_nid_t *curnis = NULL;
-       lnet_nid_t *addnis = NULL;
+       struct lnet_ni_status *addnis = NULL;
        lnet_nid_t *delnis = NULL;
        unsigned flags;
        int ncurnis;
@@ -2417,10 +2549,21 @@ static int lnet_peer_merge_data(struct lnet_peer *lp,
        if (pbuf->pb_info.pi_features & LNET_PING_FEAT_MULTI_RAIL)
                flags |= LNET_PEER_MULTI_RAIL;
 
+       /*
+        * Cache the routing feature for the peer; whether it is enabled
+        * or disabled as reported by the remote peer.
+        */
+       spin_lock(&lp->lp_lock);
+       if (!(pbuf->pb_info.pi_features & LNET_PING_FEAT_RTE_DISABLED))
+               lp->lp_state |= LNET_PEER_ROUTER_ENABLED;
+       else
+               lp->lp_state &= ~LNET_PEER_ROUTER_ENABLED;
+       spin_unlock(&lp->lp_lock);
+
        nnis = MAX(lp->lp_nnis, pbuf->pb_info.pi_nnis);
-       LIBCFS_ALLOC(curnis, nnis * sizeof(lnet_nid_t));
-       LIBCFS_ALLOC(addnis, nnis * sizeof(lnet_nid_t));
-       LIBCFS_ALLOC(delnis, nnis * sizeof(lnet_nid_t));
+       LIBCFS_ALLOC(curnis, nnis * sizeof(*curnis));
+       LIBCFS_ALLOC(addnis, nnis * sizeof(*addnis));
+       LIBCFS_ALLOC(delnis, nnis * sizeof(*delnis));
        if (!curnis || !addnis || !delnis) {
                rc = -ENOMEM;
                goto out;
@@ -2443,7 +2586,7 @@ static int lnet_peer_merge_data(struct lnet_peer *lp,
                        if (pbuf->pb_info.pi_ni[i].ns_nid == curnis[j])
                                break;
                if (j == ncurnis)
-                       addnis[naddnis++] = pbuf->pb_info.pi_ni[i].ns_nid;
+                       addnis[naddnis++] = pbuf->pb_info.pi_ni[i];
        }
        /*
         * Check for NIDs in curnis[] not present in pbuf.
@@ -2455,24 +2598,59 @@ static int lnet_peer_merge_data(struct lnet_peer *lp,
        for (i = 0; i < ncurnis; i++) {
                if (LNET_NETTYP(LNET_NIDNET(curnis[i])) == LOLND)
                        continue;
-               for (j = 1; j < pbuf->pb_info.pi_nnis; j++)
-                       if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid)
+               for (j = 1; j < pbuf->pb_info.pi_nnis; j++) {
+                       if (curnis[i] == pbuf->pb_info.pi_ni[j].ns_nid) {
+                               /*
+                                * update the information we cache for the
+                                * peer with the latest information we
+                                * received
+                                */
+                               lpni = lnet_find_peer_ni_locked(curnis[i]);
+                               if (lpni) {
+                                       lpni->lpni_ns_status = pbuf->pb_info.pi_ni[j].ns_status;
+                                       lnet_peer_ni_decref_locked(lpni);
+                               }
                                break;
+                       }
+               }
                if (j == pbuf->pb_info.pi_nnis)
                        delnis[ndelnis++] = curnis[i];
        }
 
+       /*
+        * If we get here and discovery is disabled then we don't want
+        * to add or delete any NIs. We've just updated the ones we have
+        * information on, and we're done.
+        */
+       rc = 0;
+       if (lnet_is_discovery_disabled(lp))
+               goto out;
+
        for (i = 0; i < naddnis; i++) {
-               rc = lnet_peer_add_nid(lp, addnis[i], flags);
+               rc = lnet_peer_add_nid(lp, addnis[i].ns_nid, flags);
                if (rc) {
                        CERROR("Error adding NID %s to peer %s: %d\n",
-                              libcfs_nid2str(addnis[i]),
+                              libcfs_nid2str(addnis[i].ns_nid),
                               libcfs_nid2str(lp->lp_primary_nid), rc);
                        if (rc == -ENOMEM)
                                goto out;
                }
+               lpni = lnet_find_peer_ni_locked(addnis[i].ns_nid);
+               if (lpni) {
+                       lpni->lpni_ns_status = addnis[i].ns_status;
+                       lnet_peer_ni_decref_locked(lpni);
+               }
        }
+
        for (i = 0; i < ndelnis; i++) {
+               /*
+                * for routers it's okay to delete the primary_nid because
+                * the upper layers don't really rely on it. So if we're
+                * being told that the router changed its primary_nid
+                * then it's okay to delete it.
+                */
+               if (lp->lp_rtr_refcount > 0)
+                       flags |= LNET_PEER_RTR_NI_FORCE_DEL;
                rc = lnet_peer_del_nid(lp, delnis[i], flags);
                if (rc) {
                        CERROR("Error deleting NID %s from peer %s: %d\n",
@@ -2489,11 +2667,11 @@ static int lnet_peer_merge_data(struct lnet_peer *lp,
         */
        rc = 0;
 out:
-       LIBCFS_FREE(curnis, nnis * sizeof(lnet_nid_t));
-       LIBCFS_FREE(addnis, nnis * sizeof(lnet_nid_t));
-       LIBCFS_FREE(delnis, nnis * sizeof(lnet_nid_t));
+       LIBCFS_FREE(curnis, nnis * sizeof(*curnis));
+       LIBCFS_FREE(addnis, nnis * sizeof(*addnis));
+       LIBCFS_FREE(delnis, nnis * sizeof(*delnis));
        lnet_ping_buffer_decref(pbuf);
-       CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+       CDEBUG(D_NET, "peer %s (%p): %d\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc);
 
        if (rc) {
                spin_lock(&lp->lp_lock);
@@ -2566,6 +2744,18 @@ lnet_peer_set_primary_data(struct lnet_peer *lp, struct lnet_ping_buffer *pbuf)
        return 0;
 }
 
+static bool lnet_is_nid_in_ping_info(lnet_nid_t nid, struct lnet_ping_info *pinfo)
+{
+       int i;
+
+       for (i = 0; i < pinfo->pi_nnis; i++) {
+               if (pinfo->pi_ni[i].ns_nid == nid)
+                       return true;
+       }
+
+       return false;
+}
+
 /*
  * Update a peer using the data received.
  */
@@ -2633,7 +2823,17 @@ __must_hold(&lp->lp_lock)
                rc = lnet_peer_set_primary_nid(lp, nid, flags);
                if (!rc)
                        rc = lnet_peer_merge_data(lp, pbuf);
-       } else if (lp->lp_primary_nid == nid) {
+       /*
+        * if the primary NID we have cached for the peer is present in the
+        * ping info returned from the peer, even though the replying NID is
+        * not that cached primary, and discovery is disabled, then we don't
+        * want to update our local peer info by adding or removing NIDs; we
+        * just want to update the status of the NIDs that we currently have
+        * recorded in that peer.
+        */
+       } else if (lp->lp_primary_nid == nid ||
+                  (lnet_is_nid_in_ping_info(lp->lp_primary_nid, &pbuf->pb_info) &&
+                   lnet_is_discovery_disabled(lp))) {
                rc = lnet_peer_merge_data(lp, pbuf);
        } else {
                lpni = lnet_find_peer_ni_locked(nid);
@@ -2647,13 +2847,26 @@ __must_hold(&lp->lp_lock)
                                rc = lnet_peer_merge_data(lp, pbuf);
                        }
                } else {
-                       rc = lnet_peer_set_primary_data(
-                               lpni->lpni_peer_net->lpn_peer, pbuf);
+                       struct lnet_peer *new_lp;
+                       new_lp = lpni->lpni_peer_net->lpn_peer;
+                       /*
+                        * if lp has discovery/MR enabled that means new_lp
+                        * should have discovery/MR enabled as well, since
+                        * it's the same peer, which we're about to merge
+                        */
+                       if (!(lp->lp_state & LNET_PEER_NO_DISCOVERY))
+                               new_lp->lp_state &= ~LNET_PEER_NO_DISCOVERY;
+                       if (lp->lp_state & LNET_PEER_MULTI_RAIL)
+                               new_lp->lp_state |= LNET_PEER_MULTI_RAIL;
+
+                       rc = lnet_peer_set_primary_data(new_lp, pbuf);
+                       lnet_consolidate_routes_locked(lp, new_lp);
                        lnet_peer_ni_decref_locked(lpni);
                }
        }
 out:
-       CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+       CDEBUG(D_NET, "peer %s(%p): %d. state = 0x%x\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc,
+              lp->lp_state);
        mutex_unlock(&the_lnet.ln_api_mutex);
 
        spin_lock(&lp->lp_lock);
@@ -2873,7 +3086,7 @@ fail_unlink:
        LNetMDUnlink(lp->lp_push_mdh);
        LNetInvalidateMDHandle(&lp->lp_push_mdh);
 fail_error:
-       CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
+       CDEBUG(D_NET, "peer %s(%p): %d\n", libcfs_nid2str(lp->lp_primary_nid), lp, rc);
        /*
         * The errors that get us here are considered hard errors and
         * cause Discovery to terminate. So we clear PUSH_SENT, but do
@@ -2917,19 +3130,6 @@ __must_hold(&lp->lp_lock)
        return 0;
 }
 
-/*
- * Mark the peer as to be rediscovered.
- */
-static int lnet_peer_rediscover(struct lnet_peer *lp)
-__must_hold(&lp->lp_lock)
-{
-       lp->lp_state |= LNET_PEER_REDISCOVER;
-       lp->lp_state &= ~LNET_PEER_DISCOVERING;
-
-       CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
-
-       return 0;
-}
 
 /*
  * Discovering this peer is taking too long. Cancel any Ping or Push
@@ -3104,8 +3304,8 @@ static int lnet_peer_discovery(void *arg)
                         * forcing a Ping or Push.
                         */
                        spin_lock(&lp->lp_lock);
-                       CDEBUG(D_NET, "peer %s state %#x\n",
-                               libcfs_nid2str(lp->lp_primary_nid),
+                       CDEBUG(D_NET, "peer %s(%p) state %#x\n",
+                               libcfs_nid2str(lp->lp_primary_nid), lp,
                                lp->lp_state);
                        if (lp->lp_state & LNET_PEER_DATA_PRESENT)
                                rc = lnet_peer_data_present(lp);
@@ -3117,16 +3317,14 @@ static int lnet_peer_discovery(void *arg)
                                rc = lnet_peer_send_ping(lp);
                        else if (lp->lp_state & LNET_PEER_FORCE_PUSH)
                                rc = lnet_peer_send_push(lp);
-                       else if (lnet_peer_discovery_disabled)
-                               rc = lnet_peer_rediscover(lp);
                        else if (!(lp->lp_state & LNET_PEER_NIDS_UPTODATE))
                                rc = lnet_peer_send_ping(lp);
                        else if (lnet_peer_needs_push(lp))
                                rc = lnet_peer_send_push(lp);
                        else
                                rc = lnet_peer_discovered(lp);
-                       CDEBUG(D_NET, "peer %s state %#x rc %d\n",
-                               libcfs_nid2str(lp->lp_primary_nid),
+                       CDEBUG(D_NET, "peer %s(%p) state %#x rc %d\n",
+                               libcfs_nid2str(lp->lp_primary_nid), lp,
                                lp->lp_state, rc);
                        spin_unlock(&lp->lp_lock);
 
@@ -3153,8 +3351,6 @@ static int lnet_peer_discovery(void *arg)
         * size of the thundering herd if there are multiple threads
         * waiting on discovery of a single peer.
         */
-       LNetEQFree(the_lnet.ln_dc_eqh);
-       LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
 
        /* Queue cleanup 1: stop all pending pings and pushes. */
        lnet_net_lock(LNET_LOCK_EX);
@@ -3182,6 +3378,9 @@ static int lnet_peer_discovery(void *arg)
        }
        lnet_net_unlock(LNET_LOCK_EX);
 
+       LNetEQFree(the_lnet.ln_dc_eqh);
+       LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+
        the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
        wake_up(&the_lnet.ln_dc_waitq);
 
@@ -3262,7 +3461,7 @@ lnet_debug_peer(lnet_nid_t nid)
        }
 
        if (lnet_isrouter(lp) || lnet_peer_aliveness_enabled(lp))
-               aliveness = lp->lpni_alive ? "up" : "down";
+               aliveness = (lnet_is_peer_ni_alive(lp)) ? "up" : "down";
 
        CDEBUG(D_WARNING, "%-24s %4d %5s %5d %5d %5d %5d %5d %ld\n",
               libcfs_nid2str(lp->lpni_nid), atomic_read(&lp->lpni_refcount),
@@ -3318,7 +3517,7 @@ int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
                        if (lnet_isrouter(lp) ||
                                lnet_peer_aliveness_enabled(lp))
                                snprintf(aliveness, LNET_MAX_STR_LEN,
-                                        lp->lpni_alive ? "up" : "down");
+                                        lnet_is_peer_ni_alive(lp) ? "up" : "down");
 
                        *nid = lp->lpni_nid;
                        *refcount = atomic_read(&lp->lpni_refcount);
@@ -3405,7 +3604,7 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
                if (lnet_isrouter(lpni) ||
                        lnet_peer_aliveness_enabled(lpni))
                        snprintf(lpni_info->cr_aliveness, LNET_MAX_STR_LEN,
-                               lpni->lpni_alive ? "up" : "down");
+                               lnet_is_peer_ni_alive(lpni) ? "up" : "down");
 
                lpni_info->cr_refcount = atomic_read(&lpni->lpni_refcount);
                lpni_info->cr_ni_peer_tx_credits = (lpni->lpni_net != NULL) ?
index 4b41feb..7d7e08d 100644 (file)
@@ -35,6 +35,8 @@
 #define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_SIZE - 1) >> \
                                  PAGE_SHIFT)
 
+extern unsigned int lnet_current_net_count;
+
 static char *forwarding = "";
 module_param(forwarding, charp, 0444);
 MODULE_PARM_DESC(forwarding, "Explicitly enable/disable forwarding between networks");
@@ -78,165 +80,271 @@ int avoid_asym_router_failure = 1;
 module_param(avoid_asym_router_failure, int, 0644);
 MODULE_PARM_DESC(avoid_asym_router_failure, "Avoid asymmetrical router failures (0 to disable)");
 
-static int dead_router_check_interval = 60;
-module_param(dead_router_check_interval, int, 0644);
-MODULE_PARM_DESC(dead_router_check_interval, "Seconds between dead router health checks (<= 0 to disable)");
-
-static int live_router_check_interval = 60;
-module_param(live_router_check_interval, int, 0644);
-MODULE_PARM_DESC(live_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");
+int alive_router_check_interval = 60;
+module_param(alive_router_check_interval, int, 0644);
+MODULE_PARM_DESC(alive_router_check_interval, "Seconds between live router health checks (<= 0 to disable)");
 
 static int router_ping_timeout = 50;
 module_param(router_ping_timeout, int, 0644);
 MODULE_PARM_DESC(router_ping_timeout, "Seconds to wait for the reply to a router health query");
 
-int
-lnet_peers_start_down(void)
-{
-       return check_routers_before_use;
-}
+/*
+ * A value between 0 and 100. 0 means that even if the router's interfaces
+ * have the worst possible health, the gateway is still considered usable.
+ * 100 means that at least one interface on the route's remote net must be
+ * fully healthy for the route to be considered alive.
+ * The default is 100 to preserve the original behavior.
+ */
+unsigned int router_sensitivity_percentage = 100;
+static int rtr_sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp);
+static struct kernel_param_ops param_ops_rtr_sensitivity = {
+       .set = rtr_sensitivity_set,
+       .get = param_get_int,
+};
+#define param_check_rtr_sensitivity(name, p) \
+               __param_check(name, p, int)
+#ifdef HAVE_KERNEL_PARAM_OPS
+module_param(router_sensitivity_percentage, rtr_sensitivity, S_IRUGO|S_IWUSR);
+#else
+module_param_call(router_sensitivity_percentage, rtr_sensitivity_set, param_get_int,
+                 &router_sensitivity_percentage, S_IRUGO|S_IWUSR);
+#endif
+MODULE_PARM_DESC(router_sensitivity_percentage,
+               "How healthy a gateway should be to be used in percent");
 
-void
-lnet_notify_locked(struct lnet_peer_ni *lp, int notifylnd, int alive,
-                  time64_t when)
+static int
+rtr_sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
 {
-       if (lp->lpni_timestamp > when) { /* out of date information */
-               CDEBUG(D_NET, "Out of date\n");
-               return;
+       int rc;
+       unsigned *sen = (unsigned *)kp->arg;
+       unsigned long value;
+
+       rc = kstrtoul(val, 0, &value);
+       if (rc) {
+               CERROR("Invalid module parameter value for 'router_sensitivity_percentage'\n");
+               return rc;
+       }
+
+       if (value < 0 || value > 100) {
+               CERROR("Invalid value: %lu for 'router_sensitivity_percentage'\n", value);
+               return -EINVAL;
        }
 
        /*
-        * This function can be called with different cpt locks being
-        * held. lpni_alive_count modification needs to be properly protected.
-        * Significant reads to lpni_alive_count are also protected with
-        * the same lock
+        * The purpose of locking the api_mutex here is to ensure that
+        * the correct value ends up stored properly.
         */
-       spin_lock(&lp->lpni_lock);
+       mutex_lock(&the_lnet.ln_api_mutex);
 
-       lp->lpni_timestamp = when; /* update timestamp */
-       lp->lpni_ping_deadline = 0;               /* disable ping timeout */
+       *sen = value;
 
-       if (lp->lpni_alive_count != 0 &&          /* got old news */
-           (!lp->lpni_alive) == (!alive)) {      /* new date for old news */
-               spin_unlock(&lp->lpni_lock);
-               CDEBUG(D_NET, "Old news\n");
-               return;
-       }
+       mutex_unlock(&the_lnet.ln_api_mutex);
 
-       /* Flag that notification is outstanding */
+       return 0;
+}
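
To make the percentage concrete, here is a hypothetical helper (not part of this patch; LNET_MAX_HEALTH_VALUE-style inputs and the comparison are assumptions for illustration) showing how the setting could translate into a per-interface health threshold.

	/* Illustrative only: an interface "counts" toward gateway aliveness
	 * when its health value reaches the configured percentage of the
	 * maximum. With the default of 100 the interface must be at full
	 * health; 0 accepts any health value. */
	static bool ni_meets_router_sensitivity(int healthv, int max_healthv)
	{
		return healthv * 100 >=
		       (int)router_sensitivity_percentage * max_healthv;
	}
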
 
-       lp->lpni_alive_count++;
-       lp->lpni_alive = (alive) ? 1 : 0;
-       lp->lpni_notify = 1;
-       lp->lpni_notifylnd = notifylnd;
-       if (lp->lpni_alive)
-               lp->lpni_ping_feats = LNET_PING_FEAT_INVAL; /* reset */
+void
+lnet_rtr_transfer_to_peer(struct lnet_peer *src, struct lnet_peer *target)
+{
+       struct lnet_route *route;
 
-       spin_unlock(&lp->lpni_lock);
+       lnet_net_lock(LNET_LOCK_EX);
+       target->lp_rtr_refcount += src->lp_rtr_refcount;
+       /* move the list of queued messages to the new peer */
+       list_splice_init(&src->lp_rtrq, &target->lp_rtrq);
+       /* move all the routes that reference the peer */
+       list_splice_init(&src->lp_routes, &target->lp_routes);
+       /* update all the routes to point to the new peer */
+       list_for_each_entry(route, &target->lp_routes, lr_gwlist)
+               route->lr_gateway = target;
+       /* remove the old peer from the ln_routers list */
+       list_del_init(&src->lp_rtr_list);
+       /* add the new peer to the ln_routers list */
+       if (list_empty(&target->lp_rtr_list)) {
+               lnet_peer_addref_locked(target);
+               list_add_tail(&target->lp_rtr_list, &the_lnet.ln_routers);
+       }
+       /* reset the ref count on the old peer and decrement its ref count */
+       src->lp_rtr_refcount = 0;
+       lnet_peer_decref_locked(src);
+       /* update the router version */
+       the_lnet.ln_routers_version++;
+       lnet_net_unlock(LNET_LOCK_EX);
+}
 
-       CDEBUG(D_NET, "set %s %d\n", libcfs_nid2str(lp->lpni_nid), alive);
+int
+lnet_peers_start_down(void)
+{
+       return check_routers_before_use;
 }
 
 /*
- * This function will always be called with lp->lpni_cpt lock held.
+ * A net is alive if at least one gateway NI on the network is alive.
  */
-static void
-lnet_ni_notify_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
+static bool
+lnet_is_gateway_net_alive(struct lnet_peer_net *lpn)
 {
-       int alive;
-       int notifylnd;
+       struct lnet_peer_ni *lpni;
 
-       /* Notify only in 1 thread at any time to ensure ordered notification.
-        * NB individual events can be missed; the only guarantee is that you
-        * always get the most recent news */
+       list_for_each_entry(lpni, &lpn->lpn_peer_nis, lpni_peer_nis) {
+               if (lnet_is_peer_ni_alive(lpni))
+                       return true;
+       }
 
-       spin_lock(&lp->lpni_lock);
+       return false;
+}
 
-       if (lp->lpni_notifying || ni == NULL) {
-               spin_unlock(&lp->lpni_lock);
-               return;
+/*
+ * a gateway is alive only if all its nets are alive
+ * called with cpt lock held
+ */
+bool lnet_is_gateway_alive(struct lnet_peer *gw)
+{
+       struct lnet_peer_net *lpn;
+
+       list_for_each_entry(lpn, &gw->lp_peer_nets, lpn_peer_nets) {
+               if (!lnet_is_gateway_net_alive(lpn))
+                       return false;
        }
 
-       lp->lpni_notifying = 1;
+       return true;
+}
+
+/*
+ * lnet_is_route_alive() needs to be called with cpt lock held
+ * A route is alive if the gateway can route between the local network and
+ * the remote network of the route.
+ * This means at least one NI is alive on each of the local and remote
+ * networks of the gateway.
+ */
+bool lnet_is_route_alive(struct lnet_route *route)
+{
+       struct lnet_peer *gw = route->lr_gateway;
+       struct lnet_peer_net *llpn;
+       struct lnet_peer_net *rlpn;
+       bool route_alive;
 
        /*
-        * lp->lpni_notify needs to be protected because it can be set in
-        * lnet_notify_locked().
+        * check the gateway's interfaces on the local net of the route
+        * to make sure the gateway is reachable from this node.
         */
-       while (lp->lpni_notify) {
-               alive     = lp->lpni_alive;
-               notifylnd = lp->lpni_notifylnd;
+       llpn = lnet_peer_get_net_locked(gw, route->lr_lnet);
+       if (!llpn)
+               return false;
 
-               lp->lpni_notifylnd = 0;
-               lp->lpni_notify    = 0;
+       route_alive = lnet_is_gateway_net_alive(llpn);
 
-               if (notifylnd && ni->ni_net->net_lnd->lnd_notify != NULL) {
-                       spin_unlock(&lp->lpni_lock);
-                       lnet_net_unlock(lp->lpni_cpt);
+       if (avoid_asym_router_failure) {
+               rlpn = lnet_peer_get_net_locked(gw, route->lr_net);
+               if (!rlpn)
+                       return false;
+               route_alive = route_alive &&
+                             lnet_is_gateway_net_alive(rlpn);
+       }
+
+       if (!route_alive)
+               return route_alive;
 
-                       /* A new notification could happen now; I'll handle it
-                        * when control returns to me */
+       spin_lock(&gw->lp_lock);
+       if (!(gw->lp_state & LNET_PEER_ROUTER_ENABLED)) {
+               if (gw->lp_rtr_refcount > 0)
+                       CERROR("peer %s is being used as a gateway but routing feature is not turned on\n",
+                              libcfs_nid2str(gw->lp_primary_nid));
+               route_alive = false;
+       }
+       spin_unlock(&gw->lp_lock);
 
-                       (ni->ni_net->net_lnd->lnd_notify)(ni, lp->lpni_nid,
-                                                         alive);
+       return route_alive;
+}
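
For context, a small sketch (not part of this patch; the helper name is illustrative) of how a caller could use the new predicate to count the usable routes of a remote net under these aliveness rules.

	/* Hypothetical helper: count routes to a remote net whose gateway can
	 * currently forward traffic, per lnet_is_route_alive() above. Must be
	 * called with the cpt lock held, as the predicate requires. */
	static int count_alive_routes(struct lnet_remotenet *rnet)
	{
		struct lnet_route *route;
		int alive = 0;

		list_for_each_entry(route, &rnet->lrn_routes, lr_list)
			if (lnet_is_route_alive(route))
				alive++;

		return alive;
	}
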
 
-                       lnet_net_lock(lp->lpni_cpt);
-                       spin_lock(&lp->lpni_lock);
+void
+lnet_consolidate_routes_locked(struct lnet_peer *orig_lp,
+                              struct lnet_peer *new_lp)
+{
+       struct lnet_peer_ni *lpni;
+       struct lnet_route *route;
+
+       /*
+        * Although a route is correlated with a peer, when it's added
+        * a specific NID is used. That NID refers to a peer_ni within
+        * a peer. There could be other peer_nis on the same net, which
+        * can be used to send to that gateway. However, when we are
+        * consolidating gateways because of discovery, the NID used to
+        * add the route might've moved between gateway peers. In this
+        * case we want to move the route to the new gateway as well, so
+        * as not to confuse the user who added the route.
+        */
+       list_for_each_entry(route, &orig_lp->lp_routes, lr_gwlist) {
+               lpni = lnet_peer_get_ni_locked(orig_lp, route->lr_nid);
+               if (!lpni) {
+                       lnet_net_lock(LNET_LOCK_EX);
+                       list_move(&route->lr_gwlist, &new_lp->lp_routes);
+                       lnet_net_unlock(LNET_LOCK_EX);
                }
        }
 
-       lp->lpni_notifying = 0;
-       spin_unlock(&lp->lpni_lock);
 }
 
-static void
-lnet_rtr_addref_locked(struct lnet_peer_ni *lp)
+void
+lnet_router_discovery_complete(struct lnet_peer *lp)
 {
-       LASSERT(atomic_read(&lp->lpni_refcount) > 0);
-       LASSERT(lp->lpni_rtr_refcount >= 0);
+       struct lnet_peer_ni *lpni = NULL;
 
-       /* lnet_net_lock must be exclusively locked */
-       lp->lpni_rtr_refcount++;
-       if (lp->lpni_rtr_refcount == 1) {
-               struct list_head *pos;
+       spin_lock(&lp->lp_lock);
+       lp->lp_state &= ~LNET_PEER_RTR_DISCOVERY;
+       spin_unlock(&lp->lp_lock);
 
-               /* a simple insertion sort */
-               list_for_each_prev(pos, &the_lnet.ln_routers) {
-                       struct lnet_peer_ni *rtr;
+       /*
+        * Router discovery successful? All peer information would've been
+        * updated already. No need to do any more processing
+        */
+       if (!lp->lp_dc_error)
+               return;
+       /*
+        * discovery failed? then we need to set the status of each lpni
+        * to DOWN. It will be updated the next time we discover the
+        * router. For router peer NIs not on local networks, we never send
+        * messages directly to them, so their health will always remain
+        * at maximum. We can only tell if they are up or down from the
+        * status returned in the PING response. If we fail to get that
+        * status in our scheduled router discovery, then we'll assume
+        * it's down until we're told otherwise.
+        */
+       CDEBUG(D_NET, "%s: Router discovery failed %d\n",
+              libcfs_nid2str(lp->lp_primary_nid), lp->lp_dc_error);
+       while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL)
+               lpni->lpni_ns_status = LNET_NI_STATUS_DOWN;
+}
 
-                       rtr = list_entry(pos, struct lnet_peer_ni,
-                                        lpni_rtr_list);
-                       if (rtr->lpni_nid < lp->lpni_nid)
-                               break;
-               }
+static void
+lnet_rtr_addref_locked(struct lnet_peer *lp)
+{
+       LASSERT(lp->lp_rtr_refcount >= 0);
 
-               list_add(&lp->lpni_rtr_list, pos);
+       /* lnet_net_lock must be exclusively locked */
+       lp->lp_rtr_refcount++;
+       if (lp->lp_rtr_refcount == 1) {
+               list_add_tail(&lp->lp_rtr_list, &the_lnet.ln_routers);
                /* addref for the_lnet.ln_routers */
-               lnet_peer_ni_addref_locked(lp);
+               lnet_peer_addref_locked(lp);
                the_lnet.ln_routers_version++;
        }
 }
 
 static void
-lnet_rtr_decref_locked(struct lnet_peer_ni *lp)
+lnet_rtr_decref_locked(struct lnet_peer *lp)
 {
-       LASSERT(atomic_read(&lp->lpni_refcount) > 0);
-       LASSERT(lp->lpni_rtr_refcount > 0);
+       LASSERT(atomic_read(&lp->lp_refcount) > 0);
+       LASSERT(lp->lp_rtr_refcount > 0);
 
        /* lnet_net_lock must be exclusively locked */
-       lp->lpni_rtr_refcount--;
-       if (lp->lpni_rtr_refcount == 0) {
-               LASSERT(list_empty(&lp->lpni_routes));
-
-               if (lp->lpni_rcd != NULL) {
-                       list_add(&lp->lpni_rcd->rcd_list,
-                                &the_lnet.ln_rcd_deathrow);
-                       lp->lpni_rcd = NULL;
-               }
+       lp->lp_rtr_refcount--;
+       if (lp->lp_rtr_refcount == 0) {
+               LASSERT(list_empty(&lp->lp_routes));
 
-               list_del(&lp->lpni_rtr_list);
+               list_del(&lp->lp_rtr_list);
                /* decref for the_lnet.ln_routers */
-               lnet_peer_ni_decref_locked(lp);
+               lnet_peer_decref_locked(lp);
                the_lnet.ln_routers_version++;
        }
 }
@@ -269,8 +377,7 @@ static void lnet_shuffle_seed(void)
                return;
 
        /* Nodes with small feet have little entropy
-        * the NID for this node gives the most entropy in the low bits
-        */
+        * the NID for this node gives the most entropy in the low bits */
        while ((ni = lnet_get_next_ni_locked(NULL, ni)))
                add_device_randomness(&ni->ni_nid, sizeof(ni->ni_nid));
 
@@ -282,17 +389,21 @@ static void lnet_shuffle_seed(void)
 static void
 lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
 {
-       unsigned int      len = 0;
-       unsigned int      offset = 0;
+       struct lnet_peer_net *lpn;
+       unsigned int offset = 0;
+       unsigned int len = 0;
        struct list_head *e;
 
        lnet_shuffle_seed();
 
-       list_for_each(e, &rnet->lrn_routes) {
+       list_for_each(e, &rnet->lrn_routes)
                len++;
-       }
 
-       /* len+1 positions to add a new entry, also prevents division by 0 */
+       /*
+        * Randomly adding routes to the list is done to ensure that when
+        * different nodes are using the same list of routers, they end up
+        * preferring different routers.
+        */
        offset = cfs_rand() % (len + 1);
        list_for_each(e, &rnet->lrn_routes) {
                if (offset == 0)
@@ -300,26 +411,38 @@ lnet_add_route_to_rnet(struct lnet_remotenet *rnet, struct lnet_route *route)
                offset--;
        }
        list_add(&route->lr_list, e);
-       list_add(&route->lr_gwlist, &route->lr_gateway->lpni_routes);
+       /*
+        * force a router check on the gateway to make sure the route is
+        * alive
+        */
+       list_for_each_entry(lpn, &route->lr_gateway->lp_peer_nets,
+                           lpn_peer_nets) {
+               lpn->lpn_rtrcheck_timestamp = 0;
+       }
 
        the_lnet.ln_remote_nets_version++;
+
+       /* add the route on the gateway list */
+       list_add(&route->lr_gwlist, &route->lr_gateway->lp_routes);
+
+       /* take a router reference count on the gateway */
        lnet_rtr_addref_locked(route->lr_gateway);
 }
 
 int
 lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
-              unsigned int priority)
+              __u32 priority, __u32 sensitivity)
 {
-       struct list_head        *e;
-       struct lnet_remotenet   *rnet;
-       struct lnet_remotenet   *rnet2;
-       struct lnet_route               *route;
-       struct lnet_ni          *ni;
-       struct lnet_peer_ni     *lpni;
-       int                     add_route;
-       int                     rc;
-
-       CDEBUG(D_NET, "Add route: net %s hops %d priority %u gw %s\n",
+       struct list_head *route_entry;
+       struct lnet_remotenet *rnet;
+       struct lnet_remotenet *rnet2;
+       struct lnet_route *route;
+       struct lnet_peer_ni *lpni;
+       struct lnet_peer *gw;
+       int add_route;
+       int rc;
+
+       CDEBUG(D_NET, "Add route: remote net %s hops %d priority %u gw %s\n",
               libcfs_net2str(net), hops, priority, libcfs_nid2str(gateway));
 
        if (gateway == LNET_NID_ANY ||
@@ -330,7 +453,8 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
            (hops != LNET_UNDEFINED_HOPS && (hops < 1 || hops > 255)))
                return -EINVAL;
 
-       if (lnet_islocalnet(net))       /* it's a local network */
+       /* it's a local network */
+       if (lnet_islocalnet(net))
                return -EEXIST;
 
        /* Assume net, route, all new */
@@ -348,12 +472,19 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
 
        INIT_LIST_HEAD(&rnet->lrn_routes);
        rnet->lrn_net = net;
-       route->lr_hops = hops;
+       /* store the local and remote net that the route represents */
+       route->lr_lnet = LNET_NIDNET(gateway);
        route->lr_net = net;
+       route->lr_nid = gateway;
        route->lr_priority = priority;
+       route->lr_hops = hops;
 
        lnet_net_lock(LNET_LOCK_EX);
 
+       /*
+        * lnet_nid2peerni_ex() grabs a ref on the lpni. We will need to
+        * drop that ref once we're done
+        */
        lpni = lnet_nid2peerni_ex(gateway, LNET_LOCK_EX);
        if (IS_ERR(lpni)) {
                lnet_net_unlock(LNET_LOCK_EX);
@@ -362,15 +493,16 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
                LIBCFS_FREE(rnet, sizeof(*rnet));
 
                rc = PTR_ERR(lpni);
-               if (rc == -EHOSTUNREACH) /* gateway is not on a local net. */
-                       return rc;       /* ignore the route entry */
                CERROR("Error %d creating route %s %d %s\n", rc,
                        libcfs_net2str(net), hops,
                        libcfs_nid2str(gateway));
                return rc;
        }
-       route->lr_gateway = lpni;
-       LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
+
+       LASSERT(lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
+       gw = lpni->lpni_peer_net->lpn_peer;
+
+       route->lr_gateway = gw;
 
        rnet2 = lnet_find_rnet_locked(net);
        if (rnet2 == NULL) {
@@ -381,35 +513,36 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
 
        /* Search for a duplicate route (it's a NOOP if it is) */
        add_route = 1;
-       list_for_each(e, &rnet2->lrn_routes) {
+       list_for_each(route_entry, &rnet2->lrn_routes) {
                struct lnet_route *route2;
 
-               route2 = list_entry(e, struct lnet_route, lr_list);
+               route2 = list_entry(route_entry, struct lnet_route, lr_list);
                if (route2->lr_gateway == route->lr_gateway) {
                        add_route = 0;
                        break;
                }
 
                /* our lookups must be true */
-               LASSERT(route2->lr_gateway->lpni_nid != gateway);
+               LASSERT(route2->lr_gateway->lp_primary_nid != gateway);
        }
 
+       /*
+        * It is possible to add multiple routes through the same peer,
+        * each using a different NID of that peer. When the gateway is
+        * discovered, discovery will consolidate the different peers into
+        * one peer. In that case the discovery code will have to move the
+        * routes from the peer being deleted to the consolidated peer's
+        * lp_routes list.
+        */
        if (add_route) {
-               lnet_peer_ni_addref_locked(route->lr_gateway); /* +1 for notify */
+               gw->lp_health_sensitivity = sensitivity;
                lnet_add_route_to_rnet(rnet2, route);
-
-               ni = lnet_get_next_ni_locked(route->lr_gateway->lpni_net, NULL);
-               lnet_net_unlock(LNET_LOCK_EX);
-
-               /* XXX Assume alive */
-               if (ni->ni_net->net_lnd->lnd_notify != NULL)
-                       (ni->ni_net->net_lnd->lnd_notify)(ni, gateway, 1);
-
-               lnet_net_lock(LNET_LOCK_EX);
        }
 
-       /* -1 for notify or !add_route */
-       lnet_peer_ni_decref_locked(route->lr_gateway);
+       /*
+        * get rid of the reference on the lpni.
+        */
+       lnet_peer_ni_decref_locked(lpni);
        lnet_net_unlock(LNET_LOCK_EX);
 
        rc = 0;
@@ -428,74 +561,51 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
        return rc;
 }
 
-int
-lnet_check_routes(void)
+static void
+lnet_del_route_from_rnet(lnet_nid_t gw_nid, struct list_head *route_list,
+                        struct list_head *zombies)
 {
-       struct lnet_remotenet *rnet;
-       struct lnet_route        *route;
-       struct lnet_route        *route2;
-       struct list_head *e1;
-       struct list_head *e2;
-       int               cpt;
-       struct list_head *rn_list;
-       int               i;
-
-       cpt = lnet_net_lock_current();
-
-       for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
-               rn_list = &the_lnet.ln_remote_nets_hash[i];
-               list_for_each(e1, rn_list) {
-                       rnet = list_entry(e1, struct lnet_remotenet, lrn_list);
-
-                       route2 = NULL;
-                       list_for_each(e2, &rnet->lrn_routes) {
-                               lnet_nid_t      nid1;
-                               lnet_nid_t      nid2;
-                               int             net;
-
-                               route = list_entry(e2, struct lnet_route,
-                                                  lr_list);
-
-                               if (route2 == NULL) {
-                                       route2 = route;
-                                       continue;
-                               }
-
-                               if (route->lr_gateway->lpni_net ==
-                                   route2->lr_gateway->lpni_net)
-                                       continue;
-
-                               nid1 = route->lr_gateway->lpni_nid;
-                               nid2 = route2->lr_gateway->lpni_nid;
-                               net = rnet->lrn_net;
+       struct lnet_peer *gateway;
+       struct lnet_route *route;
+       struct lnet_route *tmp;
+
+       list_for_each_entry_safe(route, tmp, route_list, lr_list) {
+               gateway = route->lr_gateway;
+               if (gw_nid != LNET_NID_ANY &&
+                   gw_nid != gateway->lp_primary_nid)
+                       continue;
 
-                               lnet_net_unlock(cpt);
+               /*
+                * Move the route to the zombie list so it can be
+                * deleted outside the lock. Note that this function is
+                * called with the ln_api_mutex held as well as the
+                * exclusive net lock. Adding to the remote net list
+                * happens under the same conditions. Same goes for the
+                * gateway router list.
+                */
+               list_move(&route->lr_list, zombies);
+               the_lnet.ln_remote_nets_version++;
 
-                               CERROR("Routes to %s via %s and %s not "
-                                      "supported\n",
-                                      libcfs_net2str(net),
-                                      libcfs_nid2str(nid1),
-                                      libcfs_nid2str(nid2));
-                               return -EINVAL;
-                       }
-               }
+               list_del(&route->lr_gwlist);
+               lnet_rtr_decref_locked(gateway);
        }
-
-       lnet_net_unlock(cpt);
-       return 0;
 }
 
 int
 lnet_del_route(__u32 net, lnet_nid_t gw_nid)
 {
-       struct lnet_peer_ni     *gateway;
-       struct lnet_remotenet   *rnet;
-       struct lnet_route               *route;
-       struct list_head        *e1;
-       struct list_head        *e2;
-       int                     rc = -ENOENT;
-       struct list_head        *rn_list;
-       int                     idx = 0;
+       struct list_head rnet_zombies;
+       struct lnet_remotenet *rnet;
+       struct lnet_remotenet *tmp;
+       struct list_head *rn_list;
+       struct lnet_peer_ni *lpni;
+       struct lnet_route *route;
+       struct list_head zombies;
+       struct lnet_peer *lp;
+       int i = 0;
+
+       INIT_LIST_HEAD(&rnet_zombies);
+       INIT_LIST_HEAD(&zombies);
 
        CDEBUG(D_NET, "Del route: net %s : gw %s\n",
               libcfs_net2str(net), libcfs_nid2str(gw_nid));
@@ -504,60 +614,68 @@ lnet_del_route(__u32 net, lnet_nid_t gw_nid)
         * or a specific route entry actual NIDs) */
 
        lnet_net_lock(LNET_LOCK_EX);
-       if (net == LNET_NIDNET(LNET_NID_ANY))
-               rn_list = &the_lnet.ln_remote_nets_hash[0];
-       else
-               rn_list = lnet_net2rnethash(net);
-
-again:
-       list_for_each(e1, rn_list) {
-               rnet = list_entry(e1, struct lnet_remotenet, lrn_list);
-
-               if (!(net == LNET_NIDNET(LNET_NID_ANY) ||
-                       net == rnet->lrn_net))
-                       continue;
 
-               list_for_each(e2, &rnet->lrn_routes) {
-                       route = list_entry(e2, struct lnet_route, lr_list);
+       lpni = lnet_find_peer_ni_locked(gw_nid);
+       if (lpni) {
+               lp = lpni->lpni_peer_net->lpn_peer;
+               LASSERT(lp);
+               gw_nid = lp->lp_primary_nid;
+               lnet_peer_ni_decref_locked(lpni);
+       }
 
-                       gateway = route->lr_gateway;
-                       if (!(gw_nid == LNET_NID_ANY ||
-                             gw_nid == gateway->lpni_nid))
-                               continue;
+       if (net != LNET_NIDNET(LNET_NID_ANY)) {
+               rnet = lnet_find_rnet_locked(net);
+               if (!rnet) {
+                       lnet_net_unlock(LNET_LOCK_EX);
+                       return -ENOENT;
+               }
+               lnet_del_route_from_rnet(gw_nid, &rnet->lrn_routes,
+                                        &zombies);
+               if (list_empty(&rnet->lrn_routes))
+                       list_move(&rnet->lrn_list, &rnet_zombies);
+               goto delete_zombies;
+       }
 
-                       list_del(&route->lr_list);
-                       list_del(&route->lr_gwlist);
-                       the_lnet.ln_remote_nets_version++;
+       for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++) {
+               rn_list = &the_lnet.ln_remote_nets_hash[i];
 
+               list_for_each_entry_safe(rnet, tmp, rn_list, lrn_list) {
+                       lnet_del_route_from_rnet(gw_nid, &rnet->lrn_routes,
+                                                &zombies);
                        if (list_empty(&rnet->lrn_routes))
-                               list_del(&rnet->lrn_list);
-                       else
-                               rnet = NULL;
-
-                       lnet_rtr_decref_locked(gateway);
-                       lnet_peer_ni_decref_locked(gateway);
-
-                       lnet_net_unlock(LNET_LOCK_EX);
+                               list_move(&rnet->lrn_list, &rnet_zombies);
+               }
+       }
 
-                       LIBCFS_FREE(route, sizeof(*route));
+delete_zombies:
+       /*
+        * Check if there are any routes remaining on the gateway.
+        * If there are no more routes, set the peer's lp_disc_net_id
+        * to 0 (invalid) so that if more routes are added on that
+        * gateway in the future, the discovery process starts from
+        * scratch.
+        */
+       if (lpni) {
+               if (list_empty(&lp->lp_routes))
+                       lp->lp_disc_net_id = 0;
+       }
 
-                       if (rnet != NULL)
-                               LIBCFS_FREE(rnet, sizeof(*rnet));
+       lnet_net_unlock(LNET_LOCK_EX);
 
-                       rc = 0;
-                       lnet_net_lock(LNET_LOCK_EX);
-                       goto again;
-               }
+       while (!list_empty(&zombies)) {
+               route = list_first_entry(&zombies, struct lnet_route, lr_list);
+               list_del(&route->lr_list);
+               LIBCFS_FREE(route, sizeof(*route));
        }
 
-       if (net == LNET_NIDNET(LNET_NID_ANY) &&
-           ++idx < LNET_REMOTE_NETS_HASH_SIZE) {
-               rn_list = &the_lnet.ln_remote_nets_hash[idx];
-               goto again;
+       while (!list_empty(&rnet_zombies)) {
+               rnet = list_first_entry(&rnet_zombies, struct lnet_remotenet,
+                                       lrn_list);
+               list_del(&rnet->lrn_list);
+               LIBCFS_FREE(rnet, sizeof(*rnet));
        }
-       lnet_net_unlock(LNET_LOCK_EX);
 
-       return rc;
+       return 0;
 }
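
Both the route and the remote-net structures are unlinked onto local zombie lists while the exclusive net lock is held, and only freed after the lock is dropped. The following self-contained sketch shows that defer-the-free pattern with a simple singly linked list standing in for the kernel list_head machinery; the names and the pthread mutex are illustrative, not LNet code.

    #include <pthread.h>
    #include <stdlib.h>

    struct route {
            struct route *next;
            unsigned int net;
    };

    static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;
    static struct route *route_table;

    /*
     * Unlink matching entries onto a private zombie list under the lock,
     * then free them after the lock is released, so freeing never happens
     * inside the critical section.
     */
    static void del_routes(unsigned int net)
    {
            struct route **pp, *r, *zombies = NULL;

            pthread_mutex_lock(&table_lock);
            pp = &route_table;
            while ((r = *pp) != NULL) {
                    if (r->net == net) {
                            *pp = r->next;
                            r->next = zombies;
                            zombies = r;
                    } else {
                            pp = &r->next;
                    }
            }
            pthread_mutex_unlock(&table_lock);

            while (zombies) {
                    r = zombies;
                    zombies = r->next;
                    free(r);
            }
    }

    int main(void)
    {
            struct route *r = calloc(1, sizeof(*r));

            if (r) {
                    r->net = 7;
                    r->next = route_table;
                    route_table = r;
            }
            del_routes(7);
            return 0;
    }
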
 
 void
@@ -600,15 +718,15 @@ int lnet_get_rtr_pool_cfg(int cpt, struct lnet_ioctl_pool_cfg *pool_cfg)
 
 int
 lnet_get_route(int idx, __u32 *net, __u32 *hops,
-              lnet_nid_t *gateway, __u32 *alive, __u32 *priority)
+              lnet_nid_t *gateway, __u32 *alive, __u32 *priority, __u32 *sensitivity)
 {
-       struct list_head *e1;
-       struct list_head *e2;
        struct lnet_remotenet *rnet;
-       struct lnet_route        *route;
-       int               cpt;
-       int               i;
        struct list_head *rn_list;
+       struct lnet_route *route;
+       struct list_head *e1;
+       struct list_head *e2;
+       int cpt;
+       int i;
 
        cpt = lnet_net_lock_current();
 
@@ -623,9 +741,11 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
 
                                if (idx-- == 0) {
                                        *net      = rnet->lrn_net;
+                                       *gateway  = route->lr_nid;
                                        *hops     = route->lr_hops;
                                        *priority = route->lr_priority;
-                                       *gateway  = route->lr_gateway->lpni_nid;
+                                       *sensitivity = route->lr_gateway->
+                                               lp_health_sensitivity;
                                        *alive    = lnet_is_route_alive(route);
                                        lnet_net_unlock(cpt);
                                        return 0;
@@ -638,190 +758,10 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
        return -ENOENT;
 }
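
lnet_get_route() is an index-based iterator: the caller passes a zero-based index, the function skips that many routes across the remote-net hash and returns -ENOENT once the index runs off the end. Below is a hedged fragment showing how an in-kernel caller (for instance the ioctl path) might dump every route together with the new per-gateway sensitivity; it assumes the surrounding LNet kernel context and is illustrative only.

    __u32 net, hops, alive, priority, sensitivity;
    lnet_nid_t gateway;
    int idx = 0;

    /* walk the route table by index until it is exhausted */
    while (lnet_get_route(idx++, &net, &hops, &gateway,
                          &alive, &priority, &sensitivity) == 0)
            CDEBUG(D_NET, "route to %s via %s hops %u prio %u sens %u %s\n",
                   libcfs_net2str(net), libcfs_nid2str(gateway),
                   hops, priority, sensitivity,
                   alive ? "up" : "down");
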
 
-void
-lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf)
-{
-       struct lnet_ni_status *stat;
-       int nnis;
-       int i;
-
-       __swab32s(&pbuf->pb_info.pi_magic);
-       __swab32s(&pbuf->pb_info.pi_features);
-       __swab32s(&pbuf->pb_info.pi_pid);
-       __swab32s(&pbuf->pb_info.pi_nnis);
-       nnis = pbuf->pb_info.pi_nnis;
-       if (nnis > pbuf->pb_nnis)
-               nnis = pbuf->pb_nnis;
-       for (i = 0; i < nnis; i++) {
-               stat = &pbuf->pb_info.pi_ni[i];
-               __swab64s(&stat->ns_nid);
-               __swab32s(&stat->ns_status);
-       }
-       return;
-}
-
-/**
- * parse router-checker pinginfo, record number of down NIs for remote
- * networks on that router.
- */
-static void
-lnet_parse_rc_info(struct lnet_rc_data *rcd)
-{
-       struct lnet_ping_buffer *pbuf = rcd->rcd_pingbuffer;
-       struct lnet_peer_ni     *gw   = rcd->rcd_gateway;
-       struct lnet_route               *rte;
-       int                     nnis;
-
-       if (!gw->lpni_alive || !pbuf)
-               return;
-
-       /*
-        * Protect gw->lpni_ping_feats. This can be set from
-        * lnet_notify_locked with different locks being held
-        */
-       spin_lock(&gw->lpni_lock);
-
-       if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
-               lnet_swap_pinginfo(pbuf);
-
-       /* NB always racing with network! */
-       if (pbuf->pb_info.pi_magic != LNET_PROTO_PING_MAGIC) {
-               CDEBUG(D_NET, "%s: Unexpected magic %08x\n",
-                      libcfs_nid2str(gw->lpni_nid), pbuf->pb_info.pi_magic);
-               gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
-               goto out;
-       }
-
-       gw->lpni_ping_feats = pbuf->pb_info.pi_features;
-
-       /* Without NI status info there's nothing more to do. */
-       if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) == 0)
-               goto out;
-
-       /* Determine the number of NIs for which there is data. */
-       nnis = pbuf->pb_info.pi_nnis;
-       if (pbuf->pb_nnis < nnis) {
-               if (rcd->rcd_nnis < nnis)
-                       rcd->rcd_nnis = nnis;
-               nnis = pbuf->pb_nnis;
-       }
-
-       list_for_each_entry(rte, &gw->lpni_routes, lr_gwlist) {
-               int     down = 0;
-               int     up = 0;
-               int     i;
-
-               /* If routing disabled then the route is down. */
-               if ((gw->lpni_ping_feats & LNET_PING_FEAT_RTE_DISABLED) != 0) {
-                       rte->lr_downis = 1;
-                       continue;
-               }
-
-               for (i = 0; i < nnis; i++) {
-                       struct lnet_ni_status *stat = &pbuf->pb_info.pi_ni[i];
-                       lnet_nid_t       nid = stat->ns_nid;
-
-                       if (nid == LNET_NID_ANY) {
-                               CDEBUG(D_NET, "%s: unexpected LNET_NID_ANY\n",
-                                      libcfs_nid2str(gw->lpni_nid));
-                               gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
-                               goto out;
-                       }
-
-                       if (LNET_NETTYP(LNET_NIDNET(nid)) == LOLND)
-                               continue;
-
-                       if (stat->ns_status == LNET_NI_STATUS_DOWN) {
-                               down++;
-                               continue;
-                       }
-
-                       if (stat->ns_status == LNET_NI_STATUS_UP) {
-                               if (LNET_NIDNET(nid) == rte->lr_net) {
-                                       up = 1;
-                                       break;
-                               }
-                               continue;
-                       }
-
-                       CDEBUG(D_NET, "%s: Unexpected status 0x%x\n",
-                              libcfs_nid2str(gw->lpni_nid), stat->ns_status);
-                       gw->lpni_ping_feats = LNET_PING_FEAT_INVAL;
-                       goto out;
-               }
-
-               if (up) { /* ignore downed NIs if NI for dest network is up */
-                       rte->lr_downis = 0;
-                       continue;
-               }
-               /* if @down is zero and this route is single-hop, it means
-                * we can't find NI for target network */
-               if (down == 0 && rte->lr_hops == 1)
-                       down = 1;
-
-               rte->lr_downis = down;
-       }
-out:
-       spin_unlock(&gw->lpni_lock);
-}
-
-static void
-lnet_router_checker_event(struct lnet_event *event)
-{
-       struct lnet_rc_data *rcd = event->md.user_ptr;
-       struct lnet_peer_ni *lp;
-
-       LASSERT(rcd != NULL);
-
-       if (event->unlinked) {
-               LNetInvalidateMDHandle(&rcd->rcd_mdh);
-               return;
-       }
-
-       LASSERT(event->type == LNET_EVENT_SEND ||
-               event->type == LNET_EVENT_REPLY);
-
-       lp = rcd->rcd_gateway;
-       LASSERT(lp != NULL);
-
-        /* NB: it's called with holding lnet_res_lock, we have a few
-         * places need to hold both locks at the same time, please take
-         * care of lock ordering */
-       lnet_net_lock(lp->lpni_cpt);
-       if (!lnet_isrouter(lp) || lp->lpni_rcd != rcd) {
-               /* ignore if no longer a router or rcd is replaced */
-               goto out;
-       }
-
-       if (event->type == LNET_EVENT_SEND) {
-               lp->lpni_ping_notsent = 0;
-               if (event->status == 0)
-                       goto out;
-       }
-
-       /* LNET_EVENT_REPLY */
-       /* A successful REPLY means the router is up.  If _any_ comms
-        * to the router fail I assume it's down (this will happen if
-        * we ping alive routers to try to detect router death before
-        * apps get burned). */
-
-       lnet_notify_locked(lp, 1, !event->status, ktime_get_seconds());
-       /* The router checker will wake up very shortly and do the
-        * actual notification.
-        * XXX If 'lp' stops being a router before then, it will still
-        * have the notification pending!!! */
-
-       if (avoid_asym_router_failure && event->status == 0)
-               lnet_parse_rc_info(rcd);
-
- out:
-       lnet_net_unlock(lp->lpni_cpt);
-}
-
 static void
 lnet_wait_known_routerstate(void)
 {
-       struct lnet_peer_ni *rtr;
+       struct lnet_peer *rtr;
        struct list_head *entry;
        int all_known;
 
@@ -832,17 +772,17 @@ lnet_wait_known_routerstate(void)
 
                all_known = 1;
                list_for_each(entry, &the_lnet.ln_routers) {
-                       rtr = list_entry(entry, struct lnet_peer_ni,
-                                        lpni_rtr_list);
+                       rtr = list_entry(entry, struct lnet_peer,
+                                        lp_rtr_list);
 
-                       spin_lock(&rtr->lpni_lock);
+                       spin_lock(&rtr->lp_lock);
 
-                       if (rtr->lpni_alive_count == 0) {
+                       if ((rtr->lp_state & LNET_PEER_DISCOVERED) == 0) {
                                all_known = 0;
-                               spin_unlock(&rtr->lpni_lock);
+                               spin_unlock(&rtr->lp_lock);
                                break;
                        }
-                       spin_unlock(&rtr->lpni_lock);
+                       spin_unlock(&rtr->lp_lock);
                }
 
                lnet_net_unlock(cpt);
@@ -855,290 +795,65 @@ lnet_wait_known_routerstate(void)
        }
 }
 
-void
-lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net)
+static inline bool
+lnet_net_set_status_locked(struct lnet_net *net, __u32 status)
 {
-       struct lnet_route *rte;
+       struct lnet_ni *ni;
+       bool update = false;
 
-       if ((gw->lpni_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0) {
-               list_for_each_entry(rte, &gw->lpni_routes, lr_gwlist) {
-                       if (rte->lr_net == net) {
-                               rte->lr_downis = 0;
-                               break;
-                       }
+       list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
+               lnet_ni_lock(ni);
+               if (ni->ni_status &&
+                   ni->ni_status->ns_status != status) {
+                       ni->ni_status->ns_status = status;
+                       update = true;
                }
+               lnet_ni_unlock(ni);
        }
+
+       return update;
 }
 
-static void
+static bool
 lnet_update_ni_status_locked(void)
 {
-       struct lnet_ni *ni = NULL;
+       struct lnet_net *net;
+       bool push = false;
        time64_t now;
        time64_t timeout;
 
        LASSERT(the_lnet.ln_routing);
 
-       timeout = router_ping_timeout +
-                 MAX(live_router_check_interval, dead_router_check_interval);
+       timeout = router_ping_timeout + alive_router_check_interval;
 
        now = ktime_get_real_seconds();
-       while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
-               if (ni->ni_net->net_lnd->lnd_type == LOLND)
+       list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
+               if (net->net_lnd->lnd_type == LOLND)
                        continue;
 
-               if (now < ni->ni_last_alive + timeout)
+               if (now < net->net_last_alive + timeout)
                        continue;
 
-               lnet_ni_lock(ni);
+               spin_lock(&net->net_lock);
                /* re-check with lock */
-               if (now < ni->ni_last_alive + timeout) {
-                       lnet_ni_unlock(ni);
+               if (now < net->net_last_alive + timeout) {
+                       spin_unlock(&net->net_lock);
                        continue;
                }
+               spin_unlock(&net->net_lock);
 
-               LASSERT(ni->ni_status != NULL);
-
-               if (ni->ni_status->ns_status != LNET_NI_STATUS_DOWN) {
-                       CDEBUG(D_NET, "NI(%s:%lld) status changed to down\n",
-                              libcfs_nid2str(ni->ni_nid), timeout);
-                       /* NB: so far, this is the only place to set
-                        * NI status to "down" */
-                       ni->ni_status->ns_status = LNET_NI_STATUS_DOWN;
-               }
-               lnet_ni_unlock(ni);
-       }
-}
-
-static void
-lnet_destroy_rc_data(struct lnet_rc_data *rcd)
-{
-       LASSERT(list_empty(&rcd->rcd_list));
-       /* detached from network */
-       LASSERT(LNetMDHandleIsInvalid(rcd->rcd_mdh));
-
-       if (rcd->rcd_gateway != NULL) {
-               int cpt = rcd->rcd_gateway->lpni_cpt;
-
-               lnet_net_lock(cpt);
-               lnet_peer_ni_decref_locked(rcd->rcd_gateway);
-               lnet_net_unlock(cpt);
-       }
-
-       if (rcd->rcd_pingbuffer != NULL)
-               lnet_ping_buffer_decref(rcd->rcd_pingbuffer);
-
-       LIBCFS_FREE(rcd, sizeof(*rcd));
-}
-
-static struct lnet_rc_data *
-lnet_update_rc_data_locked(struct lnet_peer_ni *gateway)
-{
-       struct lnet_handle_md mdh;
-       struct lnet_rc_data *rcd;
-       struct lnet_ping_buffer *pbuf = NULL;
-       int nnis = LNET_INTERFACES_MIN;
-       int rc;
-       int i;
-
-       rcd = gateway->lpni_rcd;
-       if (rcd) {
-               nnis = rcd->rcd_nnis;
-               mdh = rcd->rcd_mdh;
-               LNetInvalidateMDHandle(&rcd->rcd_mdh);
-               pbuf = rcd->rcd_pingbuffer;
-               rcd->rcd_pingbuffer = NULL;
-       } else {
-               LNetInvalidateMDHandle(&mdh);
-       }
-
-       lnet_net_unlock(gateway->lpni_cpt);
-
-       if (rcd) {
-               LNetMDUnlink(mdh);
-               lnet_ping_buffer_decref(pbuf);
-       } else {
-               LIBCFS_ALLOC(rcd, sizeof(*rcd));
-               if (rcd == NULL)
-                       goto out;
-
-               LNetInvalidateMDHandle(&rcd->rcd_mdh);
-               INIT_LIST_HEAD(&rcd->rcd_list);
-               rcd->rcd_nnis = nnis;
-       }
-
-       pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
-       if (pbuf == NULL)
-               goto out;
-
-       for (i = 0; i < nnis; i++) {
-               pbuf->pb_info.pi_ni[i].ns_nid = LNET_NID_ANY;
-               pbuf->pb_info.pi_ni[i].ns_status = LNET_NI_STATUS_INVALID;
-       }
-       rcd->rcd_pingbuffer = pbuf;
-
-       LASSERT(!LNetEQHandleIsInvalid(the_lnet.ln_rc_eqh));
-       rc = LNetMDBind((struct lnet_md){.start     = &pbuf->pb_info,
-                                   .user_ptr  = rcd,
-                                   .length    = LNET_PING_INFO_SIZE(nnis),
-                                   .threshold = LNET_MD_THRESH_INF,
-                                   .options   = LNET_MD_TRUNCATE,
-                                   .eq_handle = the_lnet.ln_rc_eqh},
-                       LNET_UNLINK,
-                       &rcd->rcd_mdh);
-       if (rc < 0) {
-               CERROR("Can't bind MD: %d\n", rc);
-               goto out_ping_buffer_decref;
-       }
-       LASSERT(rc == 0);
-
-       lnet_net_lock(gateway->lpni_cpt);
-       /* Check if this is still a router. */
-       if (!lnet_isrouter(gateway))
-               goto out_unlock;
-       /* Check if someone else installed router data. */
-       if (gateway->lpni_rcd && gateway->lpni_rcd != rcd)
-               goto out_unlock;
-
-       /* Install and/or update the router data. */
-       if (!gateway->lpni_rcd) {
-               lnet_peer_ni_addref_locked(gateway);
-               rcd->rcd_gateway = gateway;
-               gateway->lpni_rcd = rcd;
-       }
-       gateway->lpni_ping_notsent = 0;
-
-       return rcd;
-
-out_unlock:
-       lnet_net_unlock(gateway->lpni_cpt);
-       rc = LNetMDUnlink(mdh);
-       LASSERT(rc == 0);
-out_ping_buffer_decref:
-       lnet_ping_buffer_decref(pbuf);
-out:
-       if (rcd && rcd != gateway->lpni_rcd)
-               lnet_destroy_rc_data(rcd);
-       lnet_net_lock(gateway->lpni_cpt);
-       return gateway->lpni_rcd;
-}
-
-static int
-lnet_router_check_interval(struct lnet_peer_ni *rtr)
-{
-       int secs;
-
-       secs = rtr->lpni_alive ? live_router_check_interval :
-                              dead_router_check_interval;
-       if (secs < 0)
-               secs = 0;
-
-       return secs;
-}
-
-static void
-lnet_ping_router_locked(struct lnet_peer_ni *rtr)
-{
-       struct lnet_rc_data *rcd = NULL;
-       time64_t now = ktime_get_seconds();
-       time64_t secs;
-       struct lnet_ni *ni;
-
-       lnet_peer_ni_addref_locked(rtr);
-
-       if (rtr->lpni_ping_deadline != 0 && /* ping timed out? */
-           now >  rtr->lpni_ping_deadline)
-               lnet_notify_locked(rtr, 1, 0, now);
-
-       /* Run any outstanding notifications */
-       ni = lnet_get_next_ni_locked(rtr->lpni_net, NULL);
-       lnet_ni_notify_locked(ni, rtr);
-
-       if (!lnet_isrouter(rtr) ||
-           the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
-               /* router table changed or router checker is shutting down */
-               lnet_peer_ni_decref_locked(rtr);
-               return;
-       }
-
-       rcd = rtr->lpni_rcd;
-
-       /*
-        * The response to the router checker ping could've timed out and
-        * the mdh might've been invalidated, so we need to update it
-        * again.
-        */
-       if (!rcd || rcd->rcd_nnis > rcd->rcd_pingbuffer->pb_nnis ||
-           LNetMDHandleIsInvalid(rcd->rcd_mdh))
-               rcd = lnet_update_rc_data_locked(rtr);
-       if (rcd == NULL)
-               return;
-
-       secs = lnet_router_check_interval(rtr);
-
-       CDEBUG(D_NET,
-              "rtr %s %lld: deadline %lld ping_notsent %d alive %d "
-              "alive_count %d lpni_ping_timestamp %lld\n",
-              libcfs_nid2str(rtr->lpni_nid), secs,
-              rtr->lpni_ping_deadline, rtr->lpni_ping_notsent,
-              rtr->lpni_alive, rtr->lpni_alive_count, rtr->lpni_ping_timestamp);
-
-       if (secs != 0 && !rtr->lpni_ping_notsent &&
-           now > rtr->lpni_ping_timestamp + secs) {
-               int               rc;
-               struct lnet_process_id id;
-               struct lnet_handle_md mdh;
-
-               id.nid = rtr->lpni_nid;
-               id.pid = LNET_PID_LUSTRE;
-               CDEBUG(D_NET, "Check: %s\n", libcfs_id2str(id));
-
-               rtr->lpni_ping_notsent   = 1;
-               rtr->lpni_ping_timestamp = now;
-
-               mdh = rcd->rcd_mdh;
-
-               if (rtr->lpni_ping_deadline == 0) {
-                       rtr->lpni_ping_deadline = ktime_get_seconds() +
-                                                 router_ping_timeout;
-               }
-
-               lnet_net_unlock(rtr->lpni_cpt);
-
-               rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
-                            LNET_PROTO_PING_MATCHBITS, 0, false);
-
-               lnet_net_lock(rtr->lpni_cpt);
-               if (rc != 0)
-                       rtr->lpni_ping_notsent = 0; /* no event pending */
+               /*
+                * if the net hasn't received any traffic on any of its
+                * constituent NIs for longer than the timeout, then mark
+                * all of its NIs down.
+                */
+               push = lnet_net_set_status_locked(net, LNET_NI_STATUS_DOWN);
        }
 
-       lnet_peer_ni_decref_locked(rtr);
-       return;
+       return push;
 }
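
lnet_update_ni_status_locked() now works per net rather than per NI, and it uses a cheap unlocked timestamp comparison first, taking the per-net spinlock only to re-check the condition for nets that already look stale. A small user-space illustration of that test/lock/re-test idiom follows; the struct, the pthread spinlock and the helper name are stand-ins, not LNet code.

    #include <pthread.h>
    #include <stdbool.h>
    #include <time.h>

    struct net_state {
            pthread_spinlock_t lock;
            time_t last_alive;
            bool down;
    };

    /*
     * Return true if this call transitioned the net to "down".  The first
     * comparison is done without the lock; only stale-looking nets pay for
     * the lock, where the condition is re-checked before acting on it.
     */
    static bool check_net(struct net_state *net, time_t now, time_t timeout)
    {
            bool changed = false;

            if (now < net->last_alive + timeout)
                    return false;           /* fresh, no lock needed */

            pthread_spin_lock(&net->lock);
            if (now >= net->last_alive + timeout && !net->down) {
                    net->down = true;
                    changed = true;
            }
            pthread_spin_unlock(&net->lock);
            return changed;
    }

    int main(void)
    {
            struct net_state net = { .last_alive = 0, .down = false };

            pthread_spin_init(&net.lock, PTHREAD_PROCESS_PRIVATE);
            return check_net(&net, time(NULL), 60) ? 0 : 1;
    }
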
 
-int lnet_router_pre_mt_start(void)
-{
-       int rc;
-
-       if (check_routers_before_use &&
-           dead_router_check_interval <= 0) {
-               LCONSOLE_ERROR_MSG(0x10a, "'dead_router_check_interval' must be"
-                                  " set if 'check_routers_before_use' is set"
-                                  "\n");
-               return -EINVAL;
-       }
-
-       rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
-       if (rc != 0) {
-               CERROR("Can't allocate EQ(0): %d\n", rc);
-               return -ENOMEM;
-       }
-
-       return 0;
-}
-
-void lnet_router_post_mt_start(void)
+void lnet_wait_router_start(void)
 {
        if (check_routers_before_use) {
                /* Note that a helpful side-effect of pinging all known routers
@@ -1148,97 +863,6 @@ void lnet_router_post_mt_start(void)
        }
 }
 
-void
-lnet_router_cleanup(void)
-{
-       int rc;
-
-       rc = LNetEQFree(the_lnet.ln_rc_eqh);
-       LASSERT(rc == 0);
-       return;
-}
-
-void
-lnet_prune_rc_data(int wait_unlink)
-{
-       struct lnet_rc_data *rcd;
-       struct lnet_rc_data *tmp;
-       struct lnet_peer_ni *lp;
-       struct list_head head;
-       int i = 2;
-
-       if (likely(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING &&
-                  list_empty(&the_lnet.ln_rcd_deathrow) &&
-                  list_empty(&the_lnet.ln_rcd_zombie)))
-               return;
-
-       INIT_LIST_HEAD(&head);
-
-       lnet_net_lock(LNET_LOCK_EX);
-
-       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
-               /* router checker is stopping, prune all */
-               list_for_each_entry(lp, &the_lnet.ln_routers,
-                                   lpni_rtr_list) {
-                       if (lp->lpni_rcd == NULL)
-                               continue;
-
-                       LASSERT(list_empty(&lp->lpni_rcd->rcd_list));
-                       list_add(&lp->lpni_rcd->rcd_list,
-                                &the_lnet.ln_rcd_deathrow);
-                       lp->lpni_rcd = NULL;
-               }
-       }
-
-       /* unlink all RCDs on deathrow list */
-       list_splice_init(&the_lnet.ln_rcd_deathrow, &head);
-
-       if (!list_empty(&head)) {
-               lnet_net_unlock(LNET_LOCK_EX);
-
-               list_for_each_entry(rcd, &head, rcd_list)
-                       LNetMDUnlink(rcd->rcd_mdh);
-
-               lnet_net_lock(LNET_LOCK_EX);
-       }
-
-       list_splice_init(&head, &the_lnet.ln_rcd_zombie);
-
-       /* release all zombie RCDs */
-       while (!list_empty(&the_lnet.ln_rcd_zombie)) {
-               list_for_each_entry_safe(rcd, tmp, &the_lnet.ln_rcd_zombie,
-                                        rcd_list) {
-                       if (LNetMDHandleIsInvalid(rcd->rcd_mdh))
-                               list_move(&rcd->rcd_list, &head);
-               }
-
-               wait_unlink = wait_unlink &&
-                             !list_empty(&the_lnet.ln_rcd_zombie);
-
-               lnet_net_unlock(LNET_LOCK_EX);
-
-               while (!list_empty(&head)) {
-                       rcd = list_entry(head.next,
-                                        struct lnet_rc_data, rcd_list);
-                       list_del_init(&rcd->rcd_list);
-                       lnet_destroy_rc_data(rcd);
-               }
-
-               if (!wait_unlink)
-                       return;
-
-               i++;
-               CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
-                      "Waiting for rc buffers to unlink\n");
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               schedule_timeout(cfs_time_seconds(1) / 4);
-
-               lnet_net_lock(LNET_LOCK_EX);
-       }
-
-       lnet_net_unlock(LNET_LOCK_EX);
-}
-
 /*
  * This function is called from the monitor thread to check if there are
  * any active routers that need to be checked.
@@ -1246,52 +870,113 @@ lnet_prune_rc_data(int wait_unlink)
 inline bool
 lnet_router_checker_active(void)
 {
-       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
-               return true;
-
        /* Router Checker thread needs to run when routing is enabled in
         * order to call lnet_update_ni_status_locked() */
        if (the_lnet.ln_routing)
                return true;
 
-       /* if there are routers that need to be cleaned up then do so */
-       if (!list_empty(&the_lnet.ln_rcd_deathrow) ||
-           !list_empty(&the_lnet.ln_rcd_zombie))
-               return true;
-
        return !list_empty(&the_lnet.ln_routers) &&
-               (live_router_check_interval > 0 ||
-                dead_router_check_interval > 0);
+               alive_router_check_interval > 0;
 }
 
 void
 lnet_check_routers(void)
 {
-       struct lnet_peer_ni *rtr;
+       struct lnet_peer_net *first_lpn = NULL;
+       struct lnet_peer_net *lpn;
+       struct lnet_peer_ni *lpni;
        struct list_head *entry;
-       __u64   version;
-       int     cpt;
-       int     cpt2;
+       struct lnet_peer *rtr;
+       bool push = false;
+       bool found_lpn = false;
+       __u64 version;
+       __u32 net_id;
+       time64_t now;
+       int cpt;
+       int rc;
 
        cpt = lnet_net_lock_current();
 rescan:
        version = the_lnet.ln_routers_version;
 
        list_for_each(entry, &the_lnet.ln_routers) {
-               rtr = list_entry(entry, struct lnet_peer_ni,
-                                       lpni_rtr_list);
+               rtr = list_entry(entry, struct lnet_peer,
+                                lp_rtr_list);
+
+               now = ktime_get_real_seconds();
+
+               /*
+                * only discover the router if we've passed
+                * alive_router_check_interval seconds. Some of the router
+                * interfaces could be down and in that case they would be
+                * undergoing recovery separately from this discovery.
+                */
+               /* find next peer net which is also local */
+               net_id = rtr->lp_disc_net_id;
+               do {
+                       lpn = lnet_get_next_peer_net_locked(rtr, net_id);
+                       if (!lpn) {
+                               CERROR("gateway %s has no networks\n",
+                                      libcfs_nid2str(rtr->lp_primary_nid));
+                               break;
+                       }
+                       if (first_lpn == lpn)
+                               break;
+                       if (!first_lpn)
+                               first_lpn = lpn;
+                       found_lpn = lnet_islocalnet_locked(lpn->lpn_net_id);
+                       net_id = lpn->lpn_net_id;
+               } while (!found_lpn);
+
+               if (!found_lpn || !lpn) {
+                       CERROR("no local network found for gateway %s\n",
+                              libcfs_nid2str(rtr->lp_primary_nid));
+                       continue;
+               }
 
-               cpt2 = rtr->lpni_cpt;
-               if (cpt != cpt2) {
-                       lnet_net_unlock(cpt);
-                       cpt = cpt2;
-                       lnet_net_lock(cpt);
-                       /* the routers list has changed */
-                       if (version != the_lnet.ln_routers_version)
-                               goto rescan;
+               if (now - lpn->lpn_rtrcheck_timestamp <
+                   alive_router_check_interval / lnet_current_net_count)
+                       continue;
+
+               /*
+                * If we're currently discovering the peer then don't
+                * issue another discovery
+                */
+               spin_lock(&rtr->lp_lock);
+               if (rtr->lp_state & LNET_PEER_RTR_DISCOVERY) {
+                       spin_unlock(&rtr->lp_lock);
+                       continue;
+               }
+               /* make sure we actively discover the router */
+               rtr->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
+               rtr->lp_state |= LNET_PEER_RTR_DISCOVERY;
+               spin_unlock(&rtr->lp_lock);
+
+               /* find the peer_ni associated with the primary NID */
+               lpni = lnet_peer_get_ni_locked(rtr, rtr->lp_primary_nid);
+               if (!lpni) {
+                       CDEBUG(D_NET, "Expected to find an lpni for %s, but none found\n",
+                              libcfs_nid2str(rtr->lp_primary_nid));
+                       continue;
                }
+               lnet_peer_ni_addref_locked(lpni);
+
+               /* specify the net to use */
+               rtr->lp_disc_net_id = lpn->lpn_net_id;
+
+               /* discover the router */
+               CDEBUG(D_NET, "discover %s, cpt = %d\n",
+                      libcfs_nid2str(lpni->lpni_nid), cpt);
+               rc = lnet_discover_peer_locked(lpni, cpt, false);
 
-               lnet_ping_router_locked(rtr);
+               /* drop the ref taken by lnet_peer_ni_addref_locked() above */
+               lnet_peer_ni_decref_locked(lpni);
+
+               if (!rc)
+                       lpn->lpn_rtrcheck_timestamp = now;
+               else
+                       CERROR("Failed to discover router %s\n",
+                              libcfs_nid2str(rtr->lp_primary_nid));
 
                /* NB dropped lock */
                if (version != the_lnet.ln_routers_version) {
@@ -1301,11 +986,13 @@ rescan:
        }
 
        if (the_lnet.ln_routing)
-               lnet_update_ni_status_locked();
+               push = lnet_update_ni_status_locked();
 
        lnet_net_unlock(cpt);
 
-       lnet_prune_rc_data(0); /* don't wait for UNLINK */
+       /* if the status of the ni changed update the peers */
+       /* if the status of the ni changed, update the peers */
+               lnet_push_update_to_peers(1);
 }
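
Gateway aliveness is now driven by peer discovery: lnet_check_routers() rotates through the gateway's peer nets starting from lp_disc_net_id so successive checks go out over different local networks, and the per-net interval is divided by lnet_current_net_count so the gateway as a whole is still probed roughly once per alive_router_check_interval. Here is a self-contained sketch of that round-robin selection over an array; it is illustrative only, the real code walks lnet_peer_net structures.

    #include <stdbool.h>
    #include <stdio.h>

    #define NNETS 4

    /*
     * Pick the next net after 'last' that is local, wrapping around, and
     * remember it so the next call starts from a different point.
     */
    static int next_local_net(const bool local[NNETS], int *last)
    {
            int i, idx;

            for (i = 1; i <= NNETS; i++) {
                    idx = (*last + i) % NNETS;
                    if (local[idx]) {
                            *last = idx;
                            return idx;
                    }
            }
            return -1;      /* gateway shares no network with us */
    }

    int main(void)
    {
            bool local[NNETS] = { false, true, false, true };
            int last = 0, i;

            for (i = 0; i < 4; i++)
                    printf("check via net %d\n", next_local_net(local, &last));
            return 0;
    }
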
 
 void
@@ -1621,6 +1308,7 @@ lnet_rtrpools_alloc(int im_a_router)
        lnet_net_lock(LNET_LOCK_EX);
        the_lnet.ln_routing = 1;
        lnet_net_unlock(LNET_LOCK_EX);
+       wake_up(&the_lnet.ln_mt_waitq);
        return 0;
 
  failed:
@@ -1733,12 +1421,27 @@ lnet_rtrpools_disable(void)
        lnet_rtrpools_free(1);
 }
 
+static inline void
+lnet_notify_peer_down(struct lnet_ni *ni, lnet_nid_t nid)
+{
+       if (ni->ni_net->net_lnd->lnd_notify_peer_down != NULL)
+               (ni->ni_net->net_lnd->lnd_notify_peer_down)(nid);
+}
+
+/*
+ * ni: local NI used to communicate with the peer
+ * nid: peer NID
+ * alive: true if peer is alive, false otherwise
+ * reset: reset health value. This is requested by the LND.
+ * when: notification time.
+ */
 int
-lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, int alive, time64_t when)
+lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, bool alive, bool reset,
+           time64_t when)
 {
-       struct lnet_peer_ni *lp = NULL;
+       struct lnet_peer_ni *lpni = NULL;
        time64_t now = ktime_get_seconds();
-       int cpt = lnet_cpt_of_nid(nid, ni);
+       int cpt;
 
        LASSERT (!in_interrupt ());
 
@@ -1770,48 +1473,44 @@ lnet_notify(struct lnet_ni *ni, lnet_nid_t nid, int alive, time64_t when)
                return 0;
        }
 
-       lnet_net_lock(cpt);
+       /* must lock 0 since this is used for synchronization */
+       lnet_net_lock(0);
 
        if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               lnet_net_unlock(cpt);
+               lnet_net_unlock(0);
                return -ESHUTDOWN;
        }
 
-       lp = lnet_find_peer_ni_locked(nid);
-       if (lp == NULL) {
+       lpni = lnet_find_peer_ni_locked(nid);
+       if (lpni == NULL) {
                /* nid not found */
-               lnet_net_unlock(cpt);
+               lnet_net_unlock(0);
                CDEBUG(D_NET, "%s not found\n", libcfs_nid2str(nid));
                return 0;
        }
 
-       /*
-        * It is possible for this function to be called for the same peer
-        * but with different NIs. We want to synchronize the notification
-        * between the different calls. So we will use the lpni_cpt to
-        * grab the net lock.
-        */
-       if (lp->lpni_cpt != cpt) {
-               lnet_net_unlock(cpt);
-               cpt = lp->lpni_cpt;
-               lnet_net_lock(cpt);
+       if (alive) {
+               if (reset)
+                       lnet_set_healthv(&lpni->lpni_healthv,
+                                        LNET_MAX_HEALTH_VALUE);
+               else
+                       lnet_inc_healthv(&lpni->lpni_healthv);
+       } else {
+               lnet_handle_remote_failure_locked(lpni);
        }
 
-       /* We can't fully trust LND on reporting exact peer last_alive
-        * if he notifies us about dead peer. For example ksocklnd can
-        * call us with when == _time_when_the_node_was_booted_ if
-        * no connections were successfully established */
-       if (ni != NULL && !alive && when < lp->lpni_last_alive)
-               when = lp->lpni_last_alive;
-
-       lnet_notify_locked(lp, ni == NULL, alive, when);
+       /* recalculate aliveness */
+       alive = lnet_is_peer_ni_alive(lpni);
+       lnet_net_unlock(0);
 
-       if (ni != NULL)
-               lnet_ni_notify_locked(ni, lp);
-
-       lnet_peer_ni_decref_locked(lp);
+       if (ni != NULL && !alive)
+               lnet_notify_peer_down(ni, lpni->lpni_nid);
 
+       cpt = lpni->lpni_cpt;
+       lnet_net_lock(cpt);
+       lnet_peer_ni_decref_locked(lpni);
        lnet_net_unlock(cpt);
+
        return 0;
 }
 EXPORT_SYMBOL(lnet_notify);
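
With this change lnet_notify() no longer records a last-alive timestamp; it adjusts the peer NI's health value (optionally resetting it to the maximum when the LND asks for a reset) and, if the peer is now considered dead, calls back into the LND via lnd_notify_peer_down(). The hedged fragment below shows how an LND failure path might use the new signature; 'ni' and 'peer_nid' are placeholders and are not taken from any particular LND.

    /*
     * e.g. in an LND, after a connection attempt to 'peer_nid' over
     * local interface 'ni' has definitively failed:
     */
    lnet_notify(ni, peer_nid, false /* alive */, false /* reset */,
                ktime_get_seconds());

    /*
     * and once a connection is (re-)established, asking LNet to
     * restore the peer NI to full health:
     */
    lnet_notify(ni, peer_nid, true /* alive */, true /* reset */,
                ktime_get_seconds());
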
index 24d9c0f..23ec36e 100644 (file)
@@ -224,18 +224,17 @@ proc_lnet_routes(struct ctl_table *table, int write, void __user *buffer,
                }
 
                if (route != NULL) {
-                       __u32        net        = rnet->lrn_net;
-                       __u32 hops              = route->lr_hops;
-                       unsigned int priority   = route->lr_priority;
-                       lnet_nid_t   nid        = route->lr_gateway->lpni_nid;
-                       int          alive      = lnet_is_route_alive(route);
+                       __u32 net = rnet->lrn_net;
+                       __u32 hops = route->lr_hops;
+                       unsigned int priority = route->lr_priority;
+                       int alive = lnet_is_route_alive(route);
 
                        s += snprintf(s, tmpstr + tmpsiz - s,
                                      "%-8s %4d %8u %7s %s\n",
                                      libcfs_net2str(net), hops,
                                      priority,
                                      alive ? "up" : "down",
-                                     libcfs_nid2str(nid));
+                                     libcfs_nid2str(route->lr_nid));
                        LASSERT(tmpstr + tmpsiz - s > 0);
                }
 
@@ -291,10 +290,8 @@ proc_lnet_routers(struct ctl_table *table, int write, void __user *buffer,
 
        if (*ppos == 0) {
                s += snprintf(s, tmpstr + tmpsiz - s,
-                             "%-4s %7s %9s %6s %12s %9s %8s %7s %s\n",
-                             "ref", "rtr_ref", "alive_cnt", "state",
-                             "last_ping", "ping_sent", "deadline",
-                             "down_ni", "router");
+                             "%-4s %7s %5s %s\n",
+                             "ref", "rtr_ref", "alive", "router");
                LASSERT(tmpstr + tmpsiz - s > 0);
 
                lnet_net_lock(0);
@@ -303,7 +300,7 @@ proc_lnet_routers(struct ctl_table *table, int write, void __user *buffer,
                *ppos = LNET_PROC_POS_MAKE(0, ver, 0, off);
        } else {
                struct list_head *r;
-               struct lnet_peer_ni *peer = NULL;
+               struct lnet_peer *peer = NULL;
                int               skip = off - 1;
 
                lnet_net_lock(0);
@@ -318,9 +315,9 @@ proc_lnet_routers(struct ctl_table *table, int write, void __user *buffer,
                r = the_lnet.ln_routers.next;
 
                while (r != &the_lnet.ln_routers) {
-                       struct lnet_peer_ni *lp =
-                         list_entry(r, struct lnet_peer_ni,
-                                    lpni_rtr_list);
+                       struct lnet_peer *lp =
+                         list_entry(r, struct lnet_peer,
+                                    lp_rtr_list);
 
                        if (skip == 0) {
                                peer = lp;
@@ -332,47 +329,16 @@ proc_lnet_routers(struct ctl_table *table, int write, void __user *buffer,
                }
 
                if (peer != NULL) {
-                       lnet_nid_t nid = peer->lpni_nid;
-                       time64_t now = ktime_get_seconds();
-                       time64_t deadline = peer->lpni_ping_deadline;
-                       int nrefs     = atomic_read(&peer->lpni_refcount);
-                       int nrtrrefs  = peer->lpni_rtr_refcount;
-                       int alive_cnt = peer->lpni_alive_count;
-                       int alive     = peer->lpni_alive;
-                       int pingsent  = !peer->lpni_ping_notsent;
-                       time64_t last_ping = now - peer->lpni_ping_timestamp;
-                       int down_ni   = 0;
-                       struct lnet_route *rtr;
-
-                       if ((peer->lpni_ping_feats &
-                            LNET_PING_FEAT_NI_STATUS) != 0) {
-                               list_for_each_entry(rtr, &peer->lpni_routes,
-                                                   lr_gwlist) {
-                                       /* downis on any route should be the
-                                        * number of downis on the gateway */
-                                       if (rtr->lr_downis != 0) {
-                                               down_ni = rtr->lr_downis;
-                                               break;
-                                       }
-                               }
-                       }
+                       lnet_nid_t nid = peer->lp_primary_nid;
+                       int nrefs     = atomic_read(&peer->lp_refcount);
+                       int nrtrrefs  = peer->lp_rtr_refcount;
+                       int alive     = lnet_is_gateway_alive(peer);
 
-                       if (deadline == 0)
-                               s += snprintf(s, tmpstr + tmpsiz - s,
-                                             "%-4d %7d %9d %6s %12llu %9d %8s %7d %s\n",
-                                             nrefs, nrtrrefs, alive_cnt,
-                                             alive ? "up" : "down", last_ping,
-                                             pingsent, "NA", down_ni,
-                                             libcfs_nid2str(nid));
-                       else
-                               s += snprintf(s, tmpstr + tmpsiz - s,
-                                             "%-4d %7d %9d %6s %12llu %9d %8llu %7d %s\n",
-                                             nrefs, nrtrrefs, alive_cnt,
-                                             alive ? "up" : "down", last_ping,
-                                             pingsent,
-                                             deadline - now,
-                                             down_ni, libcfs_nid2str(nid));
-                       LASSERT(tmpstr + tmpsiz - s > 0);
+                       s += snprintf(s, tmpstr + tmpsiz - s,
+                                     "%-4d %7d %5s %s\n",
+                                     nrefs, nrtrrefs,
+                                     alive ? "up" : "down",
+                                     libcfs_nid2str(nid));
                }
 
                lnet_net_unlock(0);
@@ -535,20 +501,8 @@ proc_lnet_peers(struct ctl_table *table, int write, void __user *buffer,
 
                        if (lnet_isrouter(peer) ||
                            lnet_peer_aliveness_enabled(peer))
-                               aliveness = peer->lpni_alive ? "up" : "down";
-
-                       if (lnet_peer_aliveness_enabled(peer)) {
-                               time64_t now = ktime_get_seconds();
-
-                               lastalive = now - peer->lpni_last_alive;
-
-                               /* No need to mess up peers contents with
-                                * arbitrarily long integers - it suffices to
-                                * know that lastalive is more than 10000s old
-                                */
-                               if (lastalive >= 10000)
-                                       lastalive = 9999;
-                       }
+                               aliveness = lnet_is_peer_ni_alive(peer) ?
+                                       "up" : "down";
 
                        lnet_net_unlock(cpt);
 
@@ -735,7 +689,7 @@ proc_lnet_nis(struct ctl_table *table, int write, void __user *buffer,
                        int j;
 
                        if (the_lnet.ln_routing)
-                               last_alive = now - ni->ni_last_alive;
+                               last_alive = now - ni->ni_net->net_last_alive;
 
                        lnet_ni_lock(ni);
                        LASSERT(ni->ni_status != NULL);
index bdcda35..e9d7463 100644 (file)
@@ -979,7 +979,7 @@ out:
 }
 
 int lustre_lnet_config_route(char *nw, char *gw, int hops, int prio,
-                            int seq_no, struct cYAML **err_rc)
+                            int sen, int seq_no, struct cYAML **err_rc)
 {
        struct lnet_ioctl_config_data data;
        lnet_nid_t gateway_nid;
@@ -1040,6 +1040,17 @@ int lustre_lnet_config_route(char *nw, char *gw, int hops, int prio,
                goto out;
        }
 
+       if (sen == -1) {
+               sen = 1;
+       } else if (sen < 1) {
+               snprintf(err_str,
+                        sizeof(err_str),
+                        "\"invalid health sensitivity %d, must be 1 or greater\"",
+                        sen);
+               rc = LUSTRE_CFG_RC_OUT_OF_RANGE_PARAM;
+               goto out;
+       }
+
        rc = lnet_expr2ips(gw, ip_list,
                           &ip2nets, &net, err_str);
        if (rc == LUSTRE_CFG_RC_LAST_ELEM)
@@ -1053,6 +1064,7 @@ int lustre_lnet_config_route(char *nw, char *gw, int hops, int prio,
        data.cfg_net = rnet;
        data.cfg_config_u.cfg_route.rtr_hop = hops;
        data.cfg_config_u.cfg_route.rtr_priority = prio;
+       data.cfg_config_u.cfg_route.rtr_sensitivity = sen;
 
        for (i = MAX_NUM_IPS - 1; i > ip_idx; i--) {
                gateway_nid = LNET_MKNID(net, ip_list[i]);
@@ -1283,6 +1295,11 @@ int lustre_lnet_show_route(char *nw, char *gw, int hops, int prio, int detail,
                                                cfg_route.rtr_priority) == NULL)
                                goto out;
 
+                       if (cYAML_create_number(item, "health_sensitivity",
+                                               data.cfg_config_u.
+                                               cfg_route.rtr_sensitivity) == NULL)
+                               goto out;
+
                        if (!backup &&
                            cYAML_create_string(item, "state",
                                                data.cfg_config_u.cfg_route.
@@ -2570,6 +2587,28 @@ int lustre_lnet_config_recov_intrv(int intrv, int seq_no, struct cYAML **err_rc)
        return rc;
 }
 
+int lustre_lnet_config_rtr_sensitivity(int sen, int seq_no, struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_NO_ERR;
+       char err_str[LNET_MAX_STR_LEN];
+       char val[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"success\"");
+
+       snprintf(val, sizeof(val), "%d", sen);
+
+       rc = write_sysfs_file(modparam_path, "router_sensitivity_percentage", val,
+                             1, strlen(val) + 1);
+       if (rc)
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot configure router health sensitivity: %s\"",
+                        strerror(errno));
+
+       cYAML_build_error(rc, seq_no, ADD_CMD, "router_sensitivity", err_str, err_rc);
+
+       return rc;
+}
+
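+
+ lustre_lnet_config_rtr_sensitivity() simply writes the router_sensitivity_percentage module parameter through sysfs; per the header comment later in this patch, a gateway interface whose health percentage drops below this value is treated as failed. The hedged user-space fragment below shows how a liblnetconfig caller might set the value and read it back; error handling is trimmed, only functions declared in this patch are used, and the surrounding variables are placeholders.
+
+     struct cYAML *show_rc = NULL;
+     struct cYAML *err_rc = NULL;
+     int rc;
+
+     /* require 100% interface health before a gateway NI is used */
+     rc = lustre_lnet_config_rtr_sensitivity(100, -1, &err_rc);
+     if (rc != LUSTRE_CFG_RC_NO_ERR)
+             return rc;      /* err_rc holds the YAML error description */
+
+     /* read the configured percentage back into show_rc */
+     return lustre_lnet_show_rtr_sensitivity(-1, &show_rc, &err_rc);
+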
 int lustre_lnet_config_hsensitivity(int sen, int seq_no, struct cYAML **err_rc)
 {
        int rc = LUSTRE_CFG_RC_NO_ERR;
@@ -3498,6 +3537,31 @@ int lustre_lnet_show_hsensitivity(int seq_no, struct cYAML **show_rc,
                                       err_rc, l_errno);
 }
 
+int lustre_lnet_show_rtr_sensitivity(int seq_no, struct cYAML **show_rc,
+                                    struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_OUT_OF_MEM;
+       char val[LNET_MAX_STR_LEN];
+       int sen = -1, l_errno = 0;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"out of memory\"");
+
+       rc = read_sysfs_file(modparam_path, "router_sensitivity_percentage", val,
+                            1, sizeof(val));
+       if (rc) {
+               l_errno = -errno;
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot get router sensitivity percentage: %d\"", rc);
+       } else {
+               sen = atoi(val);
+       }
+
+       return build_global_yaml_entry(err_str, sizeof(err_str), seq_no,
+                                      "router_sensitivity", sen, show_rc,
+                                      err_rc, l_errno);
+}
+
 int lustre_lnet_show_transaction_to(int seq_no, struct cYAML **show_rc,
                                    struct cYAML **err_rc)
 {
@@ -3883,18 +3947,20 @@ typedef int (*cmd_handler_t)(struct cYAML *tree,
 static int handle_yaml_config_route(struct cYAML *tree, struct cYAML **show_rc,
                                    struct cYAML **err_rc)
 {
-       struct cYAML *net, *gw, *hop, *prio, *seq_no;
+       struct cYAML *net, *gw, *hop, *prio, *sen, *seq_no;
 
        net = cYAML_get_object_item(tree, "net");
        gw = cYAML_get_object_item(tree, "gateway");
        hop = cYAML_get_object_item(tree, "hop");
        prio = cYAML_get_object_item(tree, "priority");
+       sen = cYAML_get_object_item(tree, "health_sensitivity");
        seq_no = cYAML_get_object_item(tree, "seq_no");
 
        return lustre_lnet_config_route((net) ? net->cy_valuestring : NULL,
                                        (gw) ? gw->cy_valuestring : NULL,
                                        (hop) ? hop->cy_valueint : -1,
                                        (prio) ? prio->cy_valueint : -1,
+                                       (sen) ? sen->cy_valueint : -1,
                                        (seq_no) ? seq_no->cy_valueint : -1,
                                        err_rc);
 }
@@ -4626,7 +4692,7 @@ static int handle_yaml_config_global_settings(struct cYAML *tree,
                                              struct cYAML **err_rc)
 {
        struct cYAML *max_intf, *numa, *discovery, *retry, *tto, *seq_no,
-                    *sen, *recov, *drop_asym_route;
+                    *sen, *recov, *rsen, *drop_asym_route;
        int rc = 0;
 
        seq_no = cYAML_get_object_item(tree, "seq_no");
@@ -4686,6 +4752,13 @@ static int handle_yaml_config_global_settings(struct cYAML *tree,
                                                        : -1,
                                                    err_rc);
 
+       rsen = cYAML_get_object_item(tree, "router_sensitivity");
+       if (rsen)
+               rc = lustre_lnet_config_rtr_sensitivity(rsen->cy_valueint,
+                                                    seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                    err_rc);
+
        return rc;
 }
 
@@ -4733,7 +4806,7 @@ static int handle_yaml_show_global_settings(struct cYAML *tree,
                                            struct cYAML **err_rc)
 {
        struct cYAML *max_intf, *numa, *discovery, *retry, *tto, *seq_no,
-                    *sen, *recov, *drop_asym_route;
+                    *sen, *recov, *rsen, *drop_asym_route;
        int rc = 0;
 
        seq_no = cYAML_get_object_item(tree, "seq_no");
@@ -4785,6 +4858,12 @@ static int handle_yaml_show_global_settings(struct cYAML *tree,
                                                        : -1,
                                                  show_rc, err_rc);
 
+       rsen = cYAML_get_object_item(tree, "router_sensitivity");
+       if (rsen)
+               rc = lustre_lnet_show_rtr_sensitivity(seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                    show_rc, err_rc);
+
        return rc;
 }
 
index 8a5e6ac..a5cecda 100644 (file)
@@ -92,11 +92,12 @@ int lustre_lnet_config_ni_system(bool up, bool load_ni_from_mod,
  *   gw - gateway
  *   hops - number of hops passed down by the user
  *   prio - priority of the route
+ *   sen - health sensitivity value for the gateway
  *   seq_no - sequence number of the request
  *   err_rc - [OUT] struct cYAML tree describing the error. Freed by caller
  */
 int lustre_lnet_config_route(char *nw, char *gw, int hops, int prio,
-                            int seq_no, struct cYAML **err_rc);
+                            int sen, int seq_no, struct cYAML **err_rc);
 
 /*
  * lustre_lnet_del_route
@@ -279,6 +280,18 @@ int lustre_lnet_show_recov_intrv(int seq_no, struct cYAML **show_rc,
                                 struct cYAML **err_rc);
 
 /*
+ * lustre_lnet_config_rtr_sensitivity
+ *   sets the router sensitivity percentage. If the health percentage
+ *   of a router interface drops below this value, the interface is
+ *   considered failed
+ *
+ *   sen - sensitivity value to configure
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_rtr_sensitivity(int sen, int seq_no, struct cYAML **err_rc);
+
+/*
  * lustre_lnet_config_hsensitivity
  *   sets the health sensitivity; the value by which to decrement the
  *   health value of a local or peer NI. If 0 then health is turned off
@@ -303,6 +316,18 @@ int lustre_lnet_show_hsensitivity(int seq_no, struct cYAML **show_rc,
                                  struct cYAML **err_rc);
 
 /*
+ * lustre_lnet_show_rtr_sensitivity
+ *   show the router sensitivity percentage in the system
+ *
+ *   seq_no - sequence number of the request
+ *   show_rc - [OUT] struct cYAML tree containing router sensitivity info
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_show_rtr_sensitivity(int seq_no, struct cYAML **show_rc,
+                                    struct cYAML **err_rc);
+
+/*
  * lustre_lnet_config_transaction_to
  *   sets the timeout after which a message expires or a timeout event is
  *   propagated for an expired response.
index c503223..6bdce42 100644 (file)
@@ -57,6 +57,7 @@ static int jt_set_numa(int argc, char **argv);
 static int jt_set_retry_count(int argc, char **argv);
 static int jt_set_transaction_to(int argc, char **argv);
 static int jt_set_recov_intrv(int argc, char **argv);
+static int jt_set_rtr_sensitivity(int argc, char **argv);
 static int jt_set_hsensitivity(int argc, char **argv);
 static int jt_add_peer_nid(int argc, char **argv);
 static int jt_del_peer_nid(int argc, char **argv);
@@ -117,7 +118,8 @@ command_t route_cmds[] = {
         "\t--net: net name (e.g. tcp0)\n"
         "\t--gateway: gateway nid (e.g. 10.1.1.2@tcp)\n"
         "\t--hop: number to final destination (1 < hops < 255)\n"
-        "\t--priority: priority of route (0 - highest prio\n"},
+        "\t--priority: priority of route (0 - highest prio)\n"
+        "\t--health_sensitivity: gateway health sensitivity (>= 1)\n"},
        {"del", jt_del_route, 0, "delete a route\n"
         "\t--net: net name (e.g. tcp0)\n"
         "\t--gateway: gateway nid (e.g. 10.1.1.2@tcp)\n"},
@@ -126,6 +128,7 @@ command_t route_cmds[] = {
         "\t--gateway: gateway nid (e.g. 10.1.1.2@tcp) to filter on\n"
         "\t--hop: number to final destination (1 < hops < 255) to filter on\n"
         "\t--priority: priority of route (0 - highest prio to filter on\n"
+        "\t--health_sensitivity: gateway health sensitivity (>= 1)\n"
         "\t--verbose: display detailed output per route\n"},
        { 0, 0, 0, NULL }
 };
@@ -208,6 +211,9 @@ command_t set_cmds[] = {
         "\t>0 - sensitivity value not more than 1000\n"},
        {"recovery_interval", jt_set_recov_intrv, 0, "interval to ping in seconds (at least 1)\n"
         "\t>0 - time in seconds between pings\n"},
+       {"router_sensitivity", jt_set_rtr_sensitivity, 0, "router sensitivity %\n"
+        "\t100 - router interfaces need to be fully healthy to be used\n"
+        "\t<100 - router interfaces can be used even if not healthy\n"},
        { 0, 0, 0, NULL }
 };
 
@@ -394,6 +400,34 @@ static int jt_set_recov_intrv(int argc, char **argv)
        return rc;
 }
 
+static int jt_set_rtr_sensitivity(int argc, char **argv)
+{
+       long int value;
+       int rc;
+       struct cYAML *err_rc = NULL;
+
+       rc = check_cmd(set_cmds, "set", "router_sensitivity", 2, argc, argv);
+       if (rc)
+               return rc;
+
+       rc = parse_long(argv[1], &value);
+       if (rc != 0) {
+               cYAML_build_error(-1, -1, "parser", "set",
+                                 "cannot parse router sensitivity value", &err_rc);
+               cYAML_print_tree2file(stderr, err_rc);
+               cYAML_free_tree(err_rc);
+               return -1;
+       }
+
+       rc = lustre_lnet_config_rtr_sensitivity(value, -1, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+
+       cYAML_free_tree(err_rc);
+
+       return rc;
+}
+
 static int jt_set_hsensitivity(int argc, char **argv)
 {
        long int value;
@@ -709,7 +743,7 @@ static int jt_unconfig_lnet(int argc, char **argv)
 static int jt_add_route(int argc, char **argv)
 {
        char *network = NULL, *gateway = NULL;
-       long int hop = -1, prio = -1;
+       long int hop = -1, prio = -1, sen = -1;
        struct cYAML *err_rc = NULL;
        int rc, opt;
 
@@ -719,6 +753,7 @@ static int jt_add_route(int argc, char **argv)
        { .name = "gateway",   .has_arg = required_argument, .val = 'g' },
        { .name = "hop-count", .has_arg = required_argument, .val = 'c' },
        { .name = "priority",  .has_arg = required_argument, .val = 'p' },
+       { .name = "health_sensitivity",  .has_arg = required_argument, .val = 's' },
        { .name = NULL } };
 
        rc = check_cmd(route_cmds, "route", "add", 0, argc, argv);
@@ -750,6 +785,15 @@ static int jt_add_route(int argc, char **argv)
                                continue;
                        }
                        break;
+               case 's':
+                       rc = parse_long(optarg, &sen);
+                       if (rc != 0) {
+                               /* ignore option */
+                               sen = -1;
+                               continue;
+                       }
+                       break;
+
                case '?':
                        print_help(route_cmds, "route", "add");
                default:
@@ -757,7 +801,8 @@ static int jt_add_route(int argc, char **argv)
                }
        }
 
-       rc = lustre_lnet_config_route(network, gateway, hop, prio, -1, &err_rc);
+       rc = lustre_lnet_config_route(network, gateway, hop, prio, sen, -1,
+                                     &err_rc);
 
        if (rc != LUSTRE_CFG_RC_NO_ERR)
                cYAML_print_tree2file(stderr, err_rc);
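
With the widened lustre_lnet_config_route() signature, direct library
callers pass the gateway health sensitivity just ahead of seq_no; -1,
which jt_add_route passes when --health_sensitivity is not given, keeps
the module default. A minimal sketch, with illustrative net and gateway
values:

        struct cYAML *err_rc = NULL;
        int rc;

        /* route to tcp1 via 10.1.1.2@tcp: default hop and priority,
         * gateway health sensitivity of 1
         */
        rc = lustre_lnet_config_route("tcp1", "10.1.1.2@tcp", -1, -1, 1, -1,
                                      &err_rc);
        if (rc != LUSTRE_CFG_RC_NO_ERR)
                cYAML_print_tree2file(stderr, err_rc);
        cYAML_free_tree(err_rc);
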
@@ -1299,6 +1344,12 @@ static int jt_show_global(int argc, char **argv)
                goto out;
        }
 
+       rc = lustre_lnet_show_rtr_sensitivity(-1, &show_rc, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR) {
+               cYAML_print_tree2file(stderr, err_rc);
+               goto out;
+       }
+
        if (show_rc)
                cYAML_print_tree(show_rc);
 
index bac8c63..1b72d2a 100755 (executable)
@@ -15038,8 +15038,8 @@ test_215() { # for bugs 18102, 21079, 21517
        # where ref > 0, rtr_ref > 0, alive_cnt >= 0, state is up/down,
        # last_ping >= 0, ping_sent is boolean (0/1), deadline and down_ni are
        # numeric (0 or >0 or <0), router is a string like 192.168.1.1@tcp2
-       L1="^ref +rtr_ref +alive_cnt +state +last_ping +ping_sent +deadline +down_ni +router$"
-       BR="^$P +$P +$N +(up|down) +$N +(0|1) +$I +$I +$NID$"
+       L1="^ref +rtr_ref +alive +router$"
+       BR="^$P +$P +(up|down) +$NID$"
        create_lnet_proc_files "routers"
        check_lnet_proc_entry "routers.sys" "lnet.routers" "$BR" "$L1"
        remove_lnet_proc_files "routers"
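
For reference, the updated patterns describe the simplified four-column
lnet.routers output; a header and row such as the following (values are
purely illustrative) would satisfy $L1 and $BR respectively:

        ref      rtr_ref  alive  router
        4        1        up     192.168.1.1@tcp2
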
index 9466ea4..ca2eb34 100644 (file)
@@ -1383,6 +1383,8 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
        { .name = "portal",   .has_arg = required_argument, .val = 'p' },
        { .name = "message",  .has_arg = required_argument, .val = 'm' },
        { .name = "health_error",  .has_arg = required_argument, .val = 'e' },
+       { .name = "local_nid",  .has_arg = required_argument, .val = 'o' },
+       { .name = "drop_all",  .has_arg = no_argument, .val = 'x' },
        { .name = NULL } };
 
        if (argc == 1) {
@@ -1391,7 +1393,7 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                return -1;
        }
 
-       optstr = opc == LNET_CTL_DROP_ADD ? "s:d:r:i:p:m:e:n" : "s:d:r:l:p:m:";
+       optstr = opc == LNET_CTL_DROP_ADD ? "s:d:o:r:i:p:m:e:nx" : "s:d:o:r:l:p:m:";
        memset(&attr, 0, sizeof(attr));
        while (1) {
                char c = getopt_long(argc, argv, optstr, opts, NULL);
@@ -1400,6 +1402,11 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                        break;
 
                switch (c) {
+               case 'o':
+                       rc = fault_attr_nid_parse(optarg, &attr.fa_local_nid);
+                       if (rc != 0)
+                               goto getopt_failed;
+                       break;
                case 's': /* source NID/NET */
                        rc = fault_attr_nid_parse(optarg, &attr.fa_src);
                        if (rc != 0)
@@ -1428,6 +1435,11 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                        }
                        break;
 
+               case 'x':
+                       if (opc == LNET_CTL_DROP_ADD)
+                               attr.u.drop.da_drop_all = true;
+                       break;
+
                case 'n':
                        if (opc == LNET_CTL_DROP_ADD)
                                attr.u.drop.da_random = true;
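
The two new drop-rule options map directly onto the fault attribute
block (struct lnet_fault_attr) that is handed to the ioctl. Below is a
rough sketch of what --local_nid together with --drop_all end up
producing, reusing the helper the parser already calls; the NID is
illustrative, and NID fields left at 0 are defaulted to LNET_NID_ANY
just before the ioctl, as the next hunk shows:

        struct lnet_fault_attr attr;
        int rc;

        memset(&attr, 0, sizeof(attr));
        /* --local_nid 10.1.1.2@tcp: restrict the rule to this local NID */
        rc = fault_attr_nid_parse("10.1.1.2@tcp", &attr.fa_local_nid);
        if (rc != 0)
                return rc;
        /* --drop_all: drop every message that matches the rule */
        attr.u.drop.da_drop_all = true;
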
@@ -1502,6 +1514,9 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                return -1;
        }
 
+       if (attr.fa_local_nid == 0)
+               attr.fa_local_nid = LNET_NID_ANY;
+
        data.ioc_flags = opc;
        data.ioc_inllen1 = sizeof(attr);
        data.ioc_inlbuf1 = (char *)&attr;