Whamcloud - gitweb
LU-9120 lnet: LNet Health/Resiliency Feature
author Oleg Drokin <green@whamcloud.com>
Tue, 21 Aug 2018 16:15:26 +0000 (12:15 -0400)
committer Oleg Drokin <green@whamcloud.com>
Tue, 21 Aug 2018 16:17:28 +0000 (12:17 -0400)
The LNet Health/Resiliency feature adds the ability for LNet
to try out different interfaces available to it if message
sending fails. It maintains the health of each local and remote
interface and selects the best local interface to send from and the
best remote interface to send to.

Merge commit '958ef71f33fa925e6657f9902702cd3677e15ec9'

Change-Id: I9ca740654c48d642fe130f98a60c5c59b9b4ebe1
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
28 files changed:
lnet/include/lnet/api.h
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/include/uapi/linux/lnet/libcfs_ioctl.h
lnet/include/uapi/linux/lnet/lnet-dlc.h
lnet/include/uapi/linux/lnet/lnet-types.h
lnet/include/uapi/linux/lnet/lnetctl.h
lnet/klnds/o2iblnd/o2iblnd.c
lnet/klnds/o2iblnd/o2iblnd.h
lnet/klnds/o2iblnd/o2iblnd_cb.c
lnet/klnds/socklnd/socklnd.c
lnet/klnds/socklnd/socklnd.h
lnet/klnds/socklnd/socklnd_cb.c
lnet/lnet/api-ni.c
lnet/lnet/config.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/net_fault.c
lnet/lnet/peer.c
lnet/lnet/router.c
lnet/selftest/module.c
lnet/selftest/rpc.c
lnet/utils/lnetconfig/liblnetconfig.c
lnet/utils/lnetconfig/liblnetconfig.h
lnet/utils/lnetctl.c
lustre/ptlrpc/niobuf.c
lustre/utils/lctl.c
lustre/utils/portals.c

index 9408583..77afdd3 100644 (file)
@@ -198,7 +198,8 @@ int LNetGet(lnet_nid_t            self,
            struct lnet_process_id target_in,
            unsigned int      portal_in,
            __u64             match_bits_in,
-           unsigned int      offset_in);
+           unsigned int      offset_in,
+           bool              recovery);
 /** @} lnet_data */
 
 
index 6197be6..aeb0e4b 100644 (file)
@@ -75,6 +75,7 @@ extern struct lnet the_lnet;                  /* THE network */
 
 /* default timeout */
 #define DEFAULT_PEER_TIMEOUT    180
+#define LNET_LND_DEFAULT_TIMEOUT 5
 
 static inline int lnet_is_route_alive(struct lnet_route *route)
 {
@@ -489,6 +490,26 @@ lnet_msg_free(struct lnet_msg *msg)
        LIBCFS_FREE(msg, sizeof(*msg));
 }
 
+static inline struct lnet_rsp_tracker *
+lnet_rspt_alloc(int cpt)
+{
+       struct lnet_rsp_tracker *rspt;
+       LIBCFS_ALLOC(rspt, sizeof(*rspt));
+       lnet_net_lock(cpt);
+       the_lnet.ln_counters[cpt]->rst_alloc++;
+       lnet_net_unlock(cpt);
+       return rspt;
+}
+
+static inline void
+lnet_rspt_free(struct lnet_rsp_tracker *rspt, int cpt)
+{
+       LIBCFS_FREE(rspt, sizeof(*rspt));
+       lnet_net_lock(cpt);
+       the_lnet.ln_counters[cpt]->rst_alloc--;
+       lnet_net_unlock(cpt);
+}
+
 void lnet_ni_free(struct lnet_ni *ni);
 void lnet_net_free(struct lnet_net *net);
 
@@ -526,14 +547,15 @@ extern struct lnet_ni *lnet_nid2ni_locked(lnet_nid_t nid, int cpt);
 extern struct lnet_ni *lnet_nid2ni_addref(lnet_nid_t nid);
 extern struct lnet_ni *lnet_net2ni_locked(__u32 net, int cpt);
 extern struct lnet_ni *lnet_net2ni_addref(__u32 net);
-bool lnet_is_ni_healthy_locked(struct lnet_ni *ni);
 struct lnet_net *lnet_get_net_locked(__u32 net_id);
 
 int lnet_lib_init(void);
 void lnet_lib_exit(void);
 
 extern unsigned lnet_transaction_timeout;
+extern unsigned lnet_retry_count;
 extern unsigned int lnet_numa_range;
+extern unsigned int lnet_health_sensitivity;
 extern unsigned int lnet_peer_discovery_disabled;
 extern int portal_rotor;
 
@@ -570,6 +592,8 @@ extern int libcfs_ioctl_getdata(struct libcfs_ioctl_hdr **hdr_pp,
                                struct libcfs_ioctl_hdr __user *uparam);
 extern int lnet_get_peer_list(__u32 *countp, __u32 *sizep,
                              struct lnet_process_id __user *ids);
+extern void lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all);
+extern void lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni);
 
 void lnet_router_debugfs_init(void);
 void lnet_router_debugfs_fini(void);
@@ -603,6 +627,8 @@ void lnet_prep_send(struct lnet_msg *msg, int type,
                    struct lnet_process_id target, unsigned int offset,
                    unsigned int len);
 int lnet_send(lnet_nid_t nid, struct lnet_msg *msg, lnet_nid_t rtr_nid);
+int lnet_send_ping(lnet_nid_t dest_nid, struct lnet_handle_md *mdh, int nnis,
+                  void *user_ptr, struct lnet_handle_eq eqh, bool recovery);
 void lnet_return_tx_credits_locked(struct lnet_msg *msg);
 void lnet_return_rx_credits_locked(struct lnet_msg *msg);
 void lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp);
@@ -677,8 +703,11 @@ struct lnet_msg *lnet_create_reply_msg(struct lnet_ni *ni,
                                       struct lnet_msg *get_msg);
 void lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *msg,
                            unsigned int len);
+void lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt);
 
 void lnet_finalize(struct lnet_msg *msg, int rc);
+bool lnet_send_error_simulation(struct lnet_msg *msg,
+                               enum lnet_msg_hstatus *hstatus);
 
 void lnet_drop_message(struct lnet_ni *ni, int cpt, void *private,
                       unsigned int nob, __u32 msg_type);
@@ -690,6 +719,7 @@ void lnet_msg_container_cleanup(struct lnet_msg_container *container);
 void lnet_msg_containers_destroy(void);
 int lnet_msg_containers_create(void);
 
+char *lnet_health_error2str(enum lnet_msg_hstatus hstatus);
 char *lnet_msgtyp2str(int type);
 void lnet_print_hdr(struct lnet_hdr *hdr);
 int lnet_fail_nid(lnet_nid_t nid, unsigned int threshold);
@@ -700,7 +730,7 @@ int lnet_fault_ctl(int cmd, struct libcfs_ioctl_data *data);
 int lnet_fault_init(void);
 void lnet_fault_fini(void);
 
-bool lnet_drop_rule_match(struct lnet_hdr *hdr);
+bool lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus);
 
 int lnet_delay_rule_add(struct lnet_fault_attr *attr);
 int lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown);
@@ -791,6 +821,7 @@ void lnet_md_deconstruct(struct lnet_libmd *lmd, struct lnet_md *umd);
 struct page *lnet_kvaddr_to_page(unsigned long vaddr);
 int lnet_cpt_of_md(struct lnet_libmd *md, unsigned int offset);
 
+unsigned int lnet_get_lnd_timeout(void);
 void lnet_register_lnd(struct lnet_lnd *lnd);
 void lnet_unregister_lnd(struct lnet_lnd *lnd);
 
@@ -822,8 +853,15 @@ int lnet_sock_connect(struct socket **sockp, int *fatal,
 int lnet_peers_start_down(void);
 int lnet_peer_buffer_credits(struct lnet_net *net);
 
-int lnet_router_checker_start(void);
-void lnet_router_checker_stop(void);
+int lnet_monitor_thr_start(void);
+void lnet_monitor_thr_stop(void);
+
+bool lnet_router_checker_active(void);
+void lnet_check_routers(void);
+int lnet_router_pre_mt_start(void);
+void lnet_router_post_mt_start(void);
+void lnet_prune_rc_data(int wait_unlink);
+void lnet_router_cleanup(void);
 void lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net);
 void lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf);
 
@@ -896,45 +934,19 @@ int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
                          __u32 *ni_peer_tx_credits, __u32 *peer_tx_credits,
                          __u32 *peer_rtr_credits, __u32 *peer_min_rtr_credtis,
                          __u32 *peer_tx_qnob);
+int lnet_get_peer_ni_hstats(struct lnet_ioctl_peer_ni_hstats *stats);
 
-
-static inline bool
-lnet_is_peer_ni_healthy_locked(struct lnet_peer_ni *lpni)
-{
-       return lpni->lpni_healthy;
-}
-
-static inline void
-lnet_set_peer_ni_health_locked(struct lnet_peer_ni *lpni, bool health)
-{
-       lpni->lpni_healthy = health;
-}
-
-static inline bool
-lnet_is_peer_net_healthy_locked(struct lnet_peer_net *peer_net)
-{
-       struct lnet_peer_ni *lpni;
-
-       list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
-                           lpni_peer_nis) {
-               if (lnet_is_peer_ni_healthy_locked(lpni))
-                       return true;
-       }
-
-       return false;
-}
-
-static inline bool
-lnet_is_peer_healthy_locked(struct lnet_peer *peer)
+static inline struct lnet_peer_net *
+lnet_find_peer_net_locked(struct lnet_peer *peer, __u32 net_id)
 {
        struct lnet_peer_net *peer_net;
 
        list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
-               if (lnet_is_peer_net_healthy_locked(peer_net))
-                       return true;
+               if (peer_net->lpn_net_id == net_id)
+                       return peer_net;
        }
 
-       return false;
+       return NULL;
 }
 
 static inline void
@@ -984,6 +996,12 @@ lnet_peer_needs_push(struct lnet_peer *lp)
        return false;
 }
 
+static inline void
+lnet_inc_healthv(atomic_t *healthv)
+{
+       atomic_add_unless(healthv, 1, LNET_MAX_HEALTH_VALUE);
+}
+
 void lnet_incr_stats(struct lnet_element_stats *stats,
                     enum lnet_msg_type msg_type,
                     enum lnet_stats_type stats_type);
index e96b544..4ee8c60 100644 (file)
 
 #define LNET_MAX_IOV           (LNET_MAX_PAYLOAD >> PAGE_SHIFT)
 
+/*
+ * This is the maximum health value.
+ * All local and peer NIs created have their health default to this value.
+ */
+#define LNET_MAX_HEALTH_VALUE 1000
+
 /* forward refs */
 struct lnet_libmd;
 
+enum lnet_msg_hstatus {
+       LNET_MSG_STATUS_OK = 0,
+       LNET_MSG_STATUS_LOCAL_INTERRUPT,
+       LNET_MSG_STATUS_LOCAL_DROPPED,
+       LNET_MSG_STATUS_LOCAL_ABORTED,
+       LNET_MSG_STATUS_LOCAL_NO_ROUTE,
+       LNET_MSG_STATUS_LOCAL_ERROR,
+       LNET_MSG_STATUS_LOCAL_TIMEOUT,
+       LNET_MSG_STATUS_REMOTE_ERROR,
+       LNET_MSG_STATUS_REMOTE_DROPPED,
+       LNET_MSG_STATUS_REMOTE_TIMEOUT,
+       LNET_MSG_STATUS_NETWORK_TIMEOUT,
+       LNET_MSG_STATUS_END,
+};
+
+struct lnet_rsp_tracker {
+       /* chain on the waiting list */
+       struct list_head rspt_on_list;
+       /* cpt to lock */
+       int rspt_cpt;
+       /* deadline of the REPLY/ACK */
+       ktime_t rspt_deadline;
+       /* parent MD */
+       struct lnet_handle_md rspt_mdh;
+};
+
 struct lnet_msg {
        struct list_head        msg_activelist;
        struct list_head        msg_list;       /* Q for credits/MD */
@@ -76,6 +108,21 @@ struct lnet_msg {
        lnet_nid_t              msg_src_nid_param;
        lnet_nid_t              msg_rtr_nid_param;
 
+       /*
+        * Deadline for the message after which it will be finalized if it
+        * has not completed.
+        */
+       ktime_t                 msg_deadline;
+
+       /* The message health status. */
+       enum lnet_msg_hstatus   msg_health_status;
+       /* This is a recovery message */
+       bool                    msg_recovery;
+       /* the number of times a transmission has been retried */
+       int                     msg_retry_count;
+       /* flag to indicate that we do not want to resend this message */
+       bool                    msg_no_resend;
+
        /* committed for sending */
        unsigned int            msg_tx_committed:1;
        /* CPT # this message committed for sending */
@@ -156,24 +203,25 @@ struct lnet_me {
 };
 
 struct lnet_libmd {
-       struct list_head        md_list;
-       struct lnet_libhandle   md_lh;
-       struct lnet_me         *md_me;
-       char                   *md_start;
-       unsigned int            md_offset;
-       unsigned int            md_length;
-       unsigned int            md_max_size;
-       int                     md_threshold;
-       int                     md_refcount;
-       unsigned int            md_options;
-       unsigned int            md_flags;
-       unsigned int            md_niov;        /* # frags at end of struct */
-       void                   *md_user_ptr;
-       struct lnet_eq         *md_eq;
-       struct lnet_handle_md   md_bulk_handle;
+       struct list_head         md_list;
+       struct lnet_libhandle    md_lh;
+       struct lnet_me          *md_me;
+       char                    *md_start;
+       unsigned int             md_offset;
+       unsigned int             md_length;
+       unsigned int             md_max_size;
+       int                      md_threshold;
+       int                      md_refcount;
+       unsigned int             md_options;
+       unsigned int             md_flags;
+       unsigned int             md_niov;       /* # frags at end of struct */
+       void                    *md_user_ptr;
+       struct lnet_rsp_tracker *md_rspt_ptr;
+       struct lnet_eq          *md_eq;
+       struct lnet_handle_md    md_bulk_handle;
        union {
-               struct kvec     iov[LNET_MAX_IOV];
-               lnet_kiov_t     kiov[LNET_MAX_IOV];
+               struct kvec      iov[LNET_MAX_IOV];
+               lnet_kiov_t      kiov[LNET_MAX_IOV];
        } md_iov;
 };
 
@@ -275,18 +323,11 @@ enum lnet_net_state {
        LNET_NET_STATE_DELETING
 };
 
-enum lnet_ni_state {
-       /* set when NI block is allocated */
-       LNET_NI_STATE_INIT = 0,
-       /* set when NI is started successfully */
-       LNET_NI_STATE_ACTIVE,
-       /* set when LND notifies NI failed */
-       LNET_NI_STATE_FAILED,
-       /* set when LND notifies NI degraded */
-       LNET_NI_STATE_DEGRADED,
-       /* set when shuttding down NI */
-       LNET_NI_STATE_DELETING
-};
+#define LNET_NI_STATE_INIT             (1 << 0)
+#define LNET_NI_STATE_ACTIVE           (1 << 1)
+#define LNET_NI_STATE_FAILED           (1 << 2)
+#define LNET_NI_STATE_RECOVERY_PENDING (1 << 3)
+#define LNET_NI_STATE_DELETING         (1 << 4)
 
 enum lnet_stats_type {
        LNET_STATS_TYPE_SEND = 0,
@@ -308,6 +349,22 @@ struct lnet_element_stats {
        struct lnet_comm_count el_drop_stats;
 };
 
+struct lnet_health_local_stats {
+       atomic_t hlt_local_interrupt;
+       atomic_t hlt_local_dropped;
+       atomic_t hlt_local_aborted;
+       atomic_t hlt_local_no_route;
+       atomic_t hlt_local_timeout;
+       atomic_t hlt_local_error;
+};
+
+struct lnet_health_remote_stats {
+       atomic_t hlt_remote_dropped;
+       atomic_t hlt_remote_timeout;
+       atomic_t hlt_remote_error;
+       atomic_t hlt_network_timeout;
+};
+
 struct lnet_net {
        /* chain on the ln_nets */
        struct list_head        net_list;
@@ -359,6 +416,12 @@ struct lnet_ni {
        /* chain on net_ni_cpt */
        struct list_head        ni_cptlist;
 
+       /* chain on the recovery queue */
+       struct list_head        ni_recovery;
+
+       /* MD handle for recovery ping */
+       struct lnet_handle_md   ni_ping_mdh;
+
        spinlock_t              ni_lock;
 
        /* number of CPTs */
@@ -392,7 +455,7 @@ struct lnet_ni {
        struct lnet_ni_status   *ni_status;
 
        /* NI FSM */
-       enum lnet_ni_state      ni_state;
+       __u32                   ni_state;
 
        /* per NI LND tunables */
        struct lnet_lnd_tunables ni_lnd_tunables;
@@ -402,6 +465,7 @@ struct lnet_ni {
 
        /* NI statistics */
        struct lnet_element_stats ni_stats;
+       struct lnet_health_local_stats ni_hstats;
 
        /* physical device CPT */
        int                     ni_dev_cpt;
@@ -410,6 +474,22 @@ struct lnet_ni {
        __u32                   ni_seq;
 
        /*
+        * health value
+        *      initialized to LNET_MAX_HEALTH_VALUE
+        * Value is decremented every time we fail to send a message over
+        * this NI because of a NI specific failure.
+        * Value is incremented if we successfully send a message.
+        */
+       atomic_t                ni_healthv;
+
+       /*
+        * Set to 1 by the LND when it receives an event telling it the device
+        * has gone into a fatal state. Set to 0 when the LND receives an
+        * even telling it the device is back online.
+        */
+       atomic_t                ni_fatal_error_on;
+
+       /*
         * equivalent interfaces to use
         * This is an array because socklnd bonding can still be configured
         */
@@ -454,6 +534,8 @@ struct lnet_peer_ni {
        struct list_head        lpni_peer_nis;
        /* chain on remote peer list */
        struct list_head        lpni_on_remote_peer_ni_list;
+       /* chain on recovery queue */
+       struct list_head        lpni_recovery;
        /* chain on peer hash */
        struct list_head        lpni_hashlist;
        /* messages blocking for tx credits */
@@ -466,6 +548,7 @@ struct lnet_peer_ni {
        struct lnet_peer_net    *lpni_peer_net;
        /* statistics kept on each peer NI */
        struct lnet_element_stats lpni_stats;
+       struct lnet_health_remote_stats lpni_hstats;
        /* spin lock protecting credits and lpni_txq / lpni_rtrq */
        spinlock_t              lpni_lock;
        /* # tx credits available */
@@ -506,6 +589,10 @@ struct lnet_peer_ni {
        lnet_nid_t              lpni_nid;
        /* # refs */
        atomic_t                lpni_refcount;
+       /* health value for the peer */
+       atomic_t                lpni_healthv;
+       /* recovery ping mdh */
+       struct lnet_handle_md   lpni_recovery_ping_mdh;
        /* CPT this peer attached on */
        int                     lpni_cpt;
        /* state flags -- protected by lpni_lock */
@@ -535,6 +622,10 @@ struct lnet_peer_ni {
 
 /* Preferred path added due to traffic on non-MR peer_ni */
 #define LNET_PEER_NI_NON_MR_PREF       (1 << 0)
+/* peer is being recovered. */
+#define LNET_PEER_NI_RECOVERY_PENDING  (1 << 1)
+/* peer is being deleted */
+#define LNET_PEER_NI_DELETING          (1 << 2)
 
 struct lnet_peer {
        /* chain on pt_peer_list */
@@ -877,9 +968,9 @@ struct lnet_msg_container {
 #define LNET_DC_STATE_STOPPING         2       /* telling thread to stop */
 
 /* Router Checker states */
-#define LNET_RC_STATE_SHUTDOWN         0       /* not started */
-#define LNET_RC_STATE_RUNNING          1       /* started up OK */
-#define LNET_RC_STATE_STOPPING         2       /* telling thread to stop */
+#define LNET_MT_STATE_SHUTDOWN         0       /* not started */
+#define LNET_MT_STATE_RUNNING          1       /* started up OK */
+#define LNET_MT_STATE_STOPPING         2       /* telling thread to stop */
 
 /* LNet states */
 #define LNET_STATE_SHUTDOWN            0       /* not started */
@@ -983,8 +1074,8 @@ struct lnet {
        /* discovery startup/shutdown state */
        int                             ln_dc_state;
 
-       /* router checker startup/shutdown state */
-       int                             ln_rc_state;
+       /* monitor thread startup/shutdown state */
+       int                             ln_mt_state;
        /* router checker's event queue */
        struct lnet_handle_eq           ln_rc_eqh;
        /* rcd still pending on net */
@@ -992,7 +1083,7 @@ struct lnet {
        /* rcd ready for free */
        struct list_head                ln_rcd_zombie;
        /* serialise startup/shutdown */
-       struct semaphore                ln_rc_signal;
+       struct semaphore                ln_mt_signal;
 
        struct mutex                    ln_api_mutex;
        struct mutex                    ln_lnd_mutex;
@@ -1020,10 +1111,29 @@ struct lnet {
         */
        bool                            ln_nis_from_mod_params;
 
-       /* waitq for router checker.  As long as there are no routes in
-        * the list, the router checker will sleep on this queue.  when
-        * routes are added the thread will wake up */
-       wait_queue_head_t               ln_rc_waitq;
+       /*
+        * waitq for the monitor thread. The monitor thread takes care of
+        * checking routes, timedout messages and resending messages.
+        */
+       wait_queue_head_t               ln_mt_waitq;
+
+       /* per-cpt resend queues */
+       struct list_head                **ln_mt_resendqs;
+       /* local NIs to recover */
+       struct list_head                ln_mt_localNIRecovq;
+       /* local NIs to recover */
+       struct list_head                ln_mt_peerNIRecovq;
+       /*
+        * An array of queues for GET/PUT waiting for REPLY/ACK respectively.
+        * There are CPT number of queues. Since response trackers will be
+        * added on the fast path we can't afford to grab the exclusive
+        * net lock to protect these queues. The CPT will be calculated
+        * based on the mdh cookie.
+        */
+       struct list_head                **ln_mt_rstq;
+       /* recovery eq handler */
+       struct lnet_handle_eq           ln_mt_eqh;
+
 };
 
 #endif
index 07a7906..cdac10f 100644 (file)
@@ -147,7 +147,10 @@ struct libcfs_debug_ioctl_data {
 #define IOC_LIBCFS_GET_NUMA_RANGE         _IOWR(IOC_LIBCFS_TYPE, 99, IOCTL_CONFIG_SIZE)
 #define IOC_LIBCFS_GET_PEER_LIST          _IOWR(IOC_LIBCFS_TYPE, 100, IOCTL_CONFIG_SIZE)
 #define IOC_LIBCFS_GET_LOCAL_NI_MSG_STATS  _IOWR(IOC_LIBCFS_TYPE, 101, IOCTL_CONFIG_SIZE)
-#define IOC_LIBCFS_MAX_NR                                        101
+#define IOC_LIBCFS_SET_HEALHV             _IOWR(IOC_LIBCFS_TYPE, 102, IOCTL_CONFIG_SIZE)
+#define IOC_LIBCFS_GET_LOCAL_HSTATS       _IOWR(IOC_LIBCFS_TYPE, 103, IOCTL_CONFIG_SIZE)
+#define IOC_LIBCFS_GET_RECOVERY_QUEUE     _IOWR(IOC_LIBCFS_TYPE, 104, IOCTL_CONFIG_SIZE)
+#define IOC_LIBCFS_MAX_NR                                        104
 
 extern int libcfs_ioctl_data_adjust(struct libcfs_ioctl_data *data);
 
index 7dae698..52d0c81 100644 (file)
@@ -44,6 +44,7 @@
 #define MAX_NUM_SHOW_ENTRIES   32
 #define LNET_MAX_STR_LEN       128
 #define LNET_MAX_SHOW_NUM_CPT  128
+#define LNET_MAX_SHOW_NUM_NID  128
 #define LNET_UNDEFINED_HOPS    ((__u32) -1)
 
 /*
@@ -172,6 +173,31 @@ struct lnet_ioctl_element_stats {
        __u32 iel_drop_count;
 };
 
+enum lnet_health_type {
+       LNET_HEALTH_TYPE_LOCAL_NI = 0,
+       LNET_HEALTH_TYPE_PEER_NI,
+};
+
+struct lnet_ioctl_local_ni_hstats {
+       struct libcfs_ioctl_hdr hlni_hdr;
+       lnet_nid_t hlni_nid;
+       __u32 hlni_local_interrupt;
+       __u32 hlni_local_dropped;
+       __u32 hlni_local_aborted;
+       __u32 hlni_local_no_route;
+       __u32 hlni_local_timeout;
+       __u32 hlni_local_error;
+       __s32 hlni_health_value;
+};
+
+struct lnet_ioctl_peer_ni_hstats {
+       __u32 hlpni_remote_dropped;
+       __u32 hlpni_remote_timeout;
+       __u32 hlpni_remote_error;
+       __u32 hlpni_network_timeout;
+       __s32 hlpni_health_value;
+};
+
 struct lnet_ioctl_element_msg_stats {
        struct libcfs_ioctl_hdr im_hdr;
        __u32 im_idx;
@@ -239,6 +265,21 @@ struct lnet_ioctl_peer_cfg {
        void __user *prcfg_bulk;
 };
 
+struct lnet_ioctl_reset_health_cfg {
+       struct libcfs_ioctl_hdr rh_hdr;
+       enum lnet_health_type rh_type;
+       bool rh_all;
+       int rh_value;
+       lnet_nid_t rh_nid;
+};
+
+struct lnet_ioctl_recovery_list {
+       struct libcfs_ioctl_hdr rlst_hdr;
+       enum lnet_health_type rlst_type;
+       int rlst_num_nids;
+       lnet_nid_t rlst_nid_array[LNET_MAX_SHOW_NUM_NID];
+};
+
 struct lnet_ioctl_set_value {
        struct libcfs_ioctl_hdr sv_hdr;
        __u32 sv_value;
index f30f284..3f5b8bd 100644 (file)
@@ -226,11 +226,24 @@ struct lnet_acceptor_connreq {
 struct lnet_counters {
        __u32   msgs_alloc;
        __u32   msgs_max;
+       __u32   rst_alloc;
        __u32   errors;
        __u32   send_count;
        __u32   recv_count;
        __u32   route_count;
        __u32   drop_count;
+       __u32   resend_count;
+       __u32   response_timeout_count;
+       __u32   local_interrupt_count;
+       __u32   local_dropped_count;
+       __u32   local_aborted_count;
+       __u32   local_no_route_count;
+       __u32   local_timeout_count;
+       __u32   local_error_count;
+       __u32   remote_dropped_count;
+       __u32   remote_error_count;
+       __u32   remote_timeout_count;
+       __u32   network_timeout_count;
        __u64   send_length;
        __u64   recv_length;
        __u64   route_length;
index 611bf02..7e211e1 100644 (file)
@@ -32,6 +32,8 @@
 # include <linux/lnet/lnet-types.h>
 #endif
 
+#include <stdbool.h>
+
 /** \addtogroup lnet_fault_simulation
  * @{ */
 
@@ -51,6 +53,19 @@ enum {
 #define LNET_GET_BIT           (1 << 2)
 #define LNET_REPLY_BIT         (1 << 3)
 
+#define HSTATUS_END                    11
+#define HSTATUS_LOCAL_INTERRUPT_BIT    (1 << 1)
+#define HSTATUS_LOCAL_DROPPED_BIT      (1 << 2)
+#define HSTATUS_LOCAL_ABORTED_BIT      (1 << 3)
+#define HSTATUS_LOCAL_NO_ROUTE_BIT     (1 << 4)
+#define HSTATUS_LOCAL_ERROR_BIT                (1 << 5)
+#define HSTATUS_LOCAL_TIMEOUT_BIT      (1 << 6)
+#define HSTATUS_REMOTE_ERROR_BIT       (1 << 7)
+#define HSTATUS_REMOTE_DROPPED_BIT     (1 << 8)
+#define HSTATUS_REMOTE_TIMEOUT_BIT     (1 << 9)
+#define HSTATUS_NETWORK_TIMEOUT_BIT    (1 << 10)
+#define HSTATUS_RANDOM                 0xffffffff
+
 /** ioctl parameter for LNet fault simulation */
 struct lnet_fault_attr {
        /**
@@ -88,6 +103,10 @@ struct lnet_fault_attr {
                         * with da_rate
                         */
                        __u32                   da_interval;
+                       /** error type mask */
+                       __u32                   da_health_error_mask;
+                       /** randomize error generation */
+                       bool                    da_random;
                } drop;
                /** message latency simulation */
                struct {
index 3cabb22..4ebc9a5 100644 (file)
@@ -522,7 +522,7 @@ kiblnd_del_peer(struct lnet_ni *ni, lnet_nid_t nid)
 
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 
-       kiblnd_txlist_done(&zombies, -EIO);
+       kiblnd_txlist_done(&zombies, -EIO, LNET_MSG_STATUS_LOCAL_ERROR);
 
        return rc;
 }
index 5c7746e..fa87dd1 100644 (file)
@@ -590,6 +590,8 @@ struct kib_tx {                                     /* transmit message */
        short                   tx_waiting;
        /* LNET completion status */
        int                     tx_status;
+       /* health status of the transmit */
+       enum lnet_msg_hstatus   tx_hstatus;
        /* completion deadline */
        ktime_t                 tx_deadline;
        /* completion cookie */
@@ -1164,7 +1166,8 @@ void kiblnd_close_conn(struct kib_conn *conn, int error);
 void kiblnd_close_conn_locked(struct kib_conn *conn, int error);
 
 void kiblnd_launch_tx(struct lnet_ni *ni, struct kib_tx *tx, lnet_nid_t nid);
-void kiblnd_txlist_done(struct list_head *txlist, int status);
+void kiblnd_txlist_done(struct list_head *txlist, int status,
+                       enum lnet_msg_hstatus hstatus);
 
 void kiblnd_qp_event(struct ib_event *event, void *arg);
 void kiblnd_cq_event(struct ib_event *event, void *arg);
index eba63b1..65eb85e 100644 (file)
@@ -86,12 +86,17 @@ kiblnd_tx_done(struct kib_tx *tx)
                if (lntmsg[i] == NULL)
                        continue;
 
+               /* propagate health status to LNet for requests */
+               if (i == 0 && lntmsg[i])
+                       lntmsg[i]->msg_health_status = tx->tx_hstatus;
+
                lnet_finalize(lntmsg[i], rc);
        }
 }
 
 void
-kiblnd_txlist_done(struct list_head *txlist, int status)
+kiblnd_txlist_done(struct list_head *txlist, int status,
+                  enum lnet_msg_hstatus hstatus)
 {
        struct kib_tx *tx;
 
@@ -102,6 +107,7 @@ kiblnd_txlist_done(struct list_head *txlist, int status)
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
+               tx->tx_hstatus = hstatus;
                kiblnd_tx_done(tx);
        }
 }
@@ -131,6 +137,7 @@ kiblnd_get_idle_tx(struct lnet_ni *ni, lnet_nid_t target)
         LASSERT (tx->tx_nfrags == 0);
 
        tx->tx_gaps = false;
+       tx->tx_hstatus = LNET_MSG_STATUS_OK;
 
         return tx;
 }
@@ -266,23 +273,24 @@ kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, u64 cook
                spin_unlock(&conn->ibc_lock);
 
                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
-                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
-                kiblnd_close_conn(conn, -EPROTO);
-                return;
-        }
+                     txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
+               kiblnd_close_conn(conn, -EPROTO);
+               return;
+       }
 
-        if (tx->tx_status == 0) {               /* success so far */
-                if (status < 0) {               /* failed? */
-                        tx->tx_status = status;
-                } else if (txtype == IBLND_MSG_GET_REQ) {
-                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
-                }
-        }
+       if (tx->tx_status == 0) {               /* success so far */
+               if (status < 0) {               /* failed? */
+                       tx->tx_status = status;
+                       tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
+               } else if (txtype == IBLND_MSG_GET_REQ) {
+                       lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
+               }
+       }
 
-        tx->tx_waiting = 0;
+       tx->tx_waiting = 0;
 
-        idle = !tx->tx_queued && (tx->tx_sending == 0);
-        if (idle)
+       idle = !tx->tx_queued && (tx->tx_sending == 0);
+       if (idle)
                list_del(&tx->tx_list);
 
        spin_unlock(&conn->ibc_lock);
@@ -906,6 +914,7 @@ __must_hold(&conn->ibc_lock)
                 * kiblnd_check_sends_locked will queue NOOP again when
                 * posted NOOPs complete */
                spin_unlock(&conn->ibc_lock);
+               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                kiblnd_tx_done(tx);
                spin_lock(&conn->ibc_lock);
                 CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
@@ -960,7 +969,10 @@ __must_hold(&conn->ibc_lock)
                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
 
                bad = NULL;
-               rc = ib_post_send(conn->ibc_cmid->qp, wr, &bad);
+               if (lnet_send_error_simulation(tx->tx_lntmsg[0], &tx->tx_hstatus))
+                       rc = -EINVAL;
+               else
+                       rc = ib_post_send(conn->ibc_cmid->qp, wr, &bad);
        }
 
        conn->ibc_last_send = ktime_get();
@@ -1101,6 +1113,7 @@ kiblnd_tx_complete(struct kib_tx *tx, int status)
                 conn->ibc_noops_posted--;
 
         if (failed) {
+               tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_DROPPED;
                 tx->tx_waiting = 0;             /* don't wait for peer_ni */
                 tx->tx_status = -EIO;
         }
@@ -1268,7 +1281,7 @@ kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
        LASSERT(!tx->tx_queued);        /* not queued for sending already */
        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
 
-       timeout_ns = *kiblnd_tunables.kib_timeout * NSEC_PER_SEC;
+       timeout_ns = lnet_get_lnd_timeout() * NSEC_PER_SEC;
        tx->tx_queued = 1;
        tx->tx_deadline = ktime_add_ns(ktime_get(), timeout_ns);
 
@@ -1394,21 +1407,21 @@ kiblnd_connect_peer(struct kib_peer_ni *peer_ni)
 
         kiblnd_peer_addref(peer_ni);               /* cmid's ref */
 
-        if (*kiblnd_tunables.kib_use_priv_port) {
-                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
-                                         *kiblnd_tunables.kib_timeout * 1000);
-        } else {
-                rc = rdma_resolve_addr(cmid,
-                                       (struct sockaddr *)&srcaddr,
-                                       (struct sockaddr *)&dstaddr,
-                                       *kiblnd_tunables.kib_timeout * 1000);
-        }
-        if (rc != 0) {
-                /* Can't initiate address resolution:  */
-                CERROR("Can't resolve addr for %s: %d\n",
-                       libcfs_nid2str(peer_ni->ibp_nid), rc);
-                goto failed2;
-        }
+       if (*kiblnd_tunables.kib_use_priv_port) {
+               rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
+                                        lnet_get_lnd_timeout() * 1000);
+       } else {
+               rc = rdma_resolve_addr(cmid,
+                                      (struct sockaddr *)&srcaddr,
+                                      (struct sockaddr *)&dstaddr,
+                                      lnet_get_lnd_timeout() * 1000);
+       }
+       if (rc != 0) {
+               /* Can't initiate address resolution:  */
+               CERROR("Can't resolve addr for %s: %d\n",
+                      libcfs_nid2str(peer_ni->ibp_nid), rc);
+               goto failed2;
+       }
 
        return;
 
@@ -1468,7 +1481,8 @@ kiblnd_reconnect_peer(struct kib_peer_ni *peer_ni)
 
        CWARN("Abort reconnection of %s: %s\n",
              libcfs_nid2str(peer_ni->ibp_nid), reason);
-       kiblnd_txlist_done(&txs, -ECONNABORTED);
+       kiblnd_txlist_done(&txs, -ECONNABORTED,
+                          LNET_MSG_STATUS_LOCAL_ABORTED);
        return false;
 }
 
@@ -1543,6 +1557,7 @@ kiblnd_launch_tx(struct lnet_ni *ni, struct kib_tx *tx, lnet_nid_t nid)
                if (tx != NULL) {
                        tx->tx_status = -EHOSTUNREACH;
                        tx->tx_waiting = 0;
+                       tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                        kiblnd_tx_done(tx);
                }
                return;
@@ -1667,6 +1682,7 @@ kiblnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg)
                if (rc != 0) {
                        CERROR("Can't setup GET sink for %s: %d\n",
                               libcfs_nid2str(target.nid), rc);
+                       tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                        kiblnd_tx_done(tx);
                        return -EIO;
                }
@@ -1821,9 +1837,11 @@ kiblnd_reply(struct lnet_ni *ni, struct kib_rx *rx, struct lnet_msg *lntmsg)
         kiblnd_queue_tx(tx, rx->rx_conn);
         return;
 
- failed_1:
+
+failed_1:
+       tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
        kiblnd_tx_done(tx);
- failed_0:
+failed_0:
        lnet_finalize(lntmsg, -EIO);
 }
 
@@ -1905,6 +1923,7 @@ kiblnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *lntmsg,
                if (rc != 0) {
                        CERROR("Can't setup PUT sink for %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
+                       tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                        kiblnd_tx_done(tx);
                        /* tell peer_ni it's over */
                        kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK,
@@ -2118,13 +2137,35 @@ kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
                        LASSERT(!tx->tx_queued);
                        LASSERT(tx->tx_waiting ||
                                tx->tx_sending != 0);
+                       if (conn->ibc_comms_error == -ETIMEDOUT) {
+                               if (tx->tx_waiting && !tx->tx_sending)
+                                       tx->tx_hstatus =
+                                         LNET_MSG_STATUS_REMOTE_TIMEOUT;
+                               else if (tx->tx_sending)
+                                       tx->tx_hstatus =
+                                         LNET_MSG_STATUS_NETWORK_TIMEOUT;
+                       }
                } else {
                        LASSERT(tx->tx_queued);
+                       if (conn->ibc_comms_error == -ETIMEDOUT)
+                               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_TIMEOUT;
+                       else
+                               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                }
 
                tx->tx_status = -ECONNABORTED;
                tx->tx_waiting = 0;
 
+               /*
+                * TODO: This makes an assumption that
+                * kiblnd_tx_complete() will be called for each tx. If
+                * that event is dropped we could end up with stale
+                * connections floating around. We'd like to deal with
+                * that in a better way.
+                *
+                * Also that means we can exceed the timeout by many
+                * seconds.
+                */
                if (tx->tx_sending == 0) {
                        tx->tx_queued = 0;
                        list_del(&tx->tx_list);
@@ -2134,7 +2175,11 @@ kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
 
        spin_unlock(&conn->ibc_lock);
 
-       kiblnd_txlist_done(&zombies, -ECONNABORTED);
+       /*
+        * aborting transmits occurs when finalizing the connection.
+        * The connection is finalized on error
+        */
+       kiblnd_txlist_done(&zombies, -ECONNABORTED, -1);
 }
 
 static void
@@ -2213,7 +2258,8 @@ kiblnd_peer_connect_failed(struct kib_peer_ni *peer_ni, int active,
        CNETERR("Deleting messages for %s: connection failed\n",
                libcfs_nid2str(peer_ni->ibp_nid));
 
-       kiblnd_txlist_done(&zombies, -EHOSTUNREACH);
+       kiblnd_txlist_done(&zombies, error,
+                          LNET_MSG_STATUS_LOCAL_DROPPED);
 }
 
 static void
@@ -2288,7 +2334,8 @@ kiblnd_connreq_done(struct kib_conn *conn, int status)
                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
                write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 
-               kiblnd_txlist_done(&txs, -ECONNABORTED);
+               kiblnd_txlist_done(&txs, -ECONNABORTED,
+                                  LNET_MSG_STATUS_LOCAL_ERROR);
 
                return;
        }
@@ -3109,9 +3156,9 @@ kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
                         CNETERR("Can't resolve address for %s: %d\n",
                                 libcfs_nid2str(peer_ni->ibp_nid), event->status);
                         rc = event->status;
-                } else {
-                        rc = rdma_resolve_route(
-                                cmid, *kiblnd_tunables.kib_timeout * 1000);
+               } else {
+                       rc = rdma_resolve_route(
+                               cmid, lnet_get_lnd_timeout() * 1000);
                        if (rc == 0) {
                                struct kib_net *net = peer_ni->ibp_ni->ni_data;
                                struct kib_dev *dev = net->ibn_dev;
@@ -3361,7 +3408,8 @@ kiblnd_check_conns (int idx)
        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
 
        if (!list_empty(&timedout_txs))
-               kiblnd_txlist_done(&timedout_txs, -ETIMEDOUT);
+               kiblnd_txlist_done(&timedout_txs, -ETIMEDOUT,
+                                  LNET_MSG_STATUS_LOCAL_TIMEOUT);
 
        /* Handle timeout by closing the whole
         * connection. We can only be sure RDMA activity
@@ -3515,6 +3563,7 @@ kiblnd_connd (void *arg)
                         const int n = 4;
                         const int p = 1;
                         int       chunk = kiblnd_data.kib_peer_hash_size;
+                       unsigned int lnd_timeout;
 
                        spin_unlock_irqrestore(lock, flags);
                         dropped_lock = 1;
@@ -3527,11 +3576,11 @@ kiblnd_connd (void *arg)
                          * connection within (n+1)/n times the timeout
                          * interval. */
 
-                        if (*kiblnd_tunables.kib_timeout > n * p)
-                                chunk = (chunk * n * p) /
-                                        *kiblnd_tunables.kib_timeout;
-                        if (chunk == 0)
-                                chunk = 1;
+                       lnd_timeout = lnet_get_lnd_timeout();
+                       if (lnd_timeout > n * p)
+                               chunk = (chunk * n * p) / lnd_timeout;
+                       if (chunk == 0)
+                               chunk = 1;
 
                        for (i = 0; i < chunk; i++) {
                                kiblnd_check_conns(peer_index);
@@ -3569,21 +3618,34 @@ kiblnd_qp_event(struct ib_event *event, void *arg)
 {
        struct kib_conn *conn = arg;
 
-        switch (event->event) {
-        case IB_EVENT_COMM_EST:
-                CDEBUG(D_NET, "%s established\n",
-                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
+       switch (event->event) {
+       case IB_EVENT_COMM_EST:
+               CDEBUG(D_NET, "%s established\n",
+                      libcfs_nid2str(conn->ibc_peer->ibp_nid));
                /* We received a packet but connection isn't established
                 * probably handshake packet was lost, so free to
                 * force make connection established */
                rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
-                return;
+               return;
 
-        default:
-                CERROR("%s: Async QP event type %d\n",
-                       libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
-                return;
-        }
+       case IB_EVENT_PORT_ERR:
+       case IB_EVENT_DEVICE_FATAL:
+               CERROR("Fatal device error for NI %s\n",
+                      libcfs_nid2str(conn->ibc_peer->ibp_ni->ni_nid));
+               atomic_set(&conn->ibc_peer->ibp_ni->ni_fatal_error_on, 1);
+               return;
+
+       case IB_EVENT_PORT_ACTIVE:
+               CERROR("Port reactivated for NI %s\n",
+                      libcfs_nid2str(conn->ibc_peer->ibp_ni->ni_nid));
+               atomic_set(&conn->ibc_peer->ibp_ni->ni_fatal_error_on, 0);
+               return;
+
+       default:
+               CERROR("%s: Async QP event type %d\n",
+                      libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
+               return;
+       }
 }
 
 static void
index 1e36746..0f5b15a 100644 (file)
@@ -1288,7 +1288,7 @@ ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
        /* Set the deadline for the outgoing HELLO to drain */
        conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
        conn->ksnc_tx_deadline = ktime_get_seconds() +
-                                *ksocknal_tunables.ksnd_timeout;
+                                lnet_get_lnd_timeout();
        smp_mb();   /* order with adding to peer_ni's conn list */
 
        list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
@@ -1669,7 +1669,7 @@ ksocknal_destroy_conn(struct ksock_conn *conn)
         switch (conn->ksnc_rx_state) {
         case SOCKNAL_RX_LNET_PAYLOAD:
                 last_rcv = conn->ksnc_rx_deadline -
-                          *ksocknal_tunables.ksnd_timeout;
+                          lnet_get_lnd_timeout();
                CERROR("Completing partial receive from %s[%d], "
                       "ip %pI4h:%d, with error, wanted: %d, left: %d, "
                       "last alive is %lld secs ago\n",
@@ -1843,7 +1843,7 @@ ksocknal_query(struct lnet_ni *ni, lnet_nid_t nid, time64_t *when)
                        if (bufnob < conn->ksnc_tx_bufnob) {
                                /* something got ACKed */
                                conn->ksnc_tx_deadline = ktime_get_seconds() +
-                                                        *ksocknal_tunables.ksnd_timeout;
+                                                        lnet_get_lnd_timeout();
                                 peer_ni->ksnp_last_alive = now;
                                 conn->ksnc_tx_bufnob = bufnob;
                         }
index f409443..c639ae4 100644 (file)
@@ -282,6 +282,7 @@ struct ksock_tx {                   /* transmit packet */
        time64_t           tx_deadline; /* when (in secs) tx times out */
        struct ksock_msg   tx_msg;         /* socklnd message buffer */
         int            tx_desc_size;   /* size of this descriptor */
+       enum lnet_msg_hstatus tx_hstatus; /* health status of tx */
         union {
                 struct {
                        struct kvec iov;        /* virt hdr */
index c27844e..aa32b2a 100644 (file)
@@ -57,6 +57,7 @@ ksocknal_alloc_tx(int type, int size)
        tx->tx_zc_aborted = 0;
        tx->tx_zc_capable = 0;
        tx->tx_zc_checked = 0;
+       tx->tx_hstatus = LNET_MSG_STATUS_OK;
        tx->tx_desc_size  = size;
 
        atomic_inc(&ksocknal_data.ksnd_nactive_txs);
@@ -222,7 +223,7 @@ ksocknal_transmit(struct ksock_conn *conn, struct ksock_tx *tx)
                        /* allocated send buffer bytes < computed; infer
                         * something got ACKed */
                        conn->ksnc_tx_deadline = ktime_get_seconds() +
-                                                *ksocknal_tunables.ksnd_timeout;
+                                                lnet_get_lnd_timeout();
                        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
                        conn->ksnc_tx_bufnob = bufnob;
                        smp_mb();
@@ -271,7 +272,7 @@ ksocknal_recv_iov(struct ksock_conn *conn)
 
        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
        conn->ksnc_rx_deadline = ktime_get_seconds() +
-                                *ksocknal_tunables.ksnd_timeout;
+                                lnet_get_lnd_timeout();
        smp_mb();                       /* order with setting rx_started */
        conn->ksnc_rx_started = 1;
 
@@ -315,7 +316,7 @@ ksocknal_recv_kiov(struct ksock_conn *conn)
 
        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
        conn->ksnc_rx_deadline = ktime_get_seconds() +
-                                *ksocknal_tunables.ksnd_timeout;
+                                lnet_get_lnd_timeout();
        smp_mb();                       /* order with setting rx_started */
        conn->ksnc_rx_started = 1;
 
@@ -392,19 +393,28 @@ void
 ksocknal_tx_done(struct lnet_ni *ni, struct ksock_tx *tx, int rc)
 {
        struct lnet_msg *lnetmsg = tx->tx_lnetmsg;
+       enum lnet_msg_hstatus hstatus = tx->tx_hstatus;
         ENTRY;
 
        LASSERT(ni != NULL || tx->tx_conn != NULL);
 
-       if (!rc && (tx->tx_resid != 0 || tx->tx_zc_aborted))
+       if (!rc && (tx->tx_resid != 0 || tx->tx_zc_aborted)) {
                rc = -EIO;
+               if (hstatus == LNET_MSG_STATUS_OK)
+                       hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
+       }
 
        if (tx->tx_conn != NULL)
                ksocknal_conn_decref(tx->tx_conn);
 
        ksocknal_free_tx(tx);
-       if (lnetmsg != NULL) /* KSOCK_MSG_NOOP go without lnetmsg */
+       if (lnetmsg != NULL) { /* KSOCK_MSG_NOOP go without lnetmsg */
+               if (rc)
+                       CERROR("tx failure rc = %d, hstatus = %d\n", rc,
+                              hstatus);
+               lnetmsg->msg_health_status = hstatus;
                lnet_finalize(lnetmsg, rc);
+       }
 
        EXIT;
 }
@@ -429,6 +439,22 @@ ksocknal_txlist_done(struct lnet_ni *ni, struct list_head *txlist, int error)
 
                list_del(&tx->tx_list);
 
+               if (tx->tx_hstatus == LNET_MSG_STATUS_OK) {
+                       if (error == -ETIMEDOUT)
+                               tx->tx_hstatus =
+                                 LNET_MSG_STATUS_LOCAL_TIMEOUT;
+                       else if (error == -ENETDOWN ||
+                                error == -EHOSTUNREACH ||
+                                error == -ENETUNREACH)
+                               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_DROPPED;
+                       /*
+                        * for all other errors we don't want to
+                        * retransmit
+                        */
+                       else if (error)
+                               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
+               }
+
                LASSERT(atomic_read(&tx->tx_refcount) == 1);
                ksocknal_tx_done(ni, tx, error);
        }
@@ -464,7 +490,7 @@ ksocknal_check_zc_req(struct ksock_tx *tx)
 
         /* ZC_REQ is going to be pinned to the peer_ni */
        tx->tx_deadline = ktime_get_seconds() +
-                         *ksocknal_tunables.ksnd_timeout;
+                         lnet_get_lnd_timeout();
 
         LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);
 
@@ -508,6 +534,13 @@ static int
 ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx)
 {
        int rc;
+       bool error_sim = false;
+
+       if (lnet_send_error_simulation(tx->tx_lnetmsg, &tx->tx_hstatus)) {
+               error_sim = true;
+               rc = -EINVAL;
+               goto simulate_error;
+       }
 
         if (tx->tx_zc_capable && !tx->tx_zc_checked)
                 ksocknal_check_zc_req(tx);
@@ -546,39 +579,58 @@ ksocknal_process_transmit(struct ksock_conn *conn, struct ksock_tx *tx)
                        wake_up(&ksocknal_data.ksnd_reaper_waitq);
 
                spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
+
+               /*
+                * set the health status of the message which determines
+                * whether we should retry the transmit
+                */
+               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
                return (rc);
        }
 
-        /* Actual error */
-        LASSERT (rc < 0);
+simulate_error:
 
-        if (!conn->ksnc_closing) {
-                switch (rc) {
-                case -ECONNRESET:
+       /* Actual error */
+       LASSERT(rc < 0);
+
+       if (!error_sim) {
+               /*
+               * set the health status of the message which determines
+               * whether we should retry the transmit
+               */
+               if (rc == -ETIMEDOUT)
+                       tx->tx_hstatus = LNET_MSG_STATUS_REMOTE_TIMEOUT;
+               else
+                       tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_ERROR;
+       }
+
+       if (!conn->ksnc_closing) {
+               switch (rc) {
+               case -ECONNRESET:
                        LCONSOLE_WARN("Host %pI4h reset our connection "
-                                      "while we were sending data; it may have "
-                                      "rebooted.\n",
+                                     "while we were sending data; it may have "
+                                     "rebooted.\n",
                                      &conn->ksnc_ipaddr);
-                        break;
-                default:
-                        LCONSOLE_WARN("There was an unexpected network error "
+                       break;
+               default:
+                       LCONSOLE_WARN("There was an unexpected network error "
                                      "while writing to %pI4h: %d.\n",
                                      &conn->ksnc_ipaddr, rc);
-                        break;
-                }
+                       break;
+               }
                CDEBUG(D_NET, "[%p] Error %d on write to %s ip %pI4h:%d\n",
                       conn, rc, libcfs_id2str(conn->ksnc_peer->ksnp_id),
                       &conn->ksnc_ipaddr, conn->ksnc_port);
-        }
+       }
 
-        if (tx->tx_zc_checked)
-                ksocknal_uncheck_zc_req(tx);
+       if (tx->tx_zc_checked)
+               ksocknal_uncheck_zc_req(tx);
 
-        /* it's not an error if conn is being closed */
-        ksocknal_close_conn_and_siblings (conn,
-                                          (conn->ksnc_closing) ? 0 : rc);
+       /* it's not an error if conn is being closed */
+       ksocknal_close_conn_and_siblings(conn,
+                                         (conn->ksnc_closing) ? 0 : rc);
 
-        return (rc);
+       return rc;
 }
 
 static void
@@ -730,7 +782,7 @@ ksocknal_queue_tx_locked(struct ksock_tx *tx, struct ksock_conn *conn)
        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline = ktime_get_seconds() +
-                                        *ksocknal_tunables.ksnd_timeout;
+                                        lnet_get_lnd_timeout();
                if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */
                        conn->ksnc_peer->ksnp_last_alive = ktime_get_seconds();
                conn->ksnc_tx_bufnob = 0;
@@ -907,7 +959,7 @@ ksocknal_launch_packet(struct lnet_ni *ni, struct ksock_tx *tx,
             ksocknal_find_connecting_route_locked (peer_ni) != NULL) {
                 /* the message is going to be pinned to the peer_ni */
                tx->tx_deadline = ktime_get_seconds() +
-                                 *ksocknal_tunables.ksnd_timeout;
+                                 lnet_get_lnd_timeout();
 
                 /* Queue the message until a connection is established */
                list_add_tail(&tx->tx_list, &peer_ni->ksnp_tx_queue);
@@ -1722,7 +1774,7 @@ ksocknal_recv_hello(struct lnet_ni *ni, struct ksock_conn *conn,
        /* socket type set on active connections - not set on passive */
        LASSERT(!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));
 
-       timeout = active ? *ksocknal_tunables.ksnd_timeout :
+       timeout = active ? lnet_get_lnd_timeout() :
                            lnet_acceptor_timeout();
 
        rc = lnet_sock_read(sock, &hello->kshm_magic,
@@ -1857,7 +1909,7 @@ ksocknal_connect(struct ksock_route *route)
         int               retry_later = 0;
         int               rc = 0;
 
-       deadline = ktime_get_seconds() + *ksocknal_tunables.ksnd_timeout;
+       deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
 
        write_lock_bh(&ksocknal_data.ksnd_global_lock);
 
@@ -2246,6 +2298,7 @@ ksocknal_find_timed_out_conn(struct ksock_peer_ni *peer_ni)
         /* We're called with a shared lock on ksnd_global_lock */
        struct ksock_conn *conn;
        struct list_head *ctmp;
+       struct ksock_tx *tx;
 
        list_for_each(ctmp, &peer_ni->ksnp_conns) {
                int error;
@@ -2310,6 +2363,10 @@ ksocknal_find_timed_out_conn(struct ksock_peer_ni *peer_ni)
                         /* Timed out messages queued for sending or
                          * buffered in the socket's send buffer */
                         ksocknal_conn_addref(conn);
+                       list_for_each_entry(tx, &conn->ksnc_tx_queue,
+                                           tx_list)
+                               tx->tx_hstatus =
+                                       LNET_MSG_STATUS_LOCAL_TIMEOUT;
                        CNETERR("Timeout sending data to %s (%pI4h:%d) "
                                 "the network or that node may be down.\n",
                                 libcfs_id2str(peer_ni->ksnp_id),
@@ -2336,6 +2393,8 @@ ksocknal_flush_stale_txs(struct ksock_peer_ni *peer_ni)
                if (ktime_get_seconds() < tx->tx_deadline)
                        break;
 
+               tx->tx_hstatus = LNET_MSG_STATUS_LOCAL_TIMEOUT;
+
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &stale_txs);
        }
@@ -2589,6 +2648,7 @@ int ksocknal_reaper(void *arg)
                         const int n = 4;
                         const int p = 1;
                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
+                       unsigned int lnd_timeout;
 
                         /* Time to check for timeouts on a few more peers: I do
                          * checks every 'p' seconds on a proportion of the peer_ni
@@ -2597,11 +2657,11 @@ int ksocknal_reaper(void *arg)
                          * timeout on any connection within (n+1)/n times the
                          * timeout interval. */
 
-                        if (*ksocknal_tunables.ksnd_timeout > n * p)
-                                chunk = (chunk * n * p) /
-                                        *ksocknal_tunables.ksnd_timeout;
-                        if (chunk == 0)
-                                chunk = 1;
+                       lnd_timeout = lnet_get_lnd_timeout();
+                       if (lnd_timeout > n * p)
+                               chunk = (chunk * n * p) / lnd_timeout;
+                       if (chunk == 0)
+                               chunk = 1;
 
                         for (i = 0; i < chunk; i++) {
                                 ksocknal_check_peer_timeouts (peer_index);
index f42ca74..9a56605 100644 (file)
@@ -78,6 +78,28 @@ module_param(lnet_numa_range, uint, 0444);
 MODULE_PARM_DESC(lnet_numa_range,
                "NUMA range to consider during Multi-Rail selection");
 
+/*
+ * lnet_health_sensitivity determines by how much we decrement the health
+ * value on sending error. The value defaults to 0, which means health
+ * checking is turned off by default.
+ */
+unsigned int lnet_health_sensitivity = 0;
+static int sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp);
+static struct kernel_param_ops param_ops_health_sensitivity = {
+       .set = sensitivity_set,
+       .get = param_get_int,
+};
+#define param_check_health_sensitivity(name, p) \
+               __param_check(name, p, int)
+#ifdef HAVE_KERNEL_PARAM_OPS
+module_param(lnet_health_sensitivity, health_sensitivity, S_IRUGO|S_IWUSR);
+#else
+module_param_call(lnet_health_sensitivity, sensitivity_set, param_get_int,
+                 &lnet_health_sensitivity, S_IRUGO|S_IWUSR);
+#endif
+MODULE_PARM_DESC(lnet_health_sensitivity,
+               "Value to decrement the health value by on error");
+
 static int lnet_interfaces_max = LNET_INTERFACES_MAX_DEFAULT;
 static int intf_max_set(const char *val, cfs_kernel_param_arg_t *kp);
 
@@ -118,9 +140,43 @@ MODULE_PARM_DESC(lnet_peer_discovery_disabled,
                "Set to 1 to disable peer discovery on this node.");
 
 unsigned lnet_transaction_timeout = 5;
-module_param(lnet_transaction_timeout, uint, 0444);
-MODULE_PARM_DESC(lnet_transaction_timeout,
-               "Time in seconds to wait for a REPLY or an ACK");
+static int transaction_to_set(const char *val, cfs_kernel_param_arg_t *kp);
+static struct kernel_param_ops param_ops_transaction_timeout = {
+       .set = transaction_to_set,
+       .get = param_get_int,
+};
+
+#define param_check_transaction_timeout(name, p) \
+               __param_check(name, p, int)
+#ifdef HAVE_KERNEL_PARAM_OPS
+module_param(lnet_transaction_timeout, transaction_timeout, S_IRUGO|S_IWUSR);
+#else
+module_param_call(lnet_transaction_timeout, transaction_to_set, param_get_int,
+                 &lnet_transaction_timeout, S_IRUGO|S_IWUSR);
+#endif
+MODULE_PARM_DESC(lnet_transaction_timeout,
+               "Maximum number of seconds to wait for a peer response.");
+
+unsigned lnet_retry_count = 0;
+static int retry_count_set(const char *val, cfs_kernel_param_arg_t *kp);
+static struct kernel_param_ops param_ops_retry_count = {
+       .set = retry_count_set,
+       .get = param_get_int,
+};
+
+#define param_check_retry_count(name, p) \
+               __param_check(name, p, int)
+#ifdef HAVE_KERNEL_PARAM_OPS
+module_param(lnet_retry_count, retry_count, S_IRUGO|S_IWUSR);
+#else
+module_param_call(lnet_retry_count, retry_count_set, param_get_int,
+                 &lnet_retry_count, S_IRUGO|S_IWUSR);
+#endif
+MODULE_PARM_DESC(lnet_retry_count,
+                "Maximum number of times to retry transmitting a message");
+
+
+unsigned lnet_lnd_timeout = LNET_LND_DEFAULT_TIMEOUT;
 
 /*
  * This sequence number keeps track of how many times DLC was used to
@@ -138,6 +194,42 @@ static int lnet_discover(struct lnet_process_id id, __u32 force,
                         struct lnet_process_id __user *ids, int n_ids);
 
 static int
+sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
+{
+       int rc;
+       unsigned *sensitivity = (unsigned *)kp->arg;
+       unsigned long value;
+
+       rc = kstrtoul(val, 0, &value);
+       if (rc) {
+               CERROR("Invalid module parameter value for 'lnet_health_sensitivity'\n");
+               return rc;
+       }
+
+       /*
+        * The purpose of locking the api_mutex here is to ensure that
+        * the correct value ends up stored properly.
+        */
+       mutex_lock(&the_lnet.ln_api_mutex);
+
+       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       if (value == *sensitivity) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       *sensitivity = value;
+
+       mutex_unlock(&the_lnet.ln_api_mutex);
+
+       return 0;
+}
+
+static int
 discovery_set(const char *val, cfs_kernel_param_arg_t *kp)
 {
        int rc;
@@ -188,6 +280,103 @@ discovery_set(const char *val, cfs_kernel_param_arg_t *kp)
 }
 
 static int
+transaction_to_set(const char *val, cfs_kernel_param_arg_t *kp)
+{
+       int rc;
+       unsigned *transaction_to = (unsigned *)kp->arg;
+       unsigned long value;
+
+       rc = kstrtoul(val, 0, &value);
+       if (rc) {
+               CERROR("Invalid module parameter value for 'lnet_transaction_timeout'\n");
+               return rc;
+       }
+
+       /*
+        * The purpose of locking the api_mutex here is to ensure that
+        * the correct value ends up stored properly.
+        */
+       mutex_lock(&the_lnet.ln_api_mutex);
+
+       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       if (value < lnet_retry_count || value == 0) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               CERROR("Invalid value for lnet_transaction_timeout (%lu). "
+                      "Has to be greater than lnet_retry_count (%u)\n",
+                      value, lnet_retry_count);
+               return -EINVAL;
+       }
+
+       if (value == *transaction_to) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       *transaction_to = value;
+       if (lnet_retry_count == 0)
+               lnet_lnd_timeout = value;
+       else
+               lnet_lnd_timeout = value / lnet_retry_count;
+
+       mutex_unlock(&the_lnet.ln_api_mutex);
+
+       return 0;
+}
+
+static int
+retry_count_set(const char *val, cfs_kernel_param_arg_t *kp)
+{
+       int rc;
+       unsigned *retry_count = (unsigned *)kp->arg;
+       unsigned long value;
+
+       rc = kstrtoul(val, 0, &value);
+       if (rc) {
+               CERROR("Invalid module parameter value for 'lnet_retry_count'\n");
+               return rc;
+       }
+
+       /*
+        * The purpose of locking the api_mutex here is to ensure that
+        * the correct value ends up stored properly.
+        */
+       mutex_lock(&the_lnet.ln_api_mutex);
+
+       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       if (value > lnet_transaction_timeout) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               CERROR("Invalid value for lnet_retry_count (%lu). "
+                      "Has to be smaller than lnet_transaction_timeout (%u)\n",
+                      value, lnet_transaction_timeout);
+               return -EINVAL;
+       }
+
+       if (value == *retry_count) {
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
+       *retry_count = value;
+
+       if (value == 0)
+               lnet_lnd_timeout = lnet_transaction_timeout;
+       else
+               lnet_lnd_timeout = lnet_transaction_timeout / value;
+
+       mutex_unlock(&the_lnet.ln_api_mutex);
+
+       return 0;
+}
+
+static int
 intf_max_set(const char *val, cfs_kernel_param_arg_t *kp)
 {
        int value, rc;
@@ -244,7 +433,7 @@ lnet_init_locks(void)
        spin_lock_init(&the_lnet.ln_eq_wait_lock);
        spin_lock_init(&the_lnet.ln_msg_resend_lock);
        init_waitqueue_head(&the_lnet.ln_eq_waitq);
-       init_waitqueue_head(&the_lnet.ln_rc_waitq);
+       init_waitqueue_head(&the_lnet.ln_mt_waitq);
        mutex_init(&the_lnet.ln_lnd_mutex);
 }
 
@@ -510,6 +699,13 @@ static struct lnet_lnd *lnet_find_lnd_by_type(__u32 type)
        return NULL;
 }
 
+unsigned int
+lnet_get_lnd_timeout(void)
+{
+       return lnet_lnd_timeout;
+}
+EXPORT_SYMBOL(lnet_get_lnd_timeout);
+
 void
 lnet_register_lnd(struct lnet_lnd *lnd)
 {
@@ -555,7 +751,20 @@ lnet_counters_get(struct lnet_counters *counters)
        cfs_percpt_for_each(ctr, i, the_lnet.ln_counters) {
                counters->msgs_max     += ctr->msgs_max;
                counters->msgs_alloc   += ctr->msgs_alloc;
+               counters->rst_alloc    += ctr->rst_alloc;
                counters->errors       += ctr->errors;
+               counters->resend_count += ctr->resend_count;
+               counters->response_timeout_count += ctr->response_timeout_count;
+               counters->local_interrupt_count += ctr->local_interrupt_count;
+               counters->local_dropped_count += ctr->local_dropped_count;
+               counters->local_aborted_count += ctr->local_aborted_count;
+               counters->local_no_route_count += ctr->local_no_route_count;
+               counters->local_timeout_count += ctr->local_timeout_count;
+               counters->local_error_count += ctr->local_error_count;
+               counters->remote_dropped_count += ctr->remote_dropped_count;
+               counters->remote_error_count += ctr->remote_error_count;
+               counters->remote_timeout_count += ctr->remote_timeout_count;
+               counters->network_timeout_count += ctr->network_timeout_count;
                counters->send_count   += ctr->send_count;
                counters->recv_count   += ctr->recv_count;
                counters->route_count  += ctr->route_count;
@@ -779,6 +988,8 @@ lnet_prepare(lnet_pid_t requested_pid)
        INIT_LIST_HEAD(&the_lnet.ln_dc_request);
        INIT_LIST_HEAD(&the_lnet.ln_dc_working);
        INIT_LIST_HEAD(&the_lnet.ln_dc_expired);
+       INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
+       INIT_LIST_HEAD(&the_lnet.ln_mt_peerNIRecovq);
        init_waitqueue_head(&the_lnet.ln_dc_waitq);
 
        rc = lnet_descriptor_setup();
@@ -1024,16 +1235,6 @@ lnet_islocalnet(__u32 net_id)
        return local;
 }
 
-bool
-lnet_is_ni_healthy_locked(struct lnet_ni *ni)
-{
-       if (ni->ni_state == LNET_NI_STATE_ACTIVE ||
-           ni->ni_state == LNET_NI_STATE_DEGRADED)
-               return true;
-
-       return false;
-}
-
 struct lnet_ni  *
 lnet_nid2ni_locked(lnet_nid_t nid, int cpt)
 {
@@ -1615,7 +1816,7 @@ lnet_clear_zombies_nis_locked(struct lnet_net *net)
                list_del_init(&ni->ni_netlist);
                /* the ni should be in deleting state. If it's not it's
                 * a bug */
-               LASSERT(ni->ni_state == LNET_NI_STATE_DELETING);
+               LASSERT(ni->ni_state & LNET_NI_STATE_DELETING);
                cfs_percpt_for_each(ref, j, ni->ni_refs) {
                        if (*ref == 0)
                                continue;
@@ -1663,7 +1864,10 @@ lnet_shutdown_lndni(struct lnet_ni *ni)
        struct lnet_net *net = ni->ni_net;
 
        lnet_net_lock(LNET_LOCK_EX);
-       ni->ni_state = LNET_NI_STATE_DELETING;
+       lnet_ni_lock(ni);
+       ni->ni_state |= LNET_NI_STATE_DELETING;
+       ni->ni_state &= ~LNET_NI_STATE_ACTIVE;
+       lnet_ni_unlock(ni);
        lnet_ni_unlink_locked(ni);
        lnet_incr_dlc_seq();
        lnet_net_unlock(LNET_LOCK_EX);
@@ -1762,6 +1966,7 @@ lnet_shutdown_lndnets(void)
 
        list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
                list_del_init(&msg->msg_list);
+               msg->msg_no_resend = true;
                lnet_finalize(msg, -ECANCELED);
        }
 
@@ -1798,7 +2003,10 @@ lnet_startup_lndni(struct lnet_ni *ni, struct lnet_lnd_tunables *tun)
                goto failed0;
        }
 
-       ni->ni_state = LNET_NI_STATE_ACTIVE;
+       lnet_ni_lock(ni);
+       ni->ni_state |= LNET_NI_STATE_ACTIVE;
+       ni->ni_state &= ~LNET_NI_STATE_INIT;
+       lnet_ni_unlock(ni);
 
        /* We keep a reference on the loopback net through the loopback NI */
        if (net->net_lnd->lnd_type == LOLND) {
@@ -1833,6 +2041,7 @@ lnet_startup_lndni(struct lnet_ni *ni, struct lnet_lnd_tunables *tun)
 
        atomic_set(&ni->ni_tx_credits,
                   lnet_ni_tq_credits(ni) * ni->ni_ncpts);
+       atomic_set(&ni->ni_healthv, LNET_MAX_HEALTH_VALUE);
 
        CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
                libcfs_nid2str(ni->ni_nid),
@@ -2245,13 +2454,13 @@ LNetNIInit(lnet_pid_t requested_pid)
 
        lnet_ping_target_update(pbuf, ping_mdh);
 
-       rc = lnet_router_checker_start();
+       rc = lnet_monitor_thr_start();
        if (rc != 0)
                goto err_stop_ping;
 
        rc = lnet_push_target_init();
        if (rc != 0)
-               goto err_stop_router_checker;
+               goto err_stop_monitor_thr;
 
        rc = lnet_peer_discovery_start();
        if (rc != 0)
@@ -2266,8 +2475,8 @@ LNetNIInit(lnet_pid_t requested_pid)
 
 err_destroy_push_target:
        lnet_push_target_fini();
-err_stop_router_checker:
-       lnet_router_checker_stop();
+err_stop_monitor_thr:
+       lnet_monitor_thr_stop();
 err_stop_ping:
        lnet_ping_target_fini();
 err_acceptor_stop:
@@ -2319,7 +2528,7 @@ LNetNIFini()
                lnet_router_debugfs_init();
                lnet_peer_discovery_stop();
                lnet_push_target_fini();
-               lnet_router_checker_stop();
+               lnet_monitor_thr_stop();
                lnet_ping_target_fini();
 
                /* Teardown fns that use my own API functions BEFORE here */
@@ -2520,10 +2729,17 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev)
        struct lnet_ni          *ni;
        struct lnet_net         *net = mynet;
 
+       /*
+        * It is possible that the net has been cleaned out while there is
+        * a message being sent. This function accesses the net without
+        * checking if the list is empty.
+        */
        if (prev == NULL) {
                if (net == NULL)
                        net = list_entry(the_lnet.ln_nets.next, struct lnet_net,
                                        net_list);
+               if (list_empty(&net->net_ni_list))
+                       return NULL;
                ni = list_entry(net->net_ni_list.next, struct lnet_ni,
                                ni_netlist);
 
@@ -2545,6 +2761,8 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev)
                /* get the next net */
                net = list_entry(prev->ni_net->net_list.next, struct lnet_net,
                                 net_list);
+               if (list_empty(&net->net_ni_list))
+                       return NULL;
                /* get the ni on it */
                ni = list_entry(net->net_ni_list.next, struct lnet_ni,
                                ni_netlist);
@@ -2552,6 +2770,9 @@ lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev)
                return ni;
        }
 
+       if (list_empty(&prev->ni_netlist))
+               return NULL;
+
        /* there are more nis left */
        ni = list_entry(prev->ni_netlist.next, struct lnet_ni, ni_netlist);
 
@@ -3024,6 +3245,102 @@ __u32 lnet_get_dlc_seq_locked(void)
        return atomic_read(&lnet_dlc_seq_no);
 }
 
+static void
+lnet_ni_set_healthv(lnet_nid_t nid, int value, bool all)
+{
+       struct lnet_net *net;
+       struct lnet_ni *ni;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
+               list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
+                       if (ni->ni_nid == nid || all) {
+                               atomic_set(&ni->ni_healthv, value);
+                               if (list_empty(&ni->ni_recovery) &&
+                                   value < LNET_MAX_HEALTH_VALUE) {
+                                       CERROR("manually adding local NI %s to recovery\n",
+                                              libcfs_nid2str(ni->ni_nid));
+                                       list_add_tail(&ni->ni_recovery,
+                                                     &the_lnet.ln_mt_localNIRecovq);
+                                       lnet_ni_addref_locked(ni, 0);
+                               }
+                               if (!all) {
+                                       lnet_net_unlock(LNET_LOCK_EX);
+                                       return;
+                               }
+                       }
+               }
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static int
+lnet_get_local_ni_hstats(struct lnet_ioctl_local_ni_hstats *stats)
+{
+       int cpt, rc = 0;
+       struct lnet_ni *ni;
+       lnet_nid_t nid = stats->hlni_nid;
+
+       cpt = lnet_net_lock_current();
+       ni = lnet_nid2ni_locked(nid, cpt);
+
+       if (!ni) {
+               rc = -ENOENT;
+               goto unlock;
+       }
+
+       stats->hlni_local_interrupt = atomic_read(&ni->ni_hstats.hlt_local_interrupt);
+       stats->hlni_local_dropped = atomic_read(&ni->ni_hstats.hlt_local_dropped);
+       stats->hlni_local_aborted = atomic_read(&ni->ni_hstats.hlt_local_aborted);
+       stats->hlni_local_no_route = atomic_read(&ni->ni_hstats.hlt_local_no_route);
+       stats->hlni_local_timeout = atomic_read(&ni->ni_hstats.hlt_local_timeout);
+       stats->hlni_local_error = atomic_read(&ni->ni_hstats.hlt_local_error);
+       stats->hlni_health_value = atomic_read(&ni->ni_healthv);
+
+unlock:
+       lnet_net_unlock(cpt);
+
+       return rc;
+}
+
+static int
+lnet_get_local_ni_recovery_list(struct lnet_ioctl_recovery_list *list)
+{
+       struct lnet_ni *ni;
+       int i = 0;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       list_for_each_entry(ni, &the_lnet.ln_mt_localNIRecovq, ni_recovery) {
+               list->rlst_nid_array[i] = ni->ni_nid;
+               i++;
+               if (i >= LNET_MAX_SHOW_NUM_NID)
+                       break;
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+       list->rlst_num_nids = i;
+
+       return 0;
+}
+
+static int
+lnet_get_peer_ni_recovery_list(struct lnet_ioctl_recovery_list *list)
+{
+       struct lnet_peer_ni *lpni;
+       int i = 0;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       list_for_each_entry(lpni, &the_lnet.ln_mt_peerNIRecovq, lpni_recovery) {
+               list->rlst_nid_array[i] = lpni->lpni_nid;
+               i++;
+               if (i >= LNET_MAX_SHOW_NUM_NID)
+                       break;
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+       list->rlst_num_nids = i;
+
+       return 0;
+}
+
 /**
  * LNet ioctl handler.
  *
@@ -3234,6 +3551,33 @@ LNetCtl(unsigned int cmd, void *arg)
                return rc;
        }
 
+       case IOC_LIBCFS_GET_LOCAL_HSTATS: {
+               struct lnet_ioctl_local_ni_hstats *stats = arg;
+
+               if (stats->hlni_hdr.ioc_len < sizeof(*stats))
+                       return -EINVAL;
+
+               mutex_lock(&the_lnet.ln_api_mutex);
+               rc = lnet_get_local_ni_hstats(stats);
+               mutex_unlock(&the_lnet.ln_api_mutex);
+
+               return rc;
+       }
+
+       case IOC_LIBCFS_GET_RECOVERY_QUEUE: {
+               struct lnet_ioctl_recovery_list *list = arg;
+               if (list->rlst_hdr.ioc_len < sizeof(*list))
+                       return -EINVAL;
+
+               mutex_lock(&the_lnet.ln_api_mutex);
+               if (list->rlst_type == LNET_HEALTH_TYPE_LOCAL_NI)
+                       rc = lnet_get_local_ni_recovery_list(list);
+               else
+                       rc = lnet_get_peer_ni_recovery_list(list);
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return rc;
+       }
+
        case IOC_LIBCFS_ADD_PEER_NI: {
                struct lnet_ioctl_peer_cfg *cfg = arg;
 
@@ -3309,6 +3653,30 @@
                 return rc;
         }
 
+       case IOC_LIBCFS_SET_HEALHV: {
+               struct lnet_ioctl_reset_health_cfg *cfg = arg;
+               int value;
+               if (cfg->rh_hdr.ioc_len < sizeof(*cfg))
+                       return -EINVAL;
+               if (cfg->rh_value < 0 ||
+                   cfg->rh_value > LNET_MAX_HEALTH_VALUE)
+                       value = LNET_MAX_HEALTH_VALUE;
+               else
+                       value = cfg->rh_value;
+               CDEBUG(D_NET, "Manually setting healthv to %d for %s:%s. all = %d\n",
+                      value, (cfg->rh_type == LNET_HEALTH_TYPE_LOCAL_NI) ?
+                      "local" : "peer", libcfs_nid2str(cfg->rh_nid), cfg->rh_all);
+               mutex_lock(&the_lnet.ln_api_mutex);
+               if (cfg->rh_type == LNET_HEALTH_TYPE_LOCAL_NI)
+                       lnet_ni_set_healthv(cfg->rh_nid, value,
+                                            cfg->rh_all);
+               else
+                       lnet_peer_ni_set_healthv(cfg->rh_nid, value,
+                                                 cfg->rh_all);
+               mutex_unlock(&the_lnet.ln_api_mutex);
+               return 0;
+       }
+
        case IOC_LIBCFS_NOTIFY_ROUTER: {
                time64_t deadline = ktime_get_real_seconds() - data->ioc_u64[0];
 
@@ -3568,7 +3935,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
 
        rc = LNetGet(LNET_NID_ANY, mdh, id,
                     LNET_RESERVED_PORTAL,
-                    LNET_PROTO_PING_MATCHBITS, 0);
+                    LNET_PROTO_PING_MATCHBITS, 0, false);
 
        if (rc != 0) {
                /* Don't CERROR; this could be deliberate! */
index 57f63fa..70240ad 100644 (file)
@@ -458,6 +458,7 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface)
        spin_lock_init(&ni->ni_lock);
        INIT_LIST_HEAD(&ni->ni_cptlist);
        INIT_LIST_HEAD(&ni->ni_netlist);
+       INIT_LIST_HEAD(&ni->ni_recovery);
        ni->ni_refs = cfs_percpt_alloc(lnet_cpt_table(),
                                       sizeof(*ni->ni_refs[0]));
        if (ni->ni_refs == NULL)
@@ -482,7 +483,7 @@ lnet_ni_alloc_common(struct lnet_net *net, char *iface)
                ni->ni_net_ns = NULL;
 
        ni->ni_last_alive = ktime_get_real_seconds();
-       ni->ni_state = LNET_NI_STATE_INIT;
+       ni->ni_state |= LNET_NI_STATE_INIT;
        list_add_tail(&ni->ni_netlist, &net->net_ni_added);
 
        /*
index e0014c2..a89fa41 100644 (file)
@@ -46,6 +46,23 @@ static int local_nid_dist_zero = 1;
 module_param(local_nid_dist_zero, int, 0444);
 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
 
+struct lnet_send_data {
+       struct lnet_ni *sd_best_ni;
+       struct lnet_peer_ni *sd_best_lpni;
+       struct lnet_peer_ni *sd_final_dst_lpni;
+       struct lnet_peer *sd_peer;
+       struct lnet_peer *sd_gw_peer;
+       struct lnet_peer_ni *sd_gw_lpni;
+       struct lnet_peer_net *sd_peer_net;
+       struct lnet_msg *sd_msg;
+       lnet_nid_t sd_dst_nid;
+       lnet_nid_t sd_src_nid;
+       lnet_nid_t sd_rtr_nid;
+       int sd_cpt;
+       int sd_md_cpt;
+       __u32 sd_send_case;
+};
+
 static inline struct lnet_comm_count *
 get_stats_counts(struct lnet_element_stats *stats,
                 enum lnet_stats_type stats_type)
@@ -740,15 +757,17 @@ static void
 lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
 {
        void   *priv = msg->msg_private;
-       int     rc;
+       int rc;
 
        LASSERT (!in_interrupt ());
        LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                 (msg->msg_txcredit && msg->msg_peertxcredit));
 
        rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
-       if (rc < 0)
+       if (rc < 0) {
+               msg->msg_no_resend = true;
                lnet_finalize(msg, rc);
+       }
 }
 
 static int
@@ -929,8 +948,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
 
                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
-               if (do_send)
+               if (do_send) {
+                       msg->msg_health_status = LNET_MSG_STATUS_LOCAL_DROPPED;
                        lnet_finalize(msg, -EHOSTUNREACH);
+               }
 
                lnet_net_lock(cpt);
                return -EHOSTUNREACH;
@@ -943,8 +964,10 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
                        "called on the MD/ME.\n",
                        libcfs_id2str(msg->msg_target));
-               if (do_send)
+               if (do_send) {
+                       msg->msg_no_resend = true;
                        lnet_finalize(msg, -ECANCELED);
+               }
 
                lnet_net_lock(cpt);
                return -ECANCELED;
@@ -989,6 +1012,9 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                }
        }
 
+       /* unset the tx_delay flag as we're going to send it now */
+       msg->msg_tx_delayed = 0;
+
        if (do_send) {
                lnet_net_unlock(cpt);
                lnet_ni_send(ni, msg);
@@ -1084,6 +1110,9 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
        msg->msg_niov = rbp->rbp_npages;
        msg->msg_kiov = &rb->rb_kiov[0];
 
+       /* unset the msg_rx_delayed flag since we're receiving the message */
+       msg->msg_rx_delayed = 0;
+
        if (do_recv) {
                int cpt = msg->msg_rx_cpt;
 
@@ -1183,15 +1212,6 @@ lnet_return_tx_credits_locked(struct lnet_msg *msg)
        }
 
        if (txpeer != NULL) {
-               /*
-                * TODO:
-                * Once the patch for the health comes in we need to set
-                * the health of the peer ni to bad when we fail to send
-                * a message.
-                * int status = msg->msg_ev.status;
-                * if (status != 0)
-                *      lnet_set_peer_ni_health_locked(txpeer, false)
-                */
                msg->msg_txpeer = NULL;
                lnet_peer_ni_decref_locked(txpeer);
        }
@@ -1223,6 +1243,7 @@ lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
+               msg->msg_no_resend = true;
                lnet_finalize(msg, -ECANCELED);
        }
 
@@ -1369,7 +1390,7 @@ lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2)
 }
 
 static struct lnet_peer_ni *
-lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
+lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
                       lnet_nid_t rtr_nid)
 {
        struct lnet_remotenet   *rnet;
@@ -1383,7 +1404,7 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
        /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
         * rtr_nid nid, otherwise find the best gateway I can use */
 
-       rnet = lnet_find_rnet_locked(LNET_NIDNET(target));
+       rnet = lnet_find_rnet_locked(remote_net);
        if (rnet == NULL)
                return NULL;
 
@@ -1428,30 +1449,42 @@ lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
 }
 
 static struct lnet_ni *
-lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
+lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *best_ni,
+                struct lnet_peer *peer, struct lnet_peer_net *peer_net,
                 int md_cpt)
 {
-       struct lnet_ni *ni = NULL, *best_ni = cur_ni;
+       struct lnet_ni *ni = NULL;
        unsigned int shortest_distance;
        int best_credits;
+       int best_healthv;
+
+       /*
+        * If there is no peer_ni that we can send to on this network,
+        * then there is no point in looking for a new best_ni here.
+        */
+       if (!lnet_get_next_peer_ni_locked(peer, peer_net, NULL))
+               return best_ni;
 
        if (best_ni == NULL) {
                shortest_distance = UINT_MAX;
                best_credits = INT_MIN;
+               best_healthv = 0;
        } else {
                shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
                                                     best_ni->ni_dev_cpt);
                best_credits = atomic_read(&best_ni->ni_tx_credits);
+               best_healthv = atomic_read(&best_ni->ni_healthv);
        }
 
        while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
                unsigned int distance;
                int ni_credits;
-
-               if (!lnet_is_ni_healthy_locked(ni))
-                       continue;
+               int ni_healthv;
+               int ni_fatal;
 
                ni_credits = atomic_read(&ni->ni_tx_credits);
+               ni_healthv = atomic_read(&ni->ni_healthv);
+               ni_fatal = atomic_read(&ni->ni_fatal_error_on);
 
                /*
                 * calculate the distance from the CPT on which
@@ -1462,6 +1495,12 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
                                            md_cpt,
                                            ni->ni_dev_cpt);
 
+               CDEBUG(D_NET, "compare ni %s [c:%d, d:%d, s:%d] with best_ni %s [c:%d, d:%d, s:%d]\n",
+                      libcfs_nid2str(ni->ni_nid), ni_credits, distance,
+                      ni->ni_seq, (best_ni) ? libcfs_nid2str(best_ni->ni_nid)
+                       : "not selected", best_credits, shortest_distance,
+                       (best_ni) ? best_ni->ni_seq : 0);
+
                /*
                 * All distances smaller than the NUMA range
                 * are treated equally.
@@ -1470,23 +1509,41 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
                        distance = lnet_numa_range;
 
                /*
-                * Select on shorter distance, then available
+                * Select on health, shorter distance, available
                 * credits, then round-robin.
                 */
-               if (distance > shortest_distance) {
+               if (ni_fatal) {
+                       continue;
+               } else if (ni_healthv < best_healthv) {
+                       continue;
+               } else if (ni_healthv > best_healthv) {
+                       best_healthv = ni_healthv;
+                       /*
+                        * If we're going to prefer this ni because it's
+                        * the healthiest, then we should set the
+                        * shortest_distance in the algorithm in case
+                        * there are multiple NIs with the same health but
+                        * different distances.
+                        */
+                       if (distance < shortest_distance)
+                               shortest_distance = distance;
+               } else if (distance > shortest_distance) {
                        continue;
                } else if (distance < shortest_distance) {
                        shortest_distance = distance;
                } else if (ni_credits < best_credits) {
                        continue;
                } else if (ni_credits == best_credits) {
-                       if (best_ni && (best_ni)->ni_seq <= ni->ni_seq)
+                       if (best_ni && best_ni->ni_seq <= ni->ni_seq)
                                continue;
                }
                best_ni = ni;
                best_credits = ni_credits;
        }
 
+       CDEBUG(D_NET, "selected best_ni %s\n",
+              (best_ni) ? libcfs_nid2str(best_ni->ni_nid) : "no selection");
+
        return best_ni;
 }
 
@@ -1511,422 +1568,150 @@ lnet_msg_discovery(struct lnet_msg *msg)
        return false;
 }
 
+#define SRC_SPEC       0x0001
+#define SRC_ANY                0x0002
+#define LOCAL_DST      0x0004
+#define REMOTE_DST     0x0008
+#define MR_DST         0x0010
+#define NMR_DST                0x0020
+#define SND_RESP       0x0040
+
+/* The following two defines are used for return codes */
+#define REPEAT_SEND    0x1000
+#define PASS_THROUGH   0x2000
+
+/* The different cases lnet_select pathway needs to handle */
+#define SRC_SPEC_LOCAL_MR_DST  (SRC_SPEC | LOCAL_DST | MR_DST)
+#define SRC_SPEC_ROUTER_MR_DST (SRC_SPEC | REMOTE_DST | MR_DST)
+#define SRC_SPEC_LOCAL_NMR_DST (SRC_SPEC | LOCAL_DST | NMR_DST)
+#define SRC_SPEC_ROUTER_NMR_DST        (SRC_SPEC | REMOTE_DST | NMR_DST)
+#define SRC_ANY_LOCAL_MR_DST   (SRC_ANY | LOCAL_DST | MR_DST)
+#define SRC_ANY_ROUTER_MR_DST  (SRC_ANY | REMOTE_DST | MR_DST)
+#define SRC_ANY_LOCAL_NMR_DST  (SRC_ANY | LOCAL_DST | NMR_DST)
+#define SRC_ANY_ROUTER_NMR_DST (SRC_ANY | REMOTE_DST | NMR_DST)
+
 static int
-lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
-                   struct lnet_msg *msg, lnet_nid_t rtr_nid)
+lnet_handle_send(struct lnet_send_data *sd)
 {
-       struct lnet_ni          *best_ni;
-       struct lnet_peer_ni     *best_lpni;
-       struct lnet_peer_ni     *best_gw;
-       struct lnet_peer_ni     *lpni;
-       struct lnet_peer_ni     *final_dst;
-       struct lnet_peer        *peer;
-       struct lnet_peer_net    *peer_net;
-       struct lnet_net         *local_net;
-       int                     cpt, cpt2, rc;
-       bool                    routing;
-       bool                    routing2;
-       bool                    ni_is_pref;
-       bool                    preferred;
-       bool                    local_found;
-       int                     best_lpni_credits;
-       int                     md_cpt;
-
-       /*
-        * get an initial CPT to use for locking. The idea here is not to
-        * serialize the calls to select_pathway, so that as many
-        * operations can run concurrently as possible. To do that we use
-        * the CPT where this call is being executed. Later on when we
-        * determine the CPT to use in lnet_message_commit, we switch the
-        * lock and check if there was any configuration change.  If none,
-        * then we proceed, if there is, then we restart the operation.
-        */
-       cpt = lnet_net_lock_current();
-
-       md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
-       if (md_cpt == CFS_CPT_ANY)
-               md_cpt = cpt;
-
-again:
-       best_ni = NULL;
-       best_lpni = NULL;
-       best_gw = NULL;
-       final_dst = NULL;
-       local_net = NULL;
-       routing = false;
-       routing2 = false;
-       local_found = false;
+       struct lnet_ni *best_ni = sd->sd_best_ni;
+       struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
+       struct lnet_peer_ni *final_dst_lpni = sd->sd_final_dst_lpni;
+       struct lnet_msg *msg = sd->sd_msg;
+       int cpt2;
+       __u32 send_case = sd->sd_send_case;
+       int rc;
+       __u32 routing = send_case & REMOTE_DST;
 
        /*
-        * lnet_nid2peerni_locked() is the path that will find an
-        * existing peer_ni, or create one and mark it as having been
-        * created due to network traffic.
+        * Increment sequence number of the selected peer so that we
+        * pick the next one in Round Robin.
         */
-       lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
-       if (IS_ERR(lpni)) {
-               lnet_net_unlock(cpt);
-               return PTR_ERR(lpni);
-       }
+       best_lpni->lpni_seq++;
 
        /*
-        * If we're being asked to send to the loopback interface, there
-        * is no need to go through any selection. We can just shortcut
-        * the entire process and send over lolnd
+        * grab a reference on the peer_ni so it sticks around even if
+        * we need to drop and relock the lnet_net_lock below.
         */
-       if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
-               lnet_peer_ni_decref_locked(lpni);
-               best_ni = the_lnet.ln_loni;
-               goto send;
-       }
+       lnet_peer_ni_addref_locked(best_lpni);
 
        /*
-        * Now that we have a peer_ni, check if we want to discover
-        * the peer. Traffic to the LNET_RESERVED_PORTAL should not
-        * trigger discovery.
+        * Use lnet_cpt_of_nid() to determine the CPT used to commit the
+        * message. This ensures that we get a CPT that is correct for
+        * the NI when the NI has been restricted to a subset of all CPTs.
+        * If the selected CPT differs from the one currently locked, we
+        * must unlock and relock the lnet_net_lock(), and then check whether
+        * the configuration has changed. We don't have a hold on the best_ni
+        * yet, and it may have vanished.
         */
-       peer = lpni->lpni_peer_net->lpn_peer;
-       if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
-               rc = lnet_discover_peer_locked(lpni, cpt, false);
-               if (rc) {
-                       lnet_peer_ni_decref_locked(lpni);
-                       lnet_net_unlock(cpt);
-                       return rc;
+       cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
+       if (sd->sd_cpt != cpt2) {
+               __u32 seq = lnet_get_dlc_seq_locked();
+               lnet_net_unlock(sd->sd_cpt);
+               sd->sd_cpt = cpt2;
+               lnet_net_lock(sd->sd_cpt);
+               if (seq != lnet_get_dlc_seq_locked()) {
+                       lnet_peer_ni_decref_locked(best_lpni);
+                       return REPEAT_SEND;
                }
-               /* The peer may have changed. */
-               peer = lpni->lpni_peer_net->lpn_peer;
-               /* queue message and return */
-               msg->msg_src_nid_param = src_nid;
-               msg->msg_rtr_nid_param = rtr_nid;
-               msg->msg_sending = 0;
-               list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
-               CDEBUG(D_NET, "%s pending discovery\n",
-                      libcfs_nid2str(peer->lp_primary_nid));
-               lnet_peer_ni_decref_locked(lpni);
-               lnet_net_unlock(cpt);
-
-               return LNET_DC_WAIT;
-       }
-       lnet_peer_ni_decref_locked(lpni);
-
-       /* If peer is not healthy then can not send anything to it */
-       if (!lnet_is_peer_healthy_locked(peer)) {
-               lnet_net_unlock(cpt);
-               return -EHOSTUNREACH;
        }
 
        /*
-        * STEP 1: first jab at determining best_ni
-        * if src_nid is explicitly specified, then best_ni is already
-        * pre-determiend for us. Otherwise we need to select the best
-        * one to use later on
+        * store the best_lpni in the message right away to avoid having
+        * to do the same operation under different conditions
         */
-       if (src_nid != LNET_NID_ANY) {
-               best_ni = lnet_nid2ni_locked(src_nid, cpt);
-               if (!best_ni) {
-                       lnet_net_unlock(cpt);
-                       LCONSOLE_WARN("Can't send to %s: src %s is not a "
-                                     "local nid\n", libcfs_nid2str(dst_nid),
-                                     libcfs_nid2str(src_nid));
-                       return -EINVAL;
-               }
-       }
-
-       if (msg->msg_type == LNET_MSG_REPLY ||
-           msg->msg_type == LNET_MSG_ACK ||
-           !lnet_peer_is_multi_rail(peer) ||
-           best_ni) {
-               /*
-                * for replies we want to respond on the same peer_ni we
-                * received the message on if possible. If not, then pick
-                * a peer_ni to send to
-                *
-                * if the peer is non-multi-rail then you want to send to
-                * the dst_nid provided as well.
-                *
-                * If the best_ni has already been determined, IE the
-                * src_nid has been specified, then use the
-                * destination_nid provided as well, since we're
-                * continuing a series of related messages for the same
-                * RPC.
-                *
-                * It is expected to find the lpni using dst_nid, since we
-                * created it earlier.
-                */
-               best_lpni = lnet_find_peer_ni_locked(dst_nid);
-               if (best_lpni)
-                       lnet_peer_ni_decref_locked(best_lpni);
-
-               if (best_lpni && !lnet_get_net_locked(LNET_NIDNET(dst_nid))) {
-                       /*
-                        * this lpni is not on a local network so we need
-                        * to route this reply.
-                        */
-                       best_gw = lnet_find_route_locked(NULL,
-                                                        best_lpni->lpni_nid,
-                                                        rtr_nid);
-                       if (best_gw) {
-                               /*
-                               * RULE: Each node considers only the next-hop
-                               *
-                               * We're going to route the message, so change the peer to
-                               * the router.
-                               */
-                               LASSERT(best_gw->lpni_peer_net);
-                               LASSERT(best_gw->lpni_peer_net->lpn_peer);
-                               peer = best_gw->lpni_peer_net->lpn_peer;
-
-                               /*
-                               * if the router is not multi-rail then use the best_gw
-                               * found to send the message to
-                               */
-                               if (!lnet_peer_is_multi_rail(peer))
-                                       best_lpni = best_gw;
-                               else
-                                       best_lpni = NULL;
-
-                               routing = true;
-                       } else {
-                               best_lpni = NULL;
-                       }
-               } else if (!best_lpni) {
-                       lnet_net_unlock(cpt);
-                       CERROR("unable to send msg_type %d to "
-                             "originating %s. Destination NID not in DB\n",
-                             msg->msg_type, libcfs_nid2str(dst_nid));
-                       return -EINVAL;
-               }
-       }
+       msg->msg_txpeer = best_lpni;
+       msg->msg_txni = best_ni;
 
        /*
-        * We must use a consistent source address when sending to a
-        * non-MR peer. However, a non-MR peer can have multiple NIDs
-        * on multiple networks, and we may even need to talk to this
-        * peer on multiple networks -- certain types of
-        * load-balancing configuration do this.
-        *
-        * So we need to pick the NI the peer prefers for this
-        * particular network.
+        * grab a reference for the best_ni since now it's in use in this
+        * send. The reference will be dropped in lnet_finalize()
         */
-       if (!lnet_peer_is_multi_rail(peer)) {
-               if (!best_lpni) {
-                       lnet_net_unlock(cpt);
-                       CERROR("no route to %s\n",
-                              libcfs_nid2str(dst_nid));
-                       return -EHOSTUNREACH;
-               }
-
-               /* best ni is already set if src_nid was provided */
-               if (!best_ni) {
-                       /* Get the target peer_ni */
-                       peer_net = lnet_peer_get_net_locked(peer,
-                                       LNET_NIDNET(best_lpni->lpni_nid));
-                       LASSERT(peer_net != NULL);
-                       list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
-                                           lpni_peer_nis) {
-                               if (lpni->lpni_pref_nnids == 0)
-                                       continue;
-                               LASSERT(lpni->lpni_pref_nnids == 1);
-                               best_ni = lnet_nid2ni_locked(
-                                               lpni->lpni_pref.nid, cpt);
-                               break;
-                       }
-               }
-               /* if best_ni is still not set just pick one */
-               if (!best_ni) {
-                       best_ni = lnet_net2ni_locked(
-                               best_lpni->lpni_net->net_id, cpt);
-                       /* If there is no best_ni we don't have a route */
-                       if (!best_ni) {
-                               CERROR("no path to %s from net %s\n",
-                                       libcfs_nid2str(best_lpni->lpni_nid),
-                                       libcfs_net2str(best_lpni->lpni_net->net_id));
-                               lnet_net_unlock(cpt);
-                               return -EHOSTUNREACH;
-                       }
-                       lpni = list_entry(peer_net->lpn_peer_nis.next,
-                                       struct lnet_peer_ni,
-                                       lpni_peer_nis);
-               }
-               /* Set preferred NI if necessary. */
-               if (lpni->lpni_pref_nnids == 0)
-                       lnet_peer_ni_set_non_mr_pref_nid(lpni, best_ni->ni_nid);
-       }
+       lnet_ni_addref_locked(msg->msg_txni, sd->sd_cpt);
 
        /*
-        * if we already found a best_ni because src_nid is specified and
-        * best_lpni because we are replying to a message then just send
-        * the message
+        * Always set the target.nid to the best peer picked. Either the
+        * NID will be one of the peer NIDs selected, or the same NID as
+        * what was originally set in the target or it will be the NID of
+        * a router if this message should be routed
         */
-       if (best_ni && best_lpni)
-               goto send;
+       msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
 
        /*
-        * If we already found a best_ni because src_nid is specified then
-        * pick the peer then send the message
+        * lnet_msg_commit assigns the correct cpt to the message, which
+        * is used to decrement the correct refcount on the ni when it's
+        * time to return the credits
         */
-       if (best_ni)
-               goto pick_peer;
+       lnet_msg_commit(msg, sd->sd_cpt);
 
        /*
-        * pick the best_ni by going through all the possible networks of
-        * that peer and see which local NI is best suited to talk to that
-        * peer.
-        *
-        * Locally connected networks will always be preferred over
-        * a routed network. If there are only routed paths to the peer,
-        * then the best route is chosen. If all routes are equal then
-        * they are used in round robin.
+        * If we are routing the message then we keep the src_nid that was
+        * set by the originator. If we are not routing then we are the
+        * originator and set it here.
         */
-       list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
-               if (!lnet_is_peer_net_healthy_locked(peer_net))
-                       continue;
-
-               local_net = lnet_get_net_locked(peer_net->lpn_net_id);
-               if (!local_net && !routing && !local_found) {
-                       struct lnet_peer_ni *net_gw;
-
-                       lpni = list_entry(peer_net->lpn_peer_nis.next,
-                                         struct lnet_peer_ni,
-                                         lpni_peer_nis);
-
-                       net_gw = lnet_find_route_locked(NULL,
-                                                       lpni->lpni_nid,
-                                                       rtr_nid);
-                       if (!net_gw)
-                               continue;
-
-                       if (best_gw) {
-                               /*
-                                * lnet_find_route_locked() call
-                                * will return the best_Gw on the
-                                * lpni->lpni_nid network.
-                                * However, best_gw and net_gw can
-                                * be on different networks.
-                                * Therefore need to compare them
-                                * to pick the better of either.
-                                */
-                               if (lnet_compare_peers(best_gw, net_gw) > 0)
-                                       continue;
-                               if (best_gw->lpni_gw_seq <= net_gw->lpni_gw_seq)
-                                       continue;
-                       }
-                       best_gw = net_gw;
-                       final_dst = lpni;
-
-                       routing2 = true;
-               } else {
-                       best_gw = NULL;
-                       final_dst = NULL;
-                       routing2 = false;
-                       local_found = true;
-               }
-
-               /*
-                * a gw on this network is found, but there could be
-                * other better gateways on other networks. So don't pick
-                * the best_ni until we determine the best_gw.
-                */
-               if (best_gw)
-                       continue;
-
-               /* if no local_net found continue */
-               if (!local_net)
-                       continue;
+       if (!msg->msg_routing)
+               msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
 
+       if (routing) {
+               msg->msg_target_is_router = 1;
+               msg->msg_target.pid = LNET_PID_LUSTRE;
                /*
-                * Iterate through the NIs in this local Net and select
-                * the NI to send from. The selection is determined by
-                * these 3 criterion in the following priority:
-                *      1. NUMA
-                *      2. NI available credits
-                *      3. Round Robin
+                * since we're routing we want to ensure that the
+                * msg_hdr.dest_nid is set to the final destination. When
+                * the router receives this message it knows how to route
+                * it.
+                *
+                * final_dst_lpni is set at the beginning of the
+                * lnet_select_pathway() function and is never changed.
+                * It's safe to use it here.
                 */
-               best_ni = lnet_get_best_ni(local_net, best_ni, md_cpt);
-       }
-
-       if (!best_ni && !best_gw) {
-               lnet_net_unlock(cpt);
-               LCONSOLE_WARN("No local ni found to send from to %s\n",
-                       libcfs_nid2str(dst_nid));
-               return -EINVAL;
-       }
-
-       if (!best_ni) {
-               best_ni = lnet_get_best_ni(best_gw->lpni_net, best_ni, md_cpt);
-               LASSERT(best_gw && best_ni);
-
+               msg->msg_hdr.dest_nid = cpu_to_le64(final_dst_lpni->lpni_nid);
+       } else {
                /*
-                * We're going to route the message, so change the peer to
-                * the router.
+                * if we're not routing set the dest_nid to the best peer
+                * ni NID that we picked earlier in the algorithm.
                 */
-               LASSERT(best_gw->lpni_peer_net);
-               LASSERT(best_gw->lpni_peer_net->lpn_peer);
-               best_gw->lpni_gw_seq++;
-               peer = best_gw->lpni_peer_net->lpn_peer;
+               msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
        }
 
-       /*
-        * Now that we selected the NI to use increment its sequence
-        * number so the Round Robin algorithm will detect that it has
-        * been used and pick the next NI.
-        */
-       best_ni->ni_seq++;
-
-pick_peer:
-       /*
-        * At this point the best_ni is on a local network on which
-        * the peer has a peer_ni as well
-        */
-       peer_net = lnet_peer_get_net_locked(peer,
-                                           best_ni->ni_net->net_id);
-       /*
-        * peer_net is not available or the src_nid is explicitly defined
-        * and the peer_net for that src_nid is unhealthy. find a route to
-        * the destination nid.
-        */
-       if (!peer_net ||
-           (src_nid != LNET_NID_ANY &&
-            !lnet_is_peer_net_healthy_locked(peer_net))) {
-               best_gw = lnet_find_route_locked(best_ni->ni_net,
-                                                dst_nid,
-                                                rtr_nid);
-               /*
-                * if no route is found for that network then
-                * move onto the next peer_ni in the peer
-                */
-               if (!best_gw) {
-                       LCONSOLE_WARN("No route to peer from %s\n",
-                               libcfs_nid2str(best_ni->ni_nid));
-                       lnet_net_unlock(cpt);
-                       return -EHOSTUNREACH;
-               }
+       rc = lnet_post_send_locked(msg, 0);
 
-               CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
-                       libcfs_nid2str(dst_nid),
-                       libcfs_nid2str(best_gw->lpni_nid),
-                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
+       if (!rc)
+               CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n",
+                      libcfs_nid2str(msg->msg_hdr.src_nid),
+                      libcfs_nid2str(msg->msg_txni->ni_nid),
+                      libcfs_nid2str(sd->sd_src_nid),
+                      libcfs_nid2str(msg->msg_hdr.dest_nid),
+                      libcfs_nid2str(sd->sd_dst_nid),
+                      libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+                      lnet_msgtyp2str(msg->msg_type));
 
-               routing2 = true;
-               /*
-                * RULE: Each node considers only the next-hop
-                *
-                * We're going to route the message, so change the peer to
-                * the router.
-                */
-               LASSERT(best_gw->lpni_peer_net);
-               LASSERT(best_gw->lpni_peer_net->lpn_peer);
-               peer = best_gw->lpni_peer_net->lpn_peer;
-       } else if (!lnet_is_peer_net_healthy_locked(peer_net)) {
-               /*
-                * this peer_net is unhealthy but we still have an opportunity
-                * to find another peer_net that we can use
-                */
-               __u32 net_id = peer_net->lpn_net_id;
-               LCONSOLE_WARN("peer net %s unhealthy\n",
-                             libcfs_net2str(net_id));
-               goto again;
-       }
+       return rc;
+}
 
+static struct lnet_peer_ni *
+lnet_select_peer_ni(struct lnet_send_data *sd, struct lnet_peer *peer,
+                   struct lnet_peer_net *peer_net)
+{
        /*
         * Look at the peer NIs for the destination peer that connect
         * to the chosen net. If a peer_ni is preferred when using the
@@ -1935,22 +1720,42 @@ pick_peer:
         * the available transmit credits are used. If the transmit
         * credits are equal, we round-robin over the peer_ni.
         */
-       lpni = NULL;
-       best_lpni_credits = INT_MIN;
-       preferred = false;
-       best_lpni = NULL;
+       struct lnet_peer_ni *lpni = NULL;
+       struct lnet_peer_ni *best_lpni = NULL;
+       struct lnet_ni *best_ni = sd->sd_best_ni;
+       lnet_nid_t dst_nid = sd->sd_dst_nid;
+       int best_lpni_credits = INT_MIN;
+       bool preferred = false;
+       bool ni_is_pref;
+       int best_lpni_healthv = 0;
+       int lpni_healthv;
+
        while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
                /*
-                * if this peer ni is not healthy just skip it, no point in
-                * examining it further
+                * if the best_ni we've chosen aleady has this lpni
+                * preferred, then let's use it
                 */
-               if (!lnet_is_peer_ni_healthy_locked(lpni))
-                       continue;
                ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
                                                          best_ni->ni_nid);
 
+               lpni_healthv = atomic_read(&lpni->lpni_healthv);
+
+               CDEBUG(D_NET, "%s ni_is_pref = %d\n",
+                      libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
+
+               if (best_lpni)
+                       CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
+                               libcfs_nid2str(lpni->lpni_nid),
+                               lpni->lpni_txcredits, best_lpni_credits,
+                               lpni->lpni_seq, best_lpni->lpni_seq);
+
+               /* pick the healthiest peer ni */
+               if (lpni_healthv < best_lpni_healthv) {
+                       continue;
+               } else if (lpni_healthv > best_lpni_healthv) {
+                       best_lpni_healthv = lpni_healthv;
                /* if this is a preferred peer use it */
-               if (!preferred && ni_is_pref) {
+               } else if (!preferred && ni_is_pref) {
                        preferred = true;
                } else if (preferred && !ni_is_pref) {
                        /*
@@ -1986,167 +1791,1798 @@ pick_peer:
        if (!best_lpni) {
                __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
                        LNET_NIDNET(dst_nid);
-               lnet_net_unlock(cpt);
-               LCONSOLE_WARN("no peer_ni found on peer net %s\n",
+               CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
                                libcfs_net2str(net_id));
-               return -EHOSTUNREACH;
+               return NULL;
        }
 
+       CDEBUG(D_NET, "sd_best_lpni = %s\n",
+              libcfs_nid2str(best_lpni->lpni_nid));
 
-send:
-       /* Shortcut for loopback. */
-       if (best_ni == the_lnet.ln_loni) {
-               /* No send credit hassles with LOLND */
-               lnet_ni_addref_locked(best_ni, cpt);
-               msg->msg_hdr.dest_nid = cpu_to_le64(best_ni->ni_nid);
-               if (!msg->msg_routing)
-                       msg->msg_hdr.src_nid = cpu_to_le64(best_ni->ni_nid);
-               msg->msg_target.nid = best_ni->ni_nid;
-               lnet_msg_commit(msg, cpt);
-               msg->msg_txni = best_ni;
-               lnet_net_unlock(cpt);
+       return best_lpni;
+}
 
-               return LNET_CREDIT_OK;
-       }
+/*
+ * Prerequisite: the best_ni should already be set in the sd
+ */
+static inline struct lnet_peer_ni *
+lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
+                          __u32 net_id)
+{
+       struct lnet_peer_net *peer_net;
 
-       routing = routing || routing2;
+       /*
+        * The gateway is Multi-Rail capable so now we must select the
+        * proper peer_ni
+        */
+       peer_net = lnet_peer_get_net_locked(peer, net_id);
+
+       if (!peer_net) {
+               CERROR("gateway peer %s has no NI on net %s\n",
+                      libcfs_nid2str(peer->lp_primary_nid),
+                      libcfs_net2str(net_id));
+               return NULL;
+       }
+
+       return lnet_select_peer_ni(sd, peer, peer_net);
+}
+
+static inline void
+lnet_set_non_mr_pref_nid(struct lnet_send_data *sd)
+{
+       if (sd->sd_send_case & NMR_DST &&
+           sd->sd_msg->msg_type != LNET_MSG_REPLY &&
+           sd->sd_msg->msg_type != LNET_MSG_ACK &&
+           sd->sd_best_lpni->lpni_pref_nnids == 0) {
+               CDEBUG(D_NET, "Setting preferred local NID %s on NMR peer %s\n",
+                      libcfs_nid2str(sd->sd_best_ni->ni_nid),
+                      libcfs_nid2str(sd->sd_best_lpni->lpni_nid));
+               lnet_peer_ni_set_non_mr_pref_nid(sd->sd_best_lpni,
+                                                sd->sd_best_ni->ni_nid);
+       }
+}
+
+/*
+ * Source Specified
+ * Local Destination
+ * non-mr peer
+ *
+ * use the source and destination NIDs as the pathway
+ */
+static int
+lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd)
+{
+       /* the destination lpni is set before we get here. */
+
+       /* find local NI */
+       sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
+       if (!sd->sd_best_ni) {
+               CERROR("Can't send to %s: src %s is not a "
+                      "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
+                               libcfs_nid2str(sd->sd_src_nid));
+               return -EINVAL;
+       }
+
+       /*
+        * the preferred NID will only be set for NMR peers
+        */
+       lnet_set_non_mr_pref_nid(sd);
+
+       return lnet_handle_send(sd);
+}
+
+/*
+ * Source Specified
+ * Local Destination
+ * MR Peer
+ *
+ * Run the selection algorithm on the peer NIs unless we're sending
+ * a response, in this case just send to the destination
+ */
+static int
+lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
+{
+       sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
+       if (!sd->sd_best_ni) {
+               CERROR("Can't send to %s: src %s is not a "
+                      "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
+                               libcfs_nid2str(sd->sd_src_nid));
+               return -EINVAL;
+       }
+
+       /*
+        * only run the selection algorithm to pick the peer_ni if we're
+        * sending a GET or a PUT. Responses are sent to the same
+        * destination NID provided.
+        */
+       if (!(sd->sd_send_case & SND_RESP)) {
+               sd->sd_best_lpni =
+                 lnet_find_best_lpni_on_net(sd, sd->sd_peer,
+                                            sd->sd_best_ni->ni_net->net_id);
+       }
+
+       if (sd->sd_best_lpni)
+               return lnet_handle_send(sd);
+
+       CERROR("can't send to %s. no NI on %s\n",
+              libcfs_nid2str(sd->sd_dst_nid),
+              libcfs_net2str(sd->sd_best_ni->ni_net->net_id));
+
+       return -EHOSTUNREACH;
+}
+
+struct lnet_ni *
+lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni,
+                             struct lnet_peer *peer,
+                             struct lnet_peer_net *peer_net,
+                             int cpt,
+                             bool incr_seq)
+{
+       struct lnet_net *local_net;
+       struct lnet_ni *best_ni;
+
+       local_net = lnet_get_net_locked(peer_net->lpn_net_id);
+       if (!local_net)
+               return NULL;
+
+       /*
+        * Iterate through the NIs in this local Net and select
+        * the NI to send from. The selection is determined by
+        * these 3 criterion in the following priority:
+        *      1. NUMA
+        *      2. NI available credits
+        *      3. Round Robin
+        */
+       best_ni = lnet_get_best_ni(local_net, cur_best_ni,
+                                  peer, peer_net, cpt);
+
+       if (incr_seq && best_ni)
+               best_ni->ni_seq++;
+
+       return best_ni;
+}
+
+static int
+lnet_handle_find_routed_path(struct lnet_send_data *sd,
+                            lnet_nid_t dst_nid,
+                            struct lnet_peer_ni **gw_lpni,
+                            struct lnet_peer **gw_peer)
+{
+       struct lnet_peer_ni *gw;
+       lnet_nid_t src_nid = sd->sd_src_nid;
+
+       gw = lnet_find_route_locked(NULL, LNET_NIDNET(dst_nid),
+                                   sd->sd_rtr_nid);
+       if (!gw) {
+               CERROR("no route to %s from %s\n",
+                      libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
+               return -EHOSTUNREACH;
+       }
+
+       /* get the peer of the gw_ni */
+       LASSERT(gw->lpni_peer_net);
+       LASSERT(gw->lpni_peer_net->lpn_peer);
+
+       *gw_peer = gw->lpni_peer_net->lpn_peer;
+
+       if (!sd->sd_best_ni)
+               sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, *gw_peer,
+                                       gw->lpni_peer_net,
+                                       sd->sd_md_cpt,
+                                       true);
+
+       if (!sd->sd_best_ni) {
+               CERROR("Internal Error. Expected local ni on %s "
+                      "but non found :%s\n",
+                      libcfs_net2str(gw->lpni_peer_net->lpn_net_id),
+                      libcfs_nid2str(sd->sd_src_nid));
+               return -EFAULT;
+       }
+
+       /*
+        * if gw is MR let's find its best peer_ni
+        */
+       if (lnet_peer_is_multi_rail(*gw_peer)) {
+               gw = lnet_find_best_lpni_on_net(sd, *gw_peer,
+                                               sd->sd_best_ni->ni_net->net_id);
+               /*
+                * We've already verified that the gw has an NI on that
+                * desired net, but we're not finding it. Something is
+                * wrong.
+                */
+               if (!gw) {
+                       CERROR("Internal Error. Route expected to %s from %s\n",
+                               libcfs_nid2str(dst_nid),
+                               libcfs_nid2str(src_nid));
+                       return -EFAULT;
+               }
+       }
+
+       *gw_lpni = gw;
+
+       return 0;
+}
+
+/*
+ * Handle two cases:
+ *
+ * Case 1:
+ *  Source specified
+ *  Remote destination
+ *  Non-MR destination
+ *
+ * Case 2:
+ *  Source specified
+ *  Remote destination
+ *  MR destination
+ *
+ * The handling of these two cases is similar. Even though the destination
+ * can be MR or non-MR, we'll deal directly with the router.
+ */
+static int
+lnet_handle_spec_router_dst(struct lnet_send_data *sd)
+{
+       int rc;
+       struct lnet_peer_ni *gw_lpni = NULL;
+       struct lnet_peer *gw_peer = NULL;
+
+       /* find local NI */
+       sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
+       if (!sd->sd_best_ni) {
+               CERROR("Can't send to %s: src %s is not a "
+                      "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
+                               libcfs_nid2str(sd->sd_src_nid));
+               return -EINVAL;
+       }
+
+       rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
+                                    &gw_peer);
+       if (rc < 0)
+               return rc;
+
+       if (sd->sd_send_case & NMR_DST)
+               /*
+               * since the final destination is non-MR let's set its preferred
+               * NID before we send
+               */
+               lnet_set_non_mr_pref_nid(sd);
+
+       /*
+        * We're going to send to the gw found so let's set its
+        * info
+        */
+       sd->sd_peer = gw_peer;
+       sd->sd_best_lpni = gw_lpni;
+
+       return lnet_handle_send(sd);
+}
+
+struct lnet_ni *
+lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt)
+{
+       struct lnet_peer_net *peer_net = NULL;
+       struct lnet_ni *best_ni = NULL;
+
+       /*
+        * The peer can have multiple interfaces, some of them can be on
+        * the local network and others on a routed network. We should
+        * prefer the local network. However if the local network is not
+        * available then we need to try the routed network
+        */
+
+       /* go through all the peer nets and find the best_ni */
+       list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
+               /*
+                * The peer's list of nets can contain non-local nets. We
+                * want to only examine the local ones.
+                */
+               if (!lnet_get_net_locked(peer_net->lpn_net_id))
+                       continue;
+               best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
+                                                  peer_net, md_cpt, false);
+       }
+
+       if (best_ni)
+               /* increment sequence number so we can round robin */
+               best_ni->ni_seq++;
+
+       return best_ni;
+}
+
+static struct lnet_ni *
+lnet_find_existing_preferred_best_ni(struct lnet_send_data *sd)
+{
+       struct lnet_ni *best_ni = NULL;
+       struct lnet_peer_net *peer_net;
+       struct lnet_peer *peer = sd->sd_peer;
+       struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
+       struct lnet_peer_ni *lpni;
+       int cpt = sd->sd_cpt;
+
+       /*
+        * We must use a consistent source address when sending to a
+        * non-MR peer. However, a non-MR peer can have multiple NIDs
+        * on multiple networks, and we may even need to talk to this
+        * peer on multiple networks -- certain types of
+        * load-balancing configuration do this.
+        *
+        * So we need to pick the NI the peer prefers for this
+        * particular network.
+        */
+
+       /* Get the target peer_ni */
+       peer_net = lnet_peer_get_net_locked(peer,
+                       LNET_NIDNET(best_lpni->lpni_nid));
+       LASSERT(peer_net != NULL);
+       list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
+                               lpni_peer_nis) {
+               if (lpni->lpni_pref_nnids == 0)
+                       continue;
+               LASSERT(lpni->lpni_pref_nnids == 1);
+               best_ni = lnet_nid2ni_locked(
+                               lpni->lpni_pref.nid, cpt);
+               break;
+       }
+
+       return best_ni;
+}
+
+/* Prerequisite: sd->sd_peer and sd->sd_best_lpni should be set */
+static int
+lnet_select_preferred_best_ni(struct lnet_send_data *sd)
+{
+       struct lnet_ni *best_ni = NULL;
+       struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
+
+       /*
+        * We must use a consistent source address when sending to a
+        * non-MR peer. However, a non-MR peer can have multiple NIDs
+        * on multiple networks, and we may even need to talk to this
+        * peer on multiple networks -- certain types of
+        * load-balancing configuration do this.
+        *
+        * So we need to pick the NI the peer prefers for this
+        * particular network.
+        */
+
+       best_ni = lnet_find_existing_preferred_best_ni(sd);
+
+       /* if best_ni is still not set just pick one */
+       if (!best_ni) {
+               best_ni =
+                 lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
+                                               sd->sd_best_lpni->lpni_peer_net,
+                                               sd->sd_md_cpt, true);
+               /* If there is no best_ni we don't have a route */
+               if (!best_ni) {
+                       CERROR("no path to %s from net %s\n",
+                               libcfs_nid2str(best_lpni->lpni_nid),
+                               libcfs_net2str(best_lpni->lpni_net->net_id));
+                       return -EHOSTUNREACH;
+               }
+       }
+
+       sd->sd_best_ni = best_ni;
+
+       /* Set preferred NI if necessary. */
+       lnet_set_non_mr_pref_nid(sd);
+
+       return 0;
+}
+
+
+/*
+ * Source not specified
+ * Local destination
+ * Non-MR Peer
+ *
+ * always use the same source NID for NMR peers
+ * If we've talked to that peer before then we already have a preferred
+ * source NI associated with it. Otherwise, we select a preferred local NI
+ * and store it in the peer
+ */
+static int
+lnet_handle_any_local_nmr_dst(struct lnet_send_data *sd)
+{
+       int rc;
+
+       /* sd->sd_best_lpni is already set to the final destination */
+
+       /*
+        * At this point we should've created the peer ni and peer. If we
+        * can't find it, then something went wrong. Instead of assert
+        * output a relevant message and fail the send
+        */
+       if (!sd->sd_best_lpni) {
+               CERROR("Internal fault. Unable to send msg %s to %s. "
+                      "NID not known\n",
+                      lnet_msgtyp2str(sd->sd_msg->msg_type),
+                      libcfs_nid2str(sd->sd_dst_nid));
+               return -EFAULT;
+       }
+
+       rc = lnet_select_preferred_best_ni(sd);
+       if (!rc)
+               rc = lnet_handle_send(sd);
+
+       return rc;
+}
+
+/*
+ * Helper for lnet_handle_any_mr_dst(): handle the local-network cases
+ * for an MR peer when no source NID was specified.
+ *
+ * Returns PASS_THROUGH when the peer has no NI reachable over a local
+ * network, in which case the caller must look for a routed path;
+ * otherwise the send result or a negative errno.
+ */
+static int
+lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
+{
+       /*
+        * NOTE we've already handled the remote peer case. So we only
+        * need to worry about the local case here.
+        *
+        * if we're sending a response, ACK or reply, we need to send it
+        * to the destination NID given to us. At this point we already
+        * have the peer_ni we're supposed to send to, so just find the
+        * best_ni on the peer net and use that. Since we're sending to an
+        * MR peer then we can just run the selection algorithm on our
+        * local NIs and pick the best one.
+        */
+       if (sd->sd_send_case & SND_RESP) {
+               sd->sd_best_ni =
+                 lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
+                                               sd->sd_best_lpni->lpni_peer_net,
+                                               sd->sd_md_cpt, true);
+
+               if (!sd->sd_best_ni) {
+                       /*
+                        * We're not going to deal with not able to send
+                        * a response to the provided final destination
+                        */
+                       CERROR("Can't send response to %s. "
+                              "No local NI available\n",
+                               libcfs_nid2str(sd->sd_dst_nid));
+                       return -EHOSTUNREACH;
+               }
+
+               return lnet_handle_send(sd);
+       }
+
+       /*
+        * If we get here that means we're sending a fresh request, PUT or
+        * GET, so we need to run our standard selection algorithm.
+        * First find the best local interface that's on any of the peer's
+        * networks.
+        */
+       sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
+                                                       sd->sd_md_cpt);
+       if (sd->sd_best_ni) {
+               sd->sd_best_lpni =
+                 lnet_find_best_lpni_on_net(sd, sd->sd_peer,
+                                            sd->sd_best_ni->ni_net->net_id);
+
+               /*
+                * if we're successful in selecting a peer_ni on the local
+                * network, then send to it. Otherwise fall through and
+                * try and see if we can reach it over another routed
+                * network
+                */
+               if (sd->sd_best_lpni) {
+                       /*
+                        * in case we initially started with a routed
+                        * destination, let's reset to local
+                        */
+                       sd->sd_send_case &= ~REMOTE_DST;
+                       sd->sd_send_case |= LOCAL_DST;
+                       return lnet_handle_send(sd);
+               }
+
+               CERROR("Internal Error. Expected to have a best_lpni: "
+                      "%s -> %s\n",
+                      libcfs_nid2str(sd->sd_src_nid),
+                      libcfs_nid2str(sd->sd_dst_nid));
+
+               return -EFAULT;
+       }
+
+       /*
+        * Peer doesn't have a local network. Let's see if there is
+        * a remote network we can reach it on.
+        */
+       return PASS_THROUGH;
+}
+
+/*
+ * Case 1:
+ *     Source NID not specified
+ *     Local destination
+ *     MR peer
+ *
+ * Case 2:
+ *     Source NID not specified
+ *     Remote destination
+ *     MR peer
+ *
+ * In both of these cases if we're sending a response, ACK or REPLY, then
+ * we need to send to the destination NID provided.
+ *
+ * In the remote case let's deal with MR routers.
+ *
+ * Returns the send result, -EHOSTUNREACH when no route exists for a
+ * response, or a negative errno from routed-path selection.
+ */
+
+static int
+lnet_handle_any_mr_dst(struct lnet_send_data *sd)
+{
+       int rc = 0;
+       struct lnet_peer *gw_peer = NULL;
+       struct lnet_peer_ni *gw_lpni = NULL;
+
+       /*
+        * handle sending a response to a remote peer here so we don't
+        * have to worry about it if we hit lnet_handle_any_mr_dsta()
+        */
+       if (sd->sd_send_case & REMOTE_DST &&
+           sd->sd_send_case & SND_RESP) {
+               /*
+                * reuse the function-scope gw_lpni/gw_peer here; the old
+                * inner declarations shadowed gw_peer (-Wshadow)
+                */
+               rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid,
+                                                 &gw_lpni, &gw_peer);
+               if (rc < 0) {
+                       CERROR("Can't send response to %s. "
+                              "No route available\n",
+                               libcfs_nid2str(sd->sd_dst_nid));
+                       return -EHOSTUNREACH;
+               }
+
+               sd->sd_best_lpni = gw_lpni;
+               sd->sd_peer = gw_peer;
+
+               return lnet_handle_send(sd);
+       }
+
+       /*
+        * Even though the NID for the peer might not be on a local network,
+        * since the peer is MR there could be other interfaces on the
+        * local network. In that case we'd still like to prefer the local
+        * network over the routed network. If we're unable to do that
+        * then we select the best router among the different routed networks,
+        * and if the router is MR then we can deal with it as such.
+        */
+       rc = lnet_handle_any_mr_dsta(sd);
+       if (rc != PASS_THROUGH)
+               return rc;
+
+       /*
+        * TODO; One possible enhancement is to run the selection
+        * algorithm on the peer. However for remote peers the credits are
+        * not decremented, so we'll be basically going over the peer NIs
+        * in round robin. An MR router will run the selection algorithm
+        * on the next-hop interfaces.
+        */
+       rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
+                                         &gw_peer);
+       if (rc < 0)
+               return rc;
+
+       /* we're now routing: flip the locality bits before sending */
+       sd->sd_send_case &= ~LOCAL_DST;
+       sd->sd_send_case |= REMOTE_DST;
+
+       sd->sd_peer = gw_peer;
+       sd->sd_best_lpni = gw_lpni;
+
+       return lnet_handle_send(sd);
+}
+
+/*
+ * Source not specified
+ * Remote destination
+ * Non-MR peer
+ *
+ * Must send to the specified peer NID using the same source NID that
+ * we've used before. If it's the first time to talk to that peer then
+ * find the source NI and assign it as preferred to that peer
+ *
+ * Returns the send result, or a negative errno if no routed path to
+ * the destination can be found.
+ */
+static int
+lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd)
+{
+       int rc;
+       struct lnet_peer_ni *gw_lpni = NULL;
+       struct lnet_peer *gw_peer = NULL;
+
+       /*
+        * Let's set if we have a preferred NI to talk to this NMR peer
+        */
+       sd->sd_best_ni = lnet_find_existing_preferred_best_ni(sd);
+
+       /*
+        * find the router and that'll find the best NI if we didn't find
+        * it already.
+        */
+       rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
+                                         &gw_peer);
+       if (rc < 0)
+               return rc;
+
+       /*
+        * set the best_ni we've chosen as the preferred one for
+        * this peer
+        */
+       lnet_set_non_mr_pref_nid(sd);
+
+       /* we'll be sending to the gw */
+       sd->sd_best_lpni = gw_lpni;
+       sd->sd_peer = gw_peer;
+
+       return lnet_handle_send(sd);
+}
+
+/*
+ * Dispatch the message to the handler matching its send case.
+ *
+ * Returns the chosen handler's result (e.g. REPEAT_SEND or a
+ * negative errno), or -1 for an unrecognized case.
+ */
+static int
+lnet_handle_send_case_locked(struct lnet_send_data *sd)
+{
+       /*
+        * Mask off the SND_RESP bit for the switch below, but keep it in
+        * sd->sd_send_case: the individual handlers test the bit there.
+        * (The previous compound "&=" cleared the bit in sd as well,
+        * making those handler checks dead code.)
+        */
+       __u32 send_case = sd->sd_send_case & ~SND_RESP;
+
+       CDEBUG(D_NET, "Source %s%s to %s %s %s destination\n",
+               (send_case & SRC_SPEC) ? "Specified: " : "ANY",
+               (send_case & SRC_SPEC) ? libcfs_nid2str(sd->sd_src_nid) : "",
+               (send_case & MR_DST) ? "MR: " : "NMR: ",
+               libcfs_nid2str(sd->sd_dst_nid),
+               (send_case & LOCAL_DST) ? "local" : "routed");
+
+       switch (send_case) {
+       /*
+        * For all cases where the source is specified, we should always
+        * use the destination NID, whether it's an MR destination or not,
+        * since we're continuing a series of related messages for the
+        * same RPC
+        */
+       case SRC_SPEC_LOCAL_NMR_DST:
+               return lnet_handle_spec_local_nmr_dst(sd);
+       case SRC_SPEC_LOCAL_MR_DST:
+               return lnet_handle_spec_local_mr_dst(sd);
+       case SRC_SPEC_ROUTER_NMR_DST:
+       case SRC_SPEC_ROUTER_MR_DST:
+               return lnet_handle_spec_router_dst(sd);
+       case SRC_ANY_LOCAL_NMR_DST:
+               return lnet_handle_any_local_nmr_dst(sd);
+       case SRC_ANY_LOCAL_MR_DST:
+       case SRC_ANY_ROUTER_MR_DST:
+               return lnet_handle_any_mr_dst(sd);
+       case SRC_ANY_ROUTER_NMR_DST:
+               return lnet_handle_any_router_nmr_dst(sd);
+       default:
+               CERROR("Unknown send case\n");
+               return -1;
+       }
+}
+
+/*
+ * Work out how to send @msg from @src_nid (LNET_NID_ANY == caller does
+ * not care) to @dst_nid: shortcut loopback, optionally queue the message
+ * for peer discovery, classify the send into one of the source / MR /
+ * locality cases, and dispatch to the matching handler under the net
+ * lock.
+ *
+ * Returns LNET_CREDIT_OK when the message went over the loopback NI,
+ * LNET_DC_WAIT when it was queued pending peer discovery, a negative
+ * errno on failure, or the chosen send-case handler's result.
+ */
+static int
+lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
+                   struct lnet_msg *msg, lnet_nid_t rtr_nid)
+{
+       struct lnet_peer_ni     *lpni;
+       struct lnet_peer        *peer;
+       struct lnet_send_data   send_data;
+       int                     cpt, rc;
+       int                     md_cpt;
+       __u32                   send_case = 0;
+
+       memset(&send_data, 0, sizeof(send_data));
+
+       /*
+        * get an initial CPT to use for locking. The idea here is not to
+        * serialize the calls to select_pathway, so that as many
+        * operations can run concurrently as possible. To do that we use
+        * the CPT where this call is being executed. Later on when we
+        * determine the CPT to use in lnet_message_commit, we switch the
+        * lock and check if there was any configuration change.  If none,
+        * then we proceed, if there is, then we restart the operation.
+        */
+       cpt = lnet_net_lock_current();
+
+       md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
+       if (md_cpt == CFS_CPT_ANY)
+               md_cpt = cpt;
+
+again:
+
+       /*
+        * If we're being asked to send to the loopback interface, there
+        * is no need to go through any selection. We can just shortcut
+        * the entire process and send over lolnd
+        */
+       if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
+               /* No send credit hassles with LOLND */
+               lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
+               msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
+               if (!msg->msg_routing)
+                       msg->msg_hdr.src_nid =
+                               cpu_to_le64(the_lnet.ln_loni->ni_nid);
+               msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
+               lnet_msg_commit(msg, cpt);
+               msg->msg_txni = the_lnet.ln_loni;
+               lnet_net_unlock(cpt);
+
+               return LNET_CREDIT_OK;
+       }
+
+       /*
+        * find an existing peer_ni, or create one and mark it as having been
+        * created due to network traffic. This call will create the
+        * peer->peer_net->peer_ni tree.
+        */
+       lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
+       if (IS_ERR(lpni)) {
+               lnet_net_unlock(cpt);
+               return PTR_ERR(lpni);
+       }
+
+       /*
+        * Cache the original src_nid. If we need to resend the message
+        * then we'll need to know whether the src_nid was originally
+        * specified for this message. If it was originally specified,
+        * then we need to keep using the same src_nid since it's
+        * continuing the same sequence of messages.
+        */
+       msg->msg_src_nid_param = src_nid;
+
+       /*
+        * Now that we have a peer_ni, check if we want to discover
+        * the peer. Traffic to the LNET_RESERVED_PORTAL should not
+        * trigger discovery.
+        */
+       peer = lpni->lpni_peer_net->lpn_peer;
+       if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
+               lnet_nid_t primary_nid;
+               rc = lnet_discover_peer_locked(lpni, cpt, false);
+               if (rc) {
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(cpt);
+                       return rc;
+               }
+               /* The peer may have changed. */
+               peer = lpni->lpni_peer_net->lpn_peer;
+               /* queue message and return */
+               msg->msg_rtr_nid_param = rtr_nid;
+               msg->msg_sending = 0;
+               list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+               lnet_peer_ni_decref_locked(lpni);
+               primary_nid = peer->lp_primary_nid;
+               lnet_net_unlock(cpt);
+
+               CDEBUG(D_NET, "%s pending discovery\n",
+                      libcfs_nid2str(primary_nid));
+
+               return LNET_DC_WAIT;
+       }
+       lnet_peer_ni_decref_locked(lpni);
+
+       /*
+        * Identify the different send cases
+        */
+       if (src_nid == LNET_NID_ANY)
+               send_case |= SRC_ANY;
+       else
+               send_case |= SRC_SPEC;
+
+       if (lnet_get_net_locked(LNET_NIDNET(dst_nid)))
+               send_case |= LOCAL_DST;
+       else
+               send_case |= REMOTE_DST;
+
+       /*
+        * if this is a non-MR peer or if we're recovering a peer ni then
+        * let's consider this an NMR case so we can hit the destination
+        * NID.
+        */
+       if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
+               send_case |= NMR_DST;
+       else
+               send_case |= MR_DST;
+
+       if (msg->msg_type == LNET_MSG_REPLY ||
+           msg->msg_type == LNET_MSG_ACK)
+               send_case |= SND_RESP;
+
+       /* assign parameters to the send_data */
+       send_data.sd_msg = msg;
+       send_data.sd_rtr_nid = rtr_nid;
+       send_data.sd_src_nid = src_nid;
+       send_data.sd_dst_nid = dst_nid;
+       send_data.sd_best_lpni = lpni;
+       /*
+        * keep a pointer to the final destination in case we're going to
+        * route, so we'll need to access it later
+        */
+       send_data.sd_final_dst_lpni = lpni;
+       send_data.sd_peer = peer;
+       send_data.sd_md_cpt = md_cpt;
+       send_data.sd_cpt = cpt;
+       send_data.sd_send_case = send_case;
+
+       rc = lnet_handle_send_case_locked(&send_data);
+
+       if (rc == REPEAT_SEND)
+               goto again;
+
+       lnet_net_unlock(send_data.sd_cpt);
+
+       return rc;
+}
+
+/*
+ * Top-level send entry point: select a pathway for @msg and, when
+ * credits allow immediate transmission, hand it to lnet_ni_send().
+ *
+ * Returns 0 once the message has been handed off (sent, queued for
+ * credits, or parked for discovery), or a negative errno from path
+ * selection.
+ */
+int
+lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
+{
+       lnet_nid_t              dst_nid = msg->msg_target.nid;
+       int                     rc;
+
+       /*
+        * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
+        * but we might want to use pre-determined router for ACK/REPLY
+        * in the future
+        */
+       /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
+       LASSERT(msg->msg_txpeer == NULL);
+       LASSERT(msg->msg_txni == NULL);
+       LASSERT(!msg->msg_sending);
+       LASSERT(!msg->msg_target_is_router);
+       LASSERT(!msg->msg_receiving);
+
+       msg->msg_sending = 1;
+
+       LASSERT(!msg->msg_tx_committed);
+
+       rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
+       if (rc < 0)
+               return rc;
+
+       if (rc == LNET_CREDIT_OK)
+               lnet_ni_send(msg->msg_txni, msg);
+
+       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
+       return 0;
+}
+
+/* what kind of interface a monitor-thread recovery ping refers to */
+enum lnet_mt_event_type {
+       MT_TYPE_LOCAL_NI = 0,
+       MT_TYPE_PEER_NI
+};
+
+/* context passed with a recovery ping (see lnet_send_ping() callers)
+ * identifying which NID, local or peer, is being probed */
+struct lnet_mt_event_info {
+       enum lnet_mt_event_type mt_type;
+       lnet_nid_t mt_nid;
+};
+
+/*
+ * Detach the response tracker (if any) from @md and invalidate its MD
+ * handle; the monitor thread will free the tracker on its next pass.
+ * @cpt is the cpt of the MD cookie.
+ */
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+       struct lnet_rsp_tracker *rspt;
+
+       /*
+        * msg has a refcount on the MD so the MD is not going away.
+        * The rspt queue for the cpt is protected by
+        * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
+        */
+       lnet_res_lock(cpt);
+       if (!md->md_rspt_ptr) {
+               /* nothing tracked for this MD */
+               lnet_res_unlock(cpt);
+               return;
+       }
+       rspt = md->md_rspt_ptr;
+       md->md_rspt_ptr = NULL;
+
+       /* debug code */
+       LASSERT(rspt->rspt_cpt == cpt);
+
+       /*
+        * invalidate the handle to indicate that a response has been
+        * received, which will then lead the monitor thread to clean up
+        * the rspt block.
+        */
+       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       lnet_res_unlock(cpt);
+}
+
+/*
+ * Walk each CPT's response-tracker queue, freeing trackers whose MD
+ * handle was already invalidated and unlinking the MDs of trackers whose
+ * deadline has passed (or of all trackers when @force is set).
+ */
+static void
+lnet_finalize_expired_responses(bool force)
+{
+       struct lnet_libmd *md;
+       struct list_head local_queue;
+       struct lnet_rsp_tracker *rspt, *tmp;
+       int i;
+
+       if (the_lnet.ln_mt_rstq == NULL)
+               return;
+
+       cfs_cpt_for_each(i, lnet_cpt_table()) {
+               INIT_LIST_HEAD(&local_queue);
+
+               lnet_net_lock(i);
+               if (!the_lnet.ln_mt_rstq[i]) {
+                       lnet_net_unlock(i);
+                       continue;
+               }
+               list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+               lnet_net_unlock(i);
+
+               list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+                       /*
+                        * The rspt mdh will be invalidated when a response
+                        * is received or whenever we want to discard the
+                        * block the monitor thread will walk the queue
+                        * and clean up any rspts with an invalid mdh.
+                        * The monitor thread will walk the queue until
+                        * the first unexpired rspt block. This means that
+                        * some rspt blocks which received their
+                        * corresponding responses will linger in the
+                        * queue until they are cleaned up eventually.
+                        */
+                       lnet_res_lock(i);
+                       if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+                               lnet_res_unlock(i);
+                               list_del_init(&rspt->rspt_on_list);
+                               lnet_rspt_free(rspt, i);
+                               continue;
+                       }
+
+                       if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
+                           force) {
+                               md = lnet_handle2md(&rspt->rspt_mdh);
+                               if (!md) {
+                                       /* MD already gone; just drop the tracker */
+                                       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+                                       lnet_res_unlock(i);
+                                       list_del_init(&rspt->rspt_on_list);
+                                       lnet_rspt_free(rspt, i);
+                                       continue;
+                               }
+                               LASSERT(md->md_rspt_ptr == rspt);
+                               md->md_rspt_ptr = NULL;
+                               lnet_res_unlock(i);
+
+                               lnet_net_lock(i);
+                               the_lnet.ln_counters[i]->response_timeout_count++;
+                               lnet_net_unlock(i);
+
+                               list_del_init(&rspt->rspt_on_list);
+
+                               CDEBUG(D_NET, "Response timed out: md = %p\n", md);
+                               LNetMDUnlink(rspt->rspt_mdh);
+                               lnet_rspt_free(rspt, i);
+                       } else {
+                               /* queue is deadline-ordered: rest is unexpired */
+                               lnet_res_unlock(i);
+                               break;
+                       }
+               }
+
+               lnet_net_lock(i);
+               if (!list_empty(&local_queue))
+                       list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+               lnet_net_unlock(i);
+       }
+}
+
+/*
+ * Drain @resendq, re-sending each queued message.  Called with
+ * lnet_net_lock(cpt) held; the lock is dropped and re-taken around
+ * each send attempt.
+ */
+static void
+lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
+{
+       struct lnet_msg *msg;
+
+       while (!list_empty(resendq)) {
+               struct lnet_peer_ni *lpni;
+
+               msg = list_entry(resendq->next, struct lnet_msg,
+                                msg_list);
+
+               list_del_init(&msg->msg_list);
+
+               /*
+                * NOTE(review): msg_hdr.dest_nid is used as-is here and in
+                * the CERROR below, while src_nid further down goes through
+                * le64_to_cpu(); on big-endian machines these can't both be
+                * right -- confirm the byte order of msg_hdr at resend time.
+                */
+               lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid);
+               if (!lpni) {
+                       lnet_net_unlock(cpt);
+                       CERROR("Expected that a peer is already created for %s\n",
+                              libcfs_nid2str(msg->msg_hdr.dest_nid));
+                       msg->msg_no_resend = true;
+                       lnet_finalize(msg, -EFAULT);
+                       lnet_net_lock(cpt);
+               } else {
+                       struct lnet_peer *peer;
+                       int rc;
+                       lnet_nid_t src_nid = LNET_NID_ANY;
+
+                       /*
+                        * if this message is not being routed and the
+                        * peer is non-MR then we must use the same
+                        * src_nid that was used in the original send.
+                        * Otherwise if we're routing the message (IE
+                        * we're a router) then we can use any of our
+                        * local interfaces. It doesn't matter to the
+                        * final destination.
+                        */
+                       peer = lpni->lpni_peer_net->lpn_peer;
+                       if (!msg->msg_routing &&
+                           !lnet_peer_is_multi_rail(peer))
+                               src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
+
+                       /*
+                        * If we originally specified a src NID, then we
+                        * must attempt to reuse it in the resend as well.
+                        */
+                       if (msg->msg_src_nid_param != LNET_NID_ANY)
+                               src_nid = msg->msg_src_nid_param;
+                       lnet_peer_ni_decref_locked(lpni);
+
+                       lnet_net_unlock(cpt);
+                       CDEBUG(D_NET, "resending %s->%s: %s recovery %d\n",
+                              libcfs_nid2str(src_nid),
+                              libcfs_id2str(msg->msg_target),
+                              lnet_msgtyp2str(msg->msg_type),
+                              msg->msg_recovery);
+                       rc = lnet_send(src_nid, msg, LNET_NID_ANY);
+                       if (rc) {
+                               CERROR("Error sending %s to %s: %d\n",
+                                      lnet_msgtyp2str(msg->msg_type),
+                                      libcfs_id2str(msg->msg_target), rc);
+                               msg->msg_no_resend = true;
+                               lnet_finalize(msg, rc);
+                       }
+                       lnet_net_lock(cpt);
+                       if (!rc)
+                               the_lnet.ln_counters[cpt]->resend_count++;
+               }
+       }
+}
+
+/* Walk every CPT's resend queue and push its messages out again. */
+static void
+lnet_resend_pending_msgs(void)
+{
+       int cpt;
+
+       cfs_cpt_for_each(cpt, lnet_cpt_table()) {
+               lnet_net_lock(cpt);
+               lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[cpt],
+                                               cpt);
+               lnet_net_unlock(cpt);
+       }
+}
+
+/* called with cpt lock and ni_lock held; both locks are dropped and
+ * re-acquired around LNetMDUnlink() of the recovery ping MD */
+static void
+lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt)
+{
+       struct lnet_handle_md recovery_mdh;
+
+       LNetInvalidateMDHandle(&recovery_mdh);
+
+       /* only a pending recovery has a ping MD to unlink */
+       if (ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING) {
+               recovery_mdh = ni->ni_ping_mdh;
+               LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+       }
+       lnet_ni_unlock(ni);
+       lnet_net_unlock(cpt);
+       if (!LNetMDHandleIsInvalid(recovery_mdh))
+               LNetMDUnlink(recovery_mdh);
+       lnet_net_lock(cpt);
+       lnet_ni_lock(ni);
+}
+
+/*
+ * Monitor-thread helper: walk ln_mt_localNIRecovq and send a recovery
+ * ping to each NI that is still unhealthy.  Healthy or inactive NIs are
+ * dropped from the queue; the rest are spliced back for the next pass.
+ */
+static void
+lnet_recover_local_nis(void)
+{
+       struct lnet_mt_event_info *ev_info;
+       struct list_head processed_list;
+       struct list_head local_queue;
+       struct lnet_handle_md mdh;
+       struct lnet_ni *tmp;
+       struct lnet_ni *ni;
+       lnet_nid_t nid;
+       int healthv;
+       int rc;
+
+       INIT_LIST_HEAD(&local_queue);
+       INIT_LIST_HEAD(&processed_list);
+
+       /*
+        * splice the recovery queue on a local queue. We will iterate
+        * through the local queue and update it as needed. Once we're
+        * done with the traversal, we'll splice the local queue back on
+        * the head of the ln_mt_localNIRecovq. Any newly added local NIs
+        * will be traversed in the next iteration.
+        */
+       lnet_net_lock(0);
+       list_splice_init(&the_lnet.ln_mt_localNIRecovq,
+                        &local_queue);
+       lnet_net_unlock(0);
+
+       list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
+               /*
+                * if an NI is being deleted or it is now healthy, there
+                * is no need to keep it around in the recovery queue.
+                * The monitor thread is the only thread responsible for
+                * removing the NI from the recovery queue.
+                * Multiple threads can be adding NIs to the recovery
+                * queue.
+                */
+               healthv = atomic_read(&ni->ni_healthv);
+
+               lnet_net_lock(0);
+               lnet_ni_lock(ni);
+               if (!(ni->ni_state & LNET_NI_STATE_ACTIVE) ||
+                   healthv == LNET_MAX_HEALTH_VALUE) {
+                       list_del_init(&ni->ni_recovery);
+                       lnet_unlink_ni_recovery_mdh_locked(ni, 0);
+                       lnet_ni_unlock(ni);
+                       lnet_ni_decref_locked(ni, 0);
+                       lnet_net_unlock(0);
+                       continue;
+               }
+               lnet_ni_unlock(ni);
+               lnet_net_unlock(0);
+
+
+               CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+                      libcfs_nid2str(ni->ni_nid));
+
+               lnet_ni_lock(ni);
+               if (!(ni->ni_state & LNET_NI_STATE_RECOVERY_PENDING)) {
+                       ni->ni_state |= LNET_NI_STATE_RECOVERY_PENDING;
+                       lnet_ni_unlock(ni);
+
+                       LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+                       if (!ev_info) {
+                               CERROR("out of memory. Can't recover %s\n",
+                                      libcfs_nid2str(ni->ni_nid));
+                               lnet_ni_lock(ni);
+                               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+                               lnet_ni_unlock(ni);
+                               continue;
+                       }
+
+                       mdh = ni->ni_ping_mdh;
+                       /*
+                        * Invalidate the ni mdh in case it's deleted.
+                        * We'll unlink the mdh in this case below.
+                        */
+                       LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+                       nid = ni->ni_nid;
+
+                       /*
+                        * remove the NI from the local queue and drop the
+                        * reference count to it while we're recovering
+                        * it. The reason for that, is that the NI could
+                        * be deleted, and the way the code is structured
+                        * is if we don't drop the NI, then the deletion
+                        * code will enter a loop waiting for the
+                        * reference count to be removed while holding the
+                        * ln_mutex_lock(). When we look up the peer to
+                        * send to in lnet_select_pathway() we will try to
+                        * lock the ln_mutex_lock() as well, leading to
+                        * a deadlock. By dropping the refcount and
+                        * removing it from the list, we allow for the NI
+                        * to be removed, then we use the cached NID to
+                        * look it up again. If it's gone, then we just
+                        * continue examining the rest of the queue.
+                        */
+                       lnet_net_lock(0);
+                       list_del_init(&ni->ni_recovery);
+                       lnet_ni_decref_locked(ni, 0);
+                       lnet_net_unlock(0);
+
+                       ev_info->mt_type = MT_TYPE_LOCAL_NI;
+                       ev_info->mt_nid = nid;
+                       rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+                                           ev_info, the_lnet.ln_mt_eqh, true);
+                       /* lookup the nid again */
+                       lnet_net_lock(0);
+                       ni = lnet_nid2ni_locked(nid, 0);
+                       if (!ni) {
+                               /*
+                                * the NI has been deleted when we dropped
+                                * the ref count
+                                */
+                               lnet_net_unlock(0);
+                               LNetMDUnlink(mdh);
+                               continue;
+                       }
+                       /*
+                        * Same note as in lnet_recover_peer_nis(). When
+                        * we're sending the ping, the NI is free to be
+                        * deleted or manipulated. By this point it
+                        * could've been added back on the recovery queue,
+                        * and a refcount taken on it.
+                        * So we can't just add it blindly again or we'll
+                        * corrupt the queue. We must check under lock if
+                        * it's not on any list and if not then add it
+                        * to the processed list, which will eventually be
+                        * spliced back on to the recovery queue.
+                        */
+                       ni->ni_ping_mdh = mdh;
+                       if (list_empty(&ni->ni_recovery)) {
+                               list_add_tail(&ni->ni_recovery, &processed_list);
+                               lnet_ni_addref_locked(ni, 0);
+                       }
+                       lnet_net_unlock(0);
+
+                       lnet_ni_lock(ni);
+                       if (rc)
+                               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+               }
+               lnet_ni_unlock(ni);
+       }
+
+       /*
+        * put back the remaining NIs on the ln_mt_localNIRecovq to be
+        * reexamined in the next iteration.
+        */
+       list_splice_init(&processed_list, &local_queue);
+       lnet_net_lock(0);
+       list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
+       lnet_net_unlock(0);
+}
+
+/* Allocate one empty list head per CPT; NULL on allocation failure. */
+static struct list_head **
+lnet_create_array_of_queues(void)
+{
+       struct list_head **queues;
+       struct list_head *queue;
+       int i;
+
+       queues = cfs_percpt_alloc(lnet_cpt_table(),
+                                 sizeof(struct list_head));
+       if (queues == NULL) {
+               CERROR("Failed to allocate queues\n");
+               return NULL;
+       }
+
+       cfs_percpt_for_each(queue, i, queues)
+               INIT_LIST_HEAD(queue);
+
+       return queues;
+}
+
+/* Allocate the per-CPT resend queues and publish them in the_lnet. */
+static int
+lnet_resendqs_create(void)
+{
+       struct list_head **resendqs = lnet_create_array_of_queues();
+
+       if (resendqs == NULL)
+               return -ENOMEM;
+
+       /* publish under the exclusive lock */
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_mt_resendqs = resendqs;
+       lnet_net_unlock(LNET_LOCK_EX);
+
+       return 0;
+}
+
+/* Drain ln_mt_localNIRecovq: unlink each NI's recovery ping MD and drop
+ * the reference the queue held on it. */
+static void
+lnet_clean_local_ni_recoveryq(void)
+{
+       struct lnet_ni *ni;
+
+       /* This is only called when the monitor thread has stopped */
+       lnet_net_lock(0);
+
+       while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
+               ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
+                               struct lnet_ni, ni_recovery);
+               list_del_init(&ni->ni_recovery);
+               lnet_ni_lock(ni);
+               lnet_unlink_ni_recovery_mdh_locked(ni, 0);
+               lnet_ni_unlock(ni);
+               lnet_ni_decref_locked(ni, 0);
+       }
+
+       lnet_net_unlock(0);
+}
+
/*
 * Unlink the recovery ping MD of a peer NI, if one is pending.
 *
 * Called with lnet_net_lock(cpt) and lpni->lpni_lock held.  Both
 * locks are dropped around the LNetMDUnlink() call and re-taken
 * before returning, so callers must not rely on state that can
 * change while the locks are released.
 */
static void
lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt)
{
	struct lnet_handle_md recovery_mdh;

	LNetInvalidateMDHandle(&recovery_mdh);

	/* only grab the handle if a recovery ping is actually in flight */
	if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) {
		recovery_mdh = lpni->lpni_recovery_ping_mdh;
		LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
	}
	/* drop both locks before unlinking; the unlink can generate an
	 * UNLINK event (see lnet_mt_event_handler()) */
	spin_unlock(&lpni->lpni_lock);
	lnet_net_unlock(cpt);
	if (!LNetMDHandleIsInvalid(recovery_mdh))
		LNetMDUnlink(recovery_mdh);
	lnet_net_lock(cpt);
	spin_lock(&lpni->lpni_lock);
}
+
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+       struct lnet_peer_ni *lpni, *tmp;
+
+       lnet_net_lock(LNET_LOCK_EX);
+
+       list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+                                lpni_recovery) {
+               list_del_init(&lpni->lpni_recovery);
+               spin_lock(&lpni->lpni_lock);
+               lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX);
+               spin_unlock(&lpni->lpni_lock);
+               lnet_peer_ni_decref_locked(lpni);
+       }
+
+       lnet_net_unlock(LNET_LOCK_EX);
+}
+
+static void
+lnet_clean_resendqs(void)
+{
+       struct lnet_msg *msg, *tmp;
+       struct list_head msgs;
+       int i;
+
+       INIT_LIST_HEAD(&msgs);
+
+       cfs_cpt_for_each(i, lnet_cpt_table()) {
+               lnet_net_lock(i);
+               list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
+               lnet_net_unlock(i);
+               list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
+                       list_del_init(&msg->msg_list);
+                       msg->msg_no_resend = true;
+                       lnet_finalize(msg, -ESHUTDOWN);
+               }
+       }
+
+       cfs_percpt_free(the_lnet.ln_mt_resendqs);
+}
 
-       /*
-        * Increment sequence number of the peer selected so that we
-        * pick the next one in Round Robin.
-        */
-       best_lpni->lpni_seq++;
+static void
+lnet_recover_peer_nis(void)
+{
+       struct lnet_mt_event_info *ev_info;
+       struct list_head processed_list;
+       struct list_head local_queue;
+       struct lnet_handle_md mdh;
+       struct lnet_peer_ni *lpni;
+       struct lnet_peer_ni *tmp;
+       lnet_nid_t nid;
+       int healthv;
+       int rc;
 
-       /*
-        * grab a reference on the peer_ni so it sticks around even if
-        * we need to drop and relock the lnet_net_lock below.
-        */
-       lnet_peer_ni_addref_locked(best_lpni);
+       INIT_LIST_HEAD(&local_queue);
+       INIT_LIST_HEAD(&processed_list);
 
        /*
-        * Use lnet_cpt_of_nid() to determine the CPT used to commit the
-        * message. This ensures that we get a CPT that is correct for
-        * the NI when the NI has been restricted to a subset of all CPTs.
-        * If the selected CPT differs from the one currently locked, we
-        * must unlock and relock the lnet_net_lock(), and then check whether
-        * the configuration has changed. We don't have a hold on the best_ni
-        * yet, and it may have vanished.
+        * Always use cpt 0 for locking across all interactions with
+        * ln_mt_peerNIRecovq
         */
-       cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
-       if (cpt != cpt2) {
-               __u32 seq = lnet_get_dlc_seq_locked();
-               lnet_net_unlock(cpt);
-               cpt = cpt2;
-               lnet_net_lock(cpt);
-               if (seq != lnet_get_dlc_seq_locked()) {
-                       lnet_peer_ni_decref_locked(best_lpni);
-                       goto again;
+       lnet_net_lock(0);
+       list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+                        &local_queue);
+       lnet_net_unlock(0);
+
+       list_for_each_entry_safe(lpni, tmp, &local_queue,
+                                lpni_recovery) {
+               /*
+                * The same protection strategy is used here as is in the
+                * local recovery case.
+                */
+               lnet_net_lock(0);
+               healthv = atomic_read(&lpni->lpni_healthv);
+               spin_lock(&lpni->lpni_lock);
+               if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+                   healthv == LNET_MAX_HEALTH_VALUE) {
+                       list_del_init(&lpni->lpni_recovery);
+                       lnet_unlink_lpni_recovery_mdh_locked(lpni, 0);
+                       spin_unlock(&lpni->lpni_lock);
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+                       continue;
+               }
+               spin_unlock(&lpni->lpni_lock);
+               lnet_net_unlock(0);
+
+               /*
+                * NOTE: we're racing with peer deletion from user space.
+                * It's possible that a peer is deleted after we check its
+                * state. In this case the recovery can create a new peer
+                */
+               spin_lock(&lpni->lpni_lock);
+               if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+                   !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+                       lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+                       spin_unlock(&lpni->lpni_lock);
+
+                       LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+                       if (!ev_info) {
+                               CERROR("out of memory. Can't recover %s\n",
+                                      libcfs_nid2str(lpni->lpni_nid));
+                               spin_lock(&lpni->lpni_lock);
+                               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+                               spin_unlock(&lpni->lpni_lock);
+                               continue;
+                       }
+
+                       /* look at the comments in lnet_recover_local_nis() */
+                       mdh = lpni->lpni_recovery_ping_mdh;
+                       LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+                       nid = lpni->lpni_nid;
+                       lnet_net_lock(0);
+                       list_del_init(&lpni->lpni_recovery);
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+
+                       ev_info->mt_type = MT_TYPE_PEER_NI;
+                       ev_info->mt_nid = nid;
+                       rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+                                           ev_info, the_lnet.ln_mt_eqh, true);
+                       lnet_net_lock(0);
+                       /*
+                        * lnet_find_peer_ni_locked() grabs a refcount for
+                        * us. No need to take it explicitly.
+                        */
+                       lpni = lnet_find_peer_ni_locked(nid);
+                       if (!lpni) {
+                               lnet_net_unlock(0);
+                               LNetMDUnlink(mdh);
+                               continue;
+                       }
+
+                       lpni->lpni_recovery_ping_mdh = mdh;
+                       /*
+                        * While we're unlocked the lpni could've been
+                        * readded on the recovery queue. In this case we
+                        * don't need to add it to the local queue, since
+                        * it's already on there and the thread that added
+                        * it would've incremented the refcount on the
+                        * peer, which means we need to decref the refcount
+                        * that was implicitly grabbed by find_peer_ni_locked.
+                        * Otherwise, if the lpni is still not on
+                        * the recovery queue, then we'll add it to the
+                        * processed list.
+                        */
+                       if (list_empty(&lpni->lpni_recovery))
+                               list_add_tail(&lpni->lpni_recovery, &processed_list);
+                       else
+                               lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(0);
+
+                       spin_lock(&lpni->lpni_lock);
+                       if (rc)
+                               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
                }
+               spin_unlock(&lpni->lpni_lock);
        }
 
-       /*
-        * store the best_lpni in the message right away to avoid having
-        * to do the same operation under different conditions
-        */
-       msg->msg_txpeer = best_lpni;
-       msg->msg_txni = best_ni;
+       list_splice_init(&processed_list, &local_queue);
+       lnet_net_lock(0);
+       list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+       lnet_net_unlock(0);
+}
 
-       /*
-        * grab a reference for the best_ni since now it's in use in this
-        * send. the reference will need to be dropped when the message is
-        * finished in lnet_finalize()
-        */
-       lnet_ni_addref_locked(msg->msg_txni, cpt);
+static int
+lnet_monitor_thread(void *arg)
+{
+       int wakeup_counter = 0;
 
        /*
-        * Always set the target.nid to the best peer picked. Either the
-        * nid will be one of the preconfigured NIDs, or the same NID as
-        * what was originally set in the target or it will be the NID of
-        * a router if this message should be routed
+        * The monitor thread takes care of the following:
+        *  1. Checks the aliveness of routers
+        *  2. Checks if there are messages on the resend queue to resend
+        *     them.
+        *  3. Check if there are any NIs on the local recovery queue and
+        *     pings them
+        *  4. Checks if there are any NIs on the remote recovery queue
+        *     and pings them.
         */
-       msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
+       cfs_block_allsigs();
 
-       /*
-        * lnet_msg_commit assigns the correct cpt to the message, which
-        * is used to decrement the correct refcount on the ni when it's
-        * time to return the credits
-        */
-       lnet_msg_commit(msg, cpt);
+       while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
+               if (lnet_router_checker_active())
+                       lnet_check_routers();
 
-       /*
-        * If we are routing the message then we don't need to overwrite
-        * the src_nid since it would've been set at the origin. Otherwise
-        * we are the originator so we need to set it.
-        */
-       if (!msg->msg_routing)
-               msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
+               lnet_resend_pending_msgs();
+
+               wakeup_counter++;
+               if (wakeup_counter >= lnet_transaction_timeout / 2) {
+                       lnet_finalize_expired_responses(false);
+                       wakeup_counter = 0;
+               }
+
+               lnet_recover_local_nis();
+
+               lnet_recover_peer_nis();
 
-       if (routing) {
-               msg->msg_target_is_router = 1;
-               msg->msg_target.pid = LNET_PID_LUSTRE;
                /*
-                * since we're routing we want to ensure that the
-                * msg_hdr.dest_nid is set to the final destination. When
-                * the router receives this message it knows how to route
-                * it.
+                * TODO do we need to check if we should sleep without
+                * timeout?  Technically, an active system will always
+                * have messages in flight so this check will always
+                * evaluate to false. And on an idle system do we care
+                * if we wake up every 1 second? Although, we've seen
+                * cases where we get a complaint that an idle thread
+                * is waking up unnecessarily.
                 */
-               msg->msg_hdr.dest_nid =
-                       cpu_to_le64(final_dst ? final_dst->lpni_nid : dst_nid);
-       } else {
+               wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
+                                               false,
+                                               cfs_time_seconds(1));
+       }
+
+       /* clean up the router checker */
+       lnet_prune_rc_data(1);
+
+       /* Shutting down */
+       the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+
+       /* signal that the monitor thread is exiting */
+       up(&the_lnet.ln_mt_signal);
+
+       return 0;
+}
+
/*
 * lnet_send_ping
 * Sends a ping to @dest_nid on the reserved portal, binding a fresh
 * ping buffer sized for @nnis interfaces to *@mdh.  @user_data and
 * @eqh are attached to the MD so the caller's event handler can
 * identify the completion; @recovery is passed through to LNetGet()
 * to mark a recovery ping.
 * Returns == 0 if success
 * Returns > 0 if LNetMDBind or prior fails
 * Returns < 0 if LNetGet fails
 */
int
lnet_send_ping(lnet_nid_t dest_nid,
	       struct lnet_handle_md *mdh, int nnis,
	       void *user_data, struct lnet_handle_eq eqh, bool recovery)
{
	struct lnet_md md = { NULL };
	struct lnet_process_id id;
	struct lnet_ping_buffer *pbuf;
	int rc;

	if (dest_nid == LNET_NID_ANY) {
		/* NOTE(review): negative here even though the header says
		 * "> 0" for pre-LNetGet failures; visible callers only
		 * test for nonzero, so this appears harmless -- confirm */
		rc = -EHOSTUNREACH;
		goto fail_error;
	}

	pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
	if (!pbuf) {
		/* positive by design: "> 0" signals failure before LNetGet */
		rc = ENOMEM;
		goto fail_error;
	}

	/* initialize md content */
	md.start     = &pbuf->pb_info;
	md.length    = LNET_PING_INFO_SIZE(nnis);
	md.threshold = 2; /* GET/REPLY */
	md.max_size  = 0;
	md.options   = LNET_MD_TRUNCATE;
	md.user_ptr  = user_data;
	md.eq_handle = eqh;

	rc = LNetMDBind(md, LNET_UNLINK, mdh);
	if (rc) {
		/* bind failed, so the MD never took ownership of pbuf */
		lnet_ping_buffer_decref(pbuf);
		CERROR("Can't bind MD: %d\n", rc);
		rc = -rc; /* change the rc to positive */
		goto fail_error;
	}
	id.pid = LNET_PID_LUSTRE;
	id.nid = dest_nid;

	rc = LNetGet(LNET_NID_ANY, *mdh, id,
		     LNET_RESERVED_PORTAL,
		     LNET_PROTO_PING_MATCHBITS, 0, recovery);

	if (rc)
		goto fail_unlink_md;

	return 0;

fail_unlink_md:
	/* the pbuf reference is released when the unlink event fires
	 * (see e.g. lnet_mt_event_handler()) */
	LNetMDUnlink(*mdh);
	LNetInvalidateMDHandle(mdh);
fail_error:
	return rc;
}
+
+static void
+lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
+                          int status)
+{
+       lnet_nid_t nid = ev_info->mt_nid;
+
+       if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
+               struct lnet_ni *ni;
+
+               lnet_net_lock(0);
+               ni = lnet_nid2ni_locked(nid, 0);
+               if (!ni) {
+                       lnet_net_unlock(0);
+                       return;
+               }
+               lnet_ni_lock(ni);
+               ni->ni_state &= ~LNET_NI_STATE_RECOVERY_PENDING;
+               lnet_ni_unlock(ni);
+               lnet_net_unlock(0);
+
+               if (status != 0) {
+                       CERROR("local NI recovery failed with %d\n", status);
+                       return;
+               }
                /*
-                * if we're not routing set the dest_nid to the best peer
-                * ni that we picked earlier in the algorithm.
+                * need to increment healthv for the ni here, because in
+                * the lnet_finalize() path we don't have access to this
+                * NI. And in order to get access to it, we'll need to
+                * carry forward too much information.
+                * In the peer case, it'll naturally be incremented
                 */
-               msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
+               lnet_inc_healthv(&ni->ni_healthv);
+       } else {
+               struct lnet_peer_ni *lpni;
+               int cpt;
+
+               cpt = lnet_net_lock_current();
+               lpni = lnet_find_peer_ni_locked(nid);
+               if (!lpni) {
+                       lnet_net_unlock(cpt);
+                       return;
+               }
+               spin_lock(&lpni->lpni_lock);
+               lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+               spin_unlock(&lpni->lpni_lock);
+               lnet_peer_ni_decref_locked(lpni);
+               lnet_net_unlock(cpt);
+
+               if (status != 0)
+                       CERROR("peer NI recovery failed with %d\n", status);
        }
+}
 
-       rc = lnet_post_send_locked(msg, 0);
/*
 * Event handler for the monitor thread's recovery-ping EQ.
 *
 * event->md.user_ptr carries the lnet_mt_event_info allocated by the
 * recovery sender; it is freed here once the MD is finally unlinked,
 * along with the MD's reference on the ping buffer.
 */
static void
lnet_mt_event_handler(struct lnet_event *event)
{
	struct lnet_mt_event_info *ev_info = event->md.user_ptr;
	struct lnet_ping_buffer *pbuf;

	/* TODO: remove assert */
	LASSERT(event->type == LNET_EVENT_REPLY ||
		event->type == LNET_EVENT_SEND ||
		event->type == LNET_EVENT_UNLINK);

	CDEBUG(D_NET, "Received event: %d status: %d\n", event->type,
	       event->status);

	switch (event->type) {
	case LNET_EVENT_UNLINK:
		CDEBUG(D_NET, "%s recovery ping unlinked\n",
		       libcfs_nid2str(ev_info->mt_nid));
		/* fallthrough: an unlink must also clear the NI/peer-NI
		 * recovery-pending state, same as a reply */
	case LNET_EVENT_REPLY:
		lnet_handle_recovery_reply(ev_info, event->status);
		break;
	case LNET_EVENT_SEND:
		CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
			       libcfs_nid2str(ev_info->mt_nid),
			       (event->status) ? "unsuccessfully" :
			       "successfully", event->status);
		break;
	default:
		CERROR("Unexpected event: %d\n", event->type);
		break;
	}
	/* last event on this MD: release everything the send attached */
	if (event->unlinked) {
		LIBCFS_FREE(ev_info, sizeof(*ev_info));
		pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
		lnet_ping_buffer_decref(pbuf);
	}
}
 
-       if (!rc)
-               CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n",
-                      libcfs_nid2str(msg->msg_hdr.src_nid),
-                      libcfs_nid2str(msg->msg_txni->ni_nid),
-                      libcfs_nid2str(src_nid),
-                      libcfs_nid2str(msg->msg_hdr.dest_nid),
-                      libcfs_nid2str(dst_nid),
-                      libcfs_nid2str(msg->msg_txpeer->lpni_nid),
-                      lnet_msgtyp2str(msg->msg_type));
+static int
+lnet_rsp_tracker_create(void)
+{
+       struct list_head **rstqs;
+       rstqs = lnet_create_array_of_queues();
 
-       lnet_net_unlock(cpt);
+       if (!rstqs)
+               return -ENOMEM;
 
-       return rc;
+       the_lnet.ln_mt_rstq = rstqs;
+
+       return 0;
 }
 
-int
-lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
+static void
+lnet_rsp_tracker_clean(void)
 {
-       lnet_nid_t              dst_nid = msg->msg_target.nid;
-       int                     rc;
+       lnet_finalize_expired_responses(true);
 
-       /*
-        * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
-        * but we might want to use pre-determined router for ACK/REPLY
-        * in the future
-        */
-       /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
-       LASSERT (msg->msg_txpeer == NULL);
-       LASSERT (!msg->msg_sending);
-       LASSERT (!msg->msg_target_is_router);
-       LASSERT (!msg->msg_receiving);
+       cfs_percpt_free(the_lnet.ln_mt_rstq);
+       the_lnet.ln_mt_rstq = NULL;
+}
 
-       msg->msg_sending = 1;
+int lnet_monitor_thr_start(void)
+{
+       int rc = 0;
+       struct task_struct *task;
 
-       LASSERT(!msg->msg_tx_committed);
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN)
+               return -EALREADY;
 
-       rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
-       if (rc < 0)
+       rc = lnet_resendqs_create();
+       if (rc)
                return rc;
 
-       if (rc == LNET_CREDIT_OK)
-               lnet_ni_send(msg->msg_txni, msg);
+       rc = lnet_rsp_tracker_create();
+       if (rc)
+               goto clean_queues;
+
+       rc = LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
+       if (rc != 0) {
+               CERROR("Can't allocate monitor thread EQ: %d\n", rc);
+               goto clean_queues;
+       }
+
+       /* Pre monitor thread start processing */
+       rc = lnet_router_pre_mt_start();
+       if (rc)
+               goto free_mem;
+
+       sema_init(&the_lnet.ln_mt_signal, 0);
+
+       the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
+       task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("Can't start monitor thread: %d\n", rc);
+               goto clean_thread;
+       }
+
+       /* post monitor thread start processing */
+       lnet_router_post_mt_start();
 
-       /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
        return 0;
+
+clean_thread:
+       the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+       /* block until event callback signals exit */
+       down(&the_lnet.ln_mt_signal);
+       /* clean up */
+       lnet_router_cleanup();
+free_mem:
+       the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+       lnet_rsp_tracker_clean();
+       lnet_clean_local_ni_recoveryq();
+       lnet_clean_peer_ni_recoveryq();
+       lnet_clean_resendqs();
+       LNetEQFree(the_lnet.ln_mt_eqh);
+       LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
+       return rc;
+clean_queues:
+       lnet_rsp_tracker_clean();
+       lnet_clean_local_ni_recoveryq();
+       lnet_clean_peer_ni_recoveryq();
+       lnet_clean_resendqs();
+       return rc;
+}
+
/*
 * Stop the monitor thread and tear down all of its state.
 *
 * No-op if the thread was never started (state already SHUTDOWN).
 * Otherwise flips the state to STOPPING, wakes the thread, and blocks
 * on ln_mt_signal until lnet_monitor_thread() has set SHUTDOWN and
 * signalled its exit; only then is it safe to drain the recovery and
 * resend queues and free the EQ.
 */
void lnet_monitor_thr_stop(void)
{
	int rc;

	if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
		return;

	LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
	the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;

	/* tell the monitor thread that we're shutting down */
	wake_up(&the_lnet.ln_mt_waitq);

	/* block until monitor thread signals that it's done */
	down(&the_lnet.ln_mt_signal);
	LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);

	/* perform cleanup tasks */
	lnet_router_cleanup();
	lnet_rsp_tracker_clean();
	lnet_clean_local_ni_recoveryq();
	lnet_clean_peer_ni_recoveryq();
	lnet_clean_resendqs();
	rc = LNetEQFree(the_lnet.ln_mt_eqh);
	LASSERT(rc == 0);
	return;
}
 
 void
@@ -2313,13 +3749,13 @@ lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
 static int
 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-       void             *private = msg->msg_private;
-       struct lnet_hdr  *hdr = &msg->msg_hdr;
+       void *private = msg->msg_private;
+       struct lnet_hdr *hdr = &msg->msg_hdr;
        struct lnet_process_id src = {0};
-       struct lnet_libmd        *md;
-       int               rlength;
-       int               mlength;
-       int                     cpt;
+       struct lnet_libmd *md;
+       int rlength;
+       int mlength;
+       int cpt;
 
        cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
        lnet_res_lock(cpt);
@@ -2380,10 +3816,10 @@ lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
 static int
 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
 {
-       struct lnet_hdr  *hdr = &msg->msg_hdr;
+       struct lnet_hdr *hdr = &msg->msg_hdr;
        struct lnet_process_id src = {0};
-       struct lnet_libmd        *md;
-       int                     cpt;
+       struct lnet_libmd *md;
+       int cpt;
 
        src.nid = hdr->src_nid;
        src.pid = hdr->src_pid;
@@ -2699,7 +4135,7 @@ lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
        }
 
        if (!list_empty(&the_lnet.ln_drop_rules) &&
-           lnet_drop_rule_match(hdr)) {
+           lnet_drop_rule_match(hdr, NULL)) {
                CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
                              "silent message loss\n",
                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
@@ -2849,6 +4285,8 @@ lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
                lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
                                  msg->msg_private, msg->msg_len,
                                  msg->msg_type);
+
+               msg->msg_no_resend = true;
                /*
                 * NB: message will not generate event because w/o attached MD,
                 * but we still should give error code so lnet_msg_decommit()
@@ -2891,6 +4329,43 @@ lnet_recv_delayed_msg_list(struct list_head *head)
        }
 }
 
/*
 * Arm a response tracker for a message that expects an ACK/REPLY.
 *
 * @rspt: caller-allocated tracker; ownership passes to the MD and the
 *        per-CPT tracked list once this returns
 * @cpt:  CPT of the MD cookie; selects the res lock and rstq queue
 * @md:   the bound MD of the outgoing message
 * @mdh:  handle to @md, stashed so expiry can unlink it later
 */
static void
lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
			struct lnet_libmd *md, struct lnet_handle_md mdh)
{
	s64 timeout_ns;

	/*
	 * MD has a refcount taken by message so it's not going away.
	 * The MD however can be looked up. We need to secure the access
	 * to the md_rspt_ptr by taking the res_lock.
	 * The rspt can be accessed without protection up to when it gets
	 * added to the list.
	 */

	/* debug code */
	LASSERT(md->md_rspt_ptr == NULL);

	/* we'll use that same event in case we never get a response  */
	rspt->rspt_mdh = mdh;
	rspt->rspt_cpt = cpt;
	/* deadline is "now + transaction timeout", in nanoseconds */
	timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
	rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);

	lnet_res_lock(cpt);
	/* store the rspt so we can access it when we get the REPLY */
	md->md_rspt_ptr = rspt;
	lnet_res_unlock(cpt);

	/*
	 * add to the list of tracked responses. It's added to tail of the
	 * list in order to expire all the older entries first.
	 */
	lnet_net_lock(cpt);
	list_add_tail(&rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
	lnet_net_unlock(cpt);
}
+
 /**
  * Initiate an asynchronous PUT operation.
  *
@@ -2941,10 +4416,11 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
        __u64 match_bits, unsigned int offset,
        __u64 hdr_data)
 {
-       struct lnet_msg         *msg;
-       struct lnet_libmd       *md;
-       int                     cpt;
-       int                     rc;
+       struct lnet_msg *msg;
+       struct lnet_libmd *md;
+       int cpt;
+       int rc;
+       struct lnet_rsp_tracker *rspt = NULL;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
@@ -2964,6 +4440,17 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
        msg->msg_vmflush = !!memory_pressure_get();
 
        cpt = lnet_cpt_of_cookie(mdh.cookie);
+
+       if (ack == LNET_ACK_REQ) {
+               rspt = lnet_rspt_alloc(cpt);
+               if (!rspt) {
+                       CERROR("Dropping PUT to %s: ENOMEM on response tracker\n",
+                               libcfs_id2str(target));
+                       return -ENOMEM;
+               }
+               INIT_LIST_HEAD(&rspt->rspt_on_list);
+       }
+
        lnet_res_lock(cpt);
 
        md = lnet_handle2md(&mdh);
@@ -2976,6 +4463,7 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
                               md->md_me->me_portal);
                lnet_res_unlock(cpt);
 
+               LIBCFS_FREE(rspt, sizeof(*rspt));
                lnet_msg_free(msg);
                return -ENOENT;
        }
@@ -3008,10 +4496,15 @@ LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
+       if (ack == LNET_ACK_REQ)
+               lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
        rc = lnet_send(self, msg, LNET_NID_ANY);
        if (rc != 0) {
                CNETERR("Error sending PUT to %s: %d\n",
                        libcfs_id2str(target), rc);
+               msg->msg_no_resend = true;
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
@@ -3141,12 +4634,13 @@ EXPORT_SYMBOL(lnet_set_reply_msg_len);
 int
 LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
        struct lnet_process_id target, unsigned int portal,
-       __u64 match_bits, unsigned int offset)
+       __u64 match_bits, unsigned int offset, bool recovery)
 {
-       struct lnet_msg         *msg;
-       struct lnet_libmd       *md;
-       int                     cpt;
-       int                     rc;
+       struct lnet_msg *msg;
+       struct lnet_libmd *md;
+       struct lnet_rsp_tracker *rspt;
+       int cpt;
+       int rc;
 
        LASSERT(the_lnet.ln_refcount > 0);
 
@@ -3159,13 +4653,24 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
        }
 
        msg = lnet_msg_alloc();
-       if (msg == NULL) {
+       if (!msg) {
                CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
                       libcfs_id2str(target));
                return -ENOMEM;
        }
 
        cpt = lnet_cpt_of_cookie(mdh.cookie);
+
+       rspt = lnet_rspt_alloc(cpt);
+       if (!rspt) {
+               CERROR("Dropping GET to %s: ENOMEM on response tracker\n",
+                      libcfs_id2str(target));
+               return -ENOMEM;
+       }
+       INIT_LIST_HEAD(&rspt->rspt_on_list);
+
+       msg->msg_recovery = recovery;
+
        lnet_res_lock(cpt);
 
        md = lnet_handle2md(&mdh);
@@ -3180,6 +4685,7 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
                lnet_res_unlock(cpt);
 
                lnet_msg_free(msg);
+               LIBCFS_FREE(rspt, sizeof(*rspt));
                return -ENOENT;
        }
 
@@ -3204,10 +4710,14 @@ LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
+       lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
+
        rc = lnet_send(self, msg, LNET_NID_ANY);
        if (rc < 0) {
                CNETERR("Error sending GET to %s: %d\n",
                        libcfs_id2str(target), rc);
+               msg->msg_no_resend = true;
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
                lnet_finalize(msg, rc);
        }
 
index 383232d..8fed27f 100644 (file)
@@ -143,13 +143,17 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt)
 {
        struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
        struct lnet_counters *counters = the_lnet.ln_counters[cpt];
+       s64 timeout_ns;
+
+       /* set the message deadline */
+       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+       msg->msg_deadline = ktime_add_ns(ktime_get(), timeout_ns);
 
        /* routed message can be committed for both receiving and sending */
        LASSERT(!msg->msg_tx_committed);
 
        if (msg->msg_sending) {
                LASSERT(!msg->msg_receiving);
-
                msg->msg_tx_cpt = cpt;
                msg->msg_tx_committed = 1;
                if (msg->msg_rx_committed) { /* routed message REPLY */
@@ -163,8 +167,9 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt)
        }
 
        LASSERT(!msg->msg_onactivelist);
+
        msg->msg_onactivelist = 1;
-       list_add(&msg->msg_activelist, &container->msc_active);
+       list_add_tail(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
@@ -456,14 +461,395 @@ lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
        return 0;
 }
 
+static void
+lnet_dec_healthv_locked(atomic_t *healthv)
+{
+       int h = atomic_read(healthv);
+
+       if (h < lnet_health_sensitivity) {
+               atomic_set(healthv, 0);
+       } else {
+               h -= lnet_health_sensitivity;
+               atomic_set(healthv, h);
+       }
+}
+
+static void
+lnet_handle_local_failure(struct lnet_msg *msg)
+{
+       struct lnet_ni *local_ni;
+
+       local_ni = msg->msg_txni;
+
+       /*
+        * the lnet_net_lock(0) is used to protect the addref on the ni
+        * and the recovery queue.
+        */
+       lnet_net_lock(0);
+       /* the mt could've shutdown and cleaned up the queues */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
+               lnet_net_unlock(0);
+               return;
+       }
+
+       lnet_dec_healthv_locked(&local_ni->ni_healthv);
+       /*
+        * add the NI to the recovery queue if it's not already there
+        * and its health value is actually below the maximum. It's
+        * possible that the sensitivity might be set to 0, and the health
+        * value will not be reduced. In this case, there is no reason to
+        * invoke recovery
+        */
+       if (list_empty(&local_ni->ni_recovery) &&
+           atomic_read(&local_ni->ni_healthv) < LNET_MAX_HEALTH_VALUE) {
+               CERROR("ni %s added to recovery queue. Health = %d\n",
+                       libcfs_nid2str(local_ni->ni_nid),
+                       atomic_read(&local_ni->ni_healthv));
+               list_add_tail(&local_ni->ni_recovery,
+                             &the_lnet.ln_mt_localNIRecovq);
+               lnet_ni_addref_locked(local_ni, 0);
+       }
+       lnet_net_unlock(0);
+}
+
+static void
+lnet_handle_remote_failure(struct lnet_msg *msg)
+{
+       struct lnet_peer_ni *lpni;
+
+       lpni = msg->msg_txpeer;
+
+       /* lpni could be NULL if we're in the LOLND case */
+       if (!lpni)
+               return;
+
+       lnet_net_lock(0);
+       lnet_dec_healthv_locked(&lpni->lpni_healthv);
+       /*
+        * add the peer NI to the recovery queue if it's not already there
+        * and its health value is actually below the maximum. It's
+        * possible that the sensitivity might be set to 0, and the health
+        * value will not be reduced. In this case, there is no reason to
+        * invoke recovery
+        */
+       lnet_peer_ni_add_to_recoveryq_locked(lpni);
+       lnet_net_unlock(0);
+}
+
+static void
+lnet_incr_hstats(struct lnet_msg *msg, enum lnet_msg_hstatus hstatus)
+{
+       struct lnet_ni *ni = msg->msg_txni;
+       struct lnet_peer_ni *lpni = msg->msg_txpeer;
+       struct lnet_counters *counters = the_lnet.ln_counters[0];
+
+       switch (hstatus) {
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+               atomic_inc(&ni->ni_hstats.hlt_local_interrupt);
+               counters->local_interrupt_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+               atomic_inc(&ni->ni_hstats.hlt_local_dropped);
+               counters->local_dropped_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+               atomic_inc(&ni->ni_hstats.hlt_local_aborted);
+               counters->local_aborted_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+               atomic_inc(&ni->ni_hstats.hlt_local_no_route);
+               counters->local_no_route_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               atomic_inc(&ni->ni_hstats.hlt_local_timeout);
+               counters->local_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               atomic_inc(&ni->ni_hstats.hlt_local_error);
+               counters->local_error_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_dropped);
+               counters->remote_dropped_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_error);
+               counters->remote_error_count++;
+               break;
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_remote_timeout);
+               counters->remote_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               if (lpni)
+                       atomic_inc(&lpni->lpni_hstats.hlt_network_timeout);
+               counters->network_timeout_count++;
+               break;
+       case LNET_MSG_STATUS_OK:
+               break;
+       default:
+               LBUG();
+       }
+}
+
+/*
+ * Do a health check on the message:
+ * return -1 if we're not going to handle the error or
+ *   if we've reached the maximum number of retries.
+ *   success case will return -1 as well
+ * return 0 if the message is requeued for send
+ */
+static int
+lnet_health_check(struct lnet_msg *msg)
+{
+       enum lnet_msg_hstatus hstatus = msg->msg_health_status;
+       bool lo = false;
+
+       /* if we're shutting down no point in handling health. */
+       if (the_lnet.ln_state != LNET_STATE_RUNNING)
+               return -1;
+
+       LASSERT(msg->msg_txni);
+
+       /*
+        * if we're sending to the LOLND then the msg_txpeer will not be
+        * set. So no need to sanity check it.
+        */
+       if (LNET_NETTYP(LNET_NIDNET(msg->msg_txni->ni_nid)) != LOLND)
+               LASSERT(msg->msg_txpeer);
+       else
+               lo = true;
+
+       if (hstatus != LNET_MSG_STATUS_OK &&
+           ktime_compare(ktime_get(), msg->msg_deadline) >= 0)
+               return -1;
+
+       /*
+        * stats are only incremented for errors so avoid wasting time
+        * incrementing statistics if there is no error.
+        */
+       if (hstatus != LNET_MSG_STATUS_OK) {
+               lnet_net_lock(0);
+               lnet_incr_hstats(msg, hstatus);
+               lnet_net_unlock(0);
+       }
+
+       CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
+              libcfs_nid2str(msg->msg_txni->ni_nid),
+              (lo) ? "self" : libcfs_nid2str(msg->msg_txpeer->lpni_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(hstatus));
+
+       switch (hstatus) {
+       case LNET_MSG_STATUS_OK:
+               lnet_inc_healthv(&msg->msg_txni->ni_healthv);
+               /*
+                * It's possible msg_txpeer is NULL in the LOLND
+                * case.
+                */
+               if (msg->msg_txpeer)
+                       lnet_inc_healthv(&msg->msg_txpeer->lpni_healthv);
+
+               /* we can finalize this message */
+               return -1;
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               lnet_handle_local_failure(msg);
+               /* add to the re-send queue */
+               goto resend;
+
+       /*
+        * These errors will not trigger a resend so simply
+        * finalize the message
+        */
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               lnet_handle_local_failure(msg);
+               return -1;
+
+       /*
+        * TODO: since the remote dropped the message we can
+        * attempt a resend safely.
+        */
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               lnet_handle_remote_failure(msg);
+               goto resend;
+
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               lnet_handle_remote_failure(msg);
+               return -1;
+       default:
+               LBUG();
+       }
+
+resend:
+       /* don't resend recovery messages */
+       if (msg->msg_recovery)
+               return -1;
+
+       /*
+        * if we explicitly indicated we don't want to resend then just
+        * return
+        */
+       if (msg->msg_no_resend)
+               return -1;
+
+       /* check if the message has exceeded the number of retries */
+       if (msg->msg_retry_count >= lnet_retry_count)
+               return -1;
+       msg->msg_retry_count++;
+
+       lnet_net_lock(msg->msg_tx_cpt);
+
+       /*
+        * remove message from the active list and reset it in preparation
+        * for a resend. Two exceptions to this
+        *
+        * 1. the router case, when a message is committed for rx when
+        * received, then tx when it is sent. When committed to both tx and
+        * rx we don't want to remove it from the active list.
+        *
+        * 2. The REPLY case since it uses the same msg block for the GET
+        * that was received.
+        */
+       if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
+               list_del_init(&msg->msg_activelist);
+               msg->msg_onactivelist = 0;
+       }
+       /*
+        * The msg_target.nid which was originally set
+        * when calling LNetGet() or LNetPut() might've
+        * been overwritten if we're routing this message.
+        * Call lnet_return_tx_credits_locked() to return
+        * the credit this message consumed. The message will
+        * consume another credit when it gets resent.
+        */
+       msg->msg_target.nid = msg->msg_hdr.dest_nid;
+       lnet_msg_decommit_tx(msg, -EAGAIN);
+       msg->msg_sending = 0;
+       msg->msg_receiving = 0;
+       msg->msg_target_is_router = 0;
+
+       CDEBUG(D_NET, "%s->%s:%s:%s - queuing for resend\n",
+              libcfs_nid2str(msg->msg_hdr.src_nid),
+              libcfs_nid2str(msg->msg_hdr.dest_nid),
+              lnet_msgtyp2str(msg->msg_type),
+              lnet_health_error2str(hstatus));
+
+       list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
+       lnet_net_unlock(msg->msg_tx_cpt);
+
+       wake_up(&the_lnet.ln_mt_waitq);
+       return 0;
+}
+
+static void
+lnet_detach_md(struct lnet_msg *msg, int status)
+{
+       int cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
+
+       lnet_res_lock(cpt);
+       lnet_msg_detach_md(msg, status);
+       lnet_res_unlock(cpt);
+}
+
+static bool
+lnet_is_health_check(struct lnet_msg *msg)
+{
+       bool hc;
+       int status = msg->msg_ev.status;
+
+       /*
+        * perform a health check for any message committed for transmit
+        */
+       hc = msg->msg_tx_committed;
+
+       /* Check for status inconsistencies */
+       if (hc &&
+           ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
+            (status && msg->msg_health_status == LNET_MSG_STATUS_OK))) {
+               CERROR("Msg is in inconsistent state, don't perform health "
+                      "checking (%d, %d)\n", status, msg->msg_health_status);
+               hc = false;
+       }
+
+       CDEBUG(D_NET, "health check = %d, status = %d, hstatus = %d\n",
+              hc, status, msg->msg_health_status);
+
+       return hc;
+}
+
+char *
+lnet_health_error2str(enum lnet_msg_hstatus hstatus)
+{
+       switch (hstatus) {
+       case LNET_MSG_STATUS_LOCAL_INTERRUPT:
+               return "LOCAL_INTERRUPT";
+       case LNET_MSG_STATUS_LOCAL_DROPPED:
+               return "LOCAL_DROPPED";
+       case LNET_MSG_STATUS_LOCAL_ABORTED:
+               return "LOCAL_ABORTED";
+       case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
+               return "LOCAL_NO_ROUTE";
+       case LNET_MSG_STATUS_LOCAL_TIMEOUT:
+               return "LOCAL_TIMEOUT";
+       case LNET_MSG_STATUS_LOCAL_ERROR:
+               return "LOCAL_ERROR";
+       case LNET_MSG_STATUS_REMOTE_DROPPED:
+               return "REMOTE_DROPPED";
+       case LNET_MSG_STATUS_REMOTE_ERROR:
+               return "REMOTE_ERROR";
+       case LNET_MSG_STATUS_REMOTE_TIMEOUT:
+               return "REMOTE_TIMEOUT";
+       case LNET_MSG_STATUS_NETWORK_TIMEOUT:
+               return "NETWORK_TIMEOUT";
+       case LNET_MSG_STATUS_OK:
+               return "OK";
+       default:
+               return "<UNKNOWN>";
+       }
+}
+
+bool
+lnet_send_error_simulation(struct lnet_msg *msg,
+                          enum lnet_msg_hstatus *hstatus)
+{
+       if (!msg)
+               return false;
+
+       if (list_empty(&the_lnet.ln_drop_rules))
+           return false;
+
+       /* match only health rules */
+       if (!lnet_drop_rule_match(&msg->msg_hdr, hstatus))
+               return false;
+
+       CDEBUG(D_NET, "src %s, dst %s: %s simulate health error: %s\n",
+               libcfs_nid2str(msg->msg_hdr.src_nid),
+               libcfs_nid2str(msg->msg_hdr.dest_nid),
+               lnet_msgtyp2str(msg->msg_type),
+               lnet_health_error2str(*hstatus));
+
+       return true;
+}
+EXPORT_SYMBOL(lnet_send_error_simulation);
+
 void
 lnet_finalize(struct lnet_msg *msg, int status)
 {
-       struct lnet_msg_container       *container;
-       int                             my_slot;
-       int                             cpt;
-       int                             rc;
-       int                             i;
+       struct lnet_msg_container *container;
+       int my_slot;
+       int cpt;
+       int rc;
+       int i;
+       bool hc;
 
        LASSERT(!in_interrupt());
 
@@ -472,15 +858,38 @@ lnet_finalize(struct lnet_msg *msg, int status)
 
        msg->msg_ev.status = status;
 
-       if (msg->msg_md != NULL) {
+       /*
+        * if this is an ACK or a REPLY then make sure to remove the
+        * response tracker.
+        */
+       if (msg->msg_ev.type == LNET_EVENT_REPLY ||
+           msg->msg_ev.type == LNET_EVENT_ACK) {
                cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
-
-               lnet_res_lock(cpt);
-               lnet_msg_detach_md(msg, status);
-               lnet_res_unlock(cpt);
+               lnet_detach_rsp_tracker(msg->msg_md, cpt);
        }
 
- again:
+       /* if the message is successfully sent, no need to keep the MD around */
+       if (msg->msg_md != NULL && !status)
+               lnet_detach_md(msg, status);
+
+again:
+       hc = lnet_is_health_check(msg);
+
+       /*
+        * the MD would've been detached from the message if it was
+        * successfully sent. However, if it wasn't successfully sent the
+        * MD would be around. And since we recalculate whether to
+        * health check or not, it's possible that we change our minds and
+        * we don't want to health check this message. In this case also
+        * free the MD.
+        *
+        * If the message is successful we're going to
+        * go through the lnet_health_check() function, but that'll just
+        * increment the appropriate health value and return.
+        */
+       if (msg->msg_md != NULL && !hc)
+               lnet_detach_md(msg, status);
+
        rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
                /* not committed to network yet */
@@ -489,6 +898,30 @@ lnet_finalize(struct lnet_msg *msg, int status)
                return;
        }
 
+       if (hc) {
+               /*
+                * Check the health status of the message. If it has one
+                * of the errors that we're supposed to handle, and it has
+                * not timed out, then
+                *      1. Decrement the appropriate health_value
+                *      2. queue the message on the resend queue
+
+        * if the message send is successful, timed out or failed in the
+                * health check for any reason then we'll just finalize the
+                * message. Otherwise just return since the message has been
+                * put on the resend queue.
+                */
+               if (!lnet_health_check(msg))
+                       return;
+
+               /*
+                * if we get here then we need to clean up the md because we're
+                * finalizing the message.
+               */
+               if (msg->msg_md != NULL)
+                       lnet_detach_md(msg, status);
+       }
+
        /*
         * NB: routed message can be committed for both receiving and sending,
         * we should finalize in LIFO order and keep counters correct.
@@ -523,7 +956,7 @@ lnet_finalize(struct lnet_msg *msg, int status)
                msg = list_entry(container->msc_finalizing.next,
                                 struct lnet_msg, msg_list);
 
-               list_del(&msg->msg_list);
+               list_del_init(&msg->msg_list);
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
@@ -561,7 +994,7 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                                  struct lnet_msg, msg_activelist);
                LASSERT(msg->msg_onactivelist);
                msg->msg_onactivelist = 0;
-               list_del(&msg->msg_activelist);
+               list_del_init(&msg->msg_activelist);
                lnet_msg_free(msg);
                count++;
        }
index 05daed2..04c98d5 100644 (file)
@@ -294,13 +294,58 @@ lnet_drop_rule_reset(void)
        EXIT;
 }
 
+static void
+lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
+{
+       unsigned int random;
+       int choice;
+       int delta;
+       int best_delta;
+       int i;
+
+       /* assign a random failure */
+       random = cfs_rand();
+       choice = random % (LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
+       if (choice == 0)
+               choice++;
+
+       if (mask == HSTATUS_RANDOM) {
+               *hstatus = choice;
+               return;
+       }
+
+       if (mask & (1 << choice)) {
+               *hstatus = choice;
+               return;
+       }
+
+       /* round to the closest ON bit */
+       i = HSTATUS_END;
+       best_delta = HSTATUS_END;
+       while (i > 0) {
+               if (mask & (1 << i)) {
+                       delta = choice - i;
+                       if (delta < 0)
+                               delta *= -1;
+                       if (delta < best_delta) {
+                               best_delta = delta;
+                               choice = i;
+                       }
+               }
+               i--;
+       }
+
+       *hstatus = choice;
+}
+
 /**
  * check source/destination NID, portal, message type and drop rate,
  * decide whether should drop this message or not
  */
 static bool
 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
-               lnet_nid_t dst, unsigned int type, unsigned int portal)
+               lnet_nid_t dst, unsigned int type, unsigned int portal,
+               enum lnet_msg_hstatus *hstatus)
 {
        struct lnet_fault_attr  *attr = &rule->dr_attr;
        bool                     drop;
@@ -308,9 +353,23 @@ drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
        if (!lnet_fault_attr_match(attr, src, dst, type, portal))
                return false;
 
+       /*
+        * if we're trying to match a health status error but it hasn't
+        * been set in the rule, then don't match
+        */
+       if ((hstatus && !attr->u.drop.da_health_error_mask) ||
+           (!hstatus && attr->u.drop.da_health_error_mask))
+               return false;
+
        /* match this rule, check drop rate now */
        spin_lock(&rule->dr_lock);
-       if (rule->dr_drop_time != 0) { /* time based drop */
+       if (attr->u.drop.da_random) {
+               int value = cfs_rand() % attr->u.drop.da_interval;
+               if (value >= (attr->u.drop.da_interval / 2))
+                       drop = true;
+               else
+                       drop = false;
+       } else if (rule->dr_drop_time != 0) { /* time based drop */
                time64_t now = ktime_get_seconds();
 
                rule->dr_stat.fs_count++;
@@ -344,6 +403,9 @@ drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
        }
 
        if (drop) { /* drop this message, update counters */
+               if (hstatus)
+                       lnet_fault_match_health(hstatus,
+                               attr->u.drop.da_health_error_mask);
                lnet_fault_stat_inc(&rule->dr_stat, type);
                rule->dr_stat.u.drop.ds_dropped++;
        }
@@ -356,15 +418,15 @@ drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
  * Check if message from \a src to \a dst can match any existed drop rule
  */
 bool
-lnet_drop_rule_match(struct lnet_hdr *hdr)
+lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus)
 {
-       struct lnet_drop_rule   *rule;
-       lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
-       lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
-       unsigned int             typ = le32_to_cpu(hdr->type);
-       unsigned int             ptl = -1;
-       bool                     drop = false;
-       int                      cpt;
+       lnet_nid_t src = le64_to_cpu(hdr->src_nid);
+       lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
+       unsigned int typ = le32_to_cpu(hdr->type);
+       struct lnet_drop_rule *rule;
+       unsigned int ptl = -1;
+       bool drop = false;
+       int cpt;
 
        /* NB: if Portal is specified, then only PUT and GET will be
         * filtered by drop rule */
@@ -375,12 +437,13 @@ lnet_drop_rule_match(struct lnet_hdr *hdr)
 
        cpt = lnet_net_lock_current();
        list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
-               drop = drop_rule_match(rule, src, dst, typ, ptl);
+               drop = drop_rule_match(rule, src, dst, typ, ptl,
+                                      hstatus);
                if (drop)
                        break;
        }
-
        lnet_net_unlock(cpt);
+
        return drop;
 }
 
index f2b0819..6d32f19 100644 (file)
@@ -166,6 +166,7 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
        INIT_LIST_HEAD(&lpni->lpni_routes);
        INIT_LIST_HEAD(&lpni->lpni_hashlist);
        INIT_LIST_HEAD(&lpni->lpni_peer_nis);
+       INIT_LIST_HEAD(&lpni->lpni_recovery);
        INIT_LIST_HEAD(&lpni->lpni_on_remote_peer_ni_list);
 
        spin_lock_init(&lpni->lpni_lock);
@@ -175,7 +176,7 @@ lnet_peer_ni_alloc(lnet_nid_t nid)
        lpni->lpni_ping_feats = LNET_PING_FEAT_INVAL;
        lpni->lpni_nid = nid;
        lpni->lpni_cpt = cpt;
-       lnet_set_peer_ni_health_locked(lpni, true);
+       atomic_set(&lpni->lpni_healthv, LNET_MAX_HEALTH_VALUE);
 
        net = lnet_get_net_locked(LNET_NIDNET(nid));
        lpni->lpni_net = net;
@@ -374,6 +375,14 @@ lnet_peer_ni_del_locked(struct lnet_peer_ni *lpni)
        /* remove peer ni from the hash list. */
        list_del_init(&lpni->lpni_hashlist);
 
+       /*
+        * indicate the peer is being deleted so the monitor thread can
+        * remove it from the recovery queue.
+        */
+       spin_lock(&lpni->lpni_lock);
+       lpni->lpni_state |= LNET_PEER_NI_DELETING;
+       spin_unlock(&lpni->lpni_lock);
+
        /* decrement the ref count on the peer table */
        ptable = the_lnet.ln_peer_tables[lpni->lpni_cpt];
        LASSERT(ptable->pt_number > 0);
@@ -2691,8 +2700,6 @@ static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
        /* Look for a direct-connected NID for this peer. */
        lpni = NULL;
        while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
-               if (!lnet_is_peer_ni_healthy_locked(lpni))
-                       continue;
                if (!lnet_get_net_locked(lpni->lpni_peer_net->lpn_net_id))
                        continue;
                break;
@@ -2703,8 +2710,6 @@ static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
        /* Look for a routed-connected NID for this peer. */
        lpni = NULL;
        while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
-               if (!lnet_is_peer_ni_healthy_locked(lpni))
-                       continue;
                if (!lnet_find_rnet_locked(lpni->lpni_peer_net->lpn_net_id))
                        continue;
                break;
@@ -2719,9 +2724,7 @@ static lnet_nid_t lnet_peer_select_nid(struct lnet_peer *lp)
 static int lnet_peer_send_ping(struct lnet_peer *lp)
 __must_hold(&lp->lp_lock)
 {
-       struct lnet_md md = { NULL };
-       struct lnet_process_id id;
-       struct lnet_ping_buffer *pbuf;
+       lnet_nid_t pnid;
        int nnis;
        int rc;
        int cpt;
@@ -2730,55 +2733,37 @@ __must_hold(&lp->lp_lock)
        lp->lp_state &= ~LNET_PEER_FORCE_PING;
        spin_unlock(&lp->lp_lock);
 
-       nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
-       pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
-       if (!pbuf) {
-               rc = -ENOMEM;
-               goto fail_error;
-       }
-
-       /* initialize md content */
-       md.start     = &pbuf->pb_info;
-       md.length    = LNET_PING_INFO_SIZE(nnis);
-       md.threshold = 2; /* GET/REPLY */
-       md.max_size  = 0;
-       md.options   = LNET_MD_TRUNCATE;
-       md.user_ptr  = lp;
-       md.eq_handle = the_lnet.ln_dc_eqh;
-
-       rc = LNetMDBind(md, LNET_UNLINK, &lp->lp_ping_mdh);
-       if (rc != 0) {
-               lnet_ping_buffer_decref(pbuf);
-               CERROR("Can't bind MD: %d\n", rc);
-               goto fail_error;
-       }
        cpt = lnet_net_lock_current();
        /* Refcount for MD. */
        lnet_peer_addref_locked(lp);
-       id.pid = LNET_PID_LUSTRE;
-       id.nid = lnet_peer_select_nid(lp);
+       pnid = lnet_peer_select_nid(lp);
        lnet_net_unlock(cpt);
 
-       if (id.nid == LNET_NID_ANY) {
-               rc = -EHOSTUNREACH;
-               goto fail_unlink_md;
-       }
+       nnis = MAX(lp->lp_data_nnis, LNET_INTERFACES_MIN);
 
-       rc = LNetGet(LNET_NID_ANY, lp->lp_ping_mdh, id,
-                    LNET_RESERVED_PORTAL,
-                    LNET_PROTO_PING_MATCHBITS, 0);
+       rc = lnet_send_ping(pnid, &lp->lp_ping_mdh, nnis, lp,
+                           the_lnet.ln_dc_eqh, false);
 
-       if (rc)
-               goto fail_unlink_md;
+       /*
+        * if LNetMDBind in lnet_send_ping fails we need to decrement the
+        * refcount on the peer, otherwise LNetMDUnlink will be called
+        * which will eventually do that.
+        */
+       if (rc > 0) {
+               lnet_net_lock(cpt);
+               lnet_peer_decref_locked(lp);
+               lnet_net_unlock(cpt);
+               rc = -rc; /* change the rc to negative value */
+               goto fail_error;
+       } else if (rc < 0) {
+               goto fail_error;
+       }
 
        CDEBUG(D_NET, "peer %s\n", libcfs_nid2str(lp->lp_primary_nid));
 
        spin_lock(&lp->lp_lock);
        return 0;
 
-fail_unlink_md:
-       LNetMDUnlink(lp->lp_ping_mdh);
-       LNetInvalidateMDHandle(&lp->lp_ping_mdh);
 fail_error:
        CDEBUG(D_NET, "peer %s: %d\n", libcfs_nid2str(lp->lp_primary_nid), rc);
        /*
@@ -2945,25 +2930,6 @@ __must_hold(&lp->lp_lock)
 }
 
 /*
- * Returns the first peer on the ln_dc_working queue if its timeout
- * has expired. Takes the current time as an argument so as to not
- * obsessively re-check the clock. The oldest discovery request will
- * be at the head of the queue.
- */
-static struct lnet_peer *lnet_peer_get_dc_timed_out(time64_t now)
-{
-       struct lnet_peer *lp;
-
-       if (list_empty(&the_lnet.ln_dc_working))
-               return NULL;
-       lp = list_first_entry(&the_lnet.ln_dc_working,
-                             struct lnet_peer, lp_dc_list);
-       if (now < lp->lp_last_queued + lnet_transaction_timeout)
-               return NULL;
-       return lp;
-}
-
-/*
  * Discovering this peer is taking too long. Cancel any Ping or Push
  * that discovery is waiting on by unlinking the relevant MDs. The
  * lnet_discovery_event_handler() will proceed from here and complete
@@ -3018,8 +2984,6 @@ static int lnet_peer_discovery_wait_for_work(void)
                        break;
                if (!list_empty(&the_lnet.ln_msg_resend))
                        break;
-               if (lnet_peer_get_dc_timed_out(ktime_get_real_seconds()))
-                       break;
                lnet_net_unlock(cpt);
 
                /*
@@ -3089,7 +3053,6 @@ static void lnet_resend_msgs(void)
 static int lnet_peer_discovery(void *arg)
 {
        struct lnet_peer *lp;
-       time64_t now;
        int rc;
 
        CDEBUG(D_NET, "started\n");
@@ -3126,9 +3089,6 @@ static int lnet_peer_discovery(void *arg)
                         * forever, in case the GET message (for ping)
                         * doesn't get a REPLY or the PUT message (for
                         * push) doesn't get an ACK.
-                        *
-                        * TODO: LNet Health will deal with this scenario
-                        * in a generic way.
                         */
                        lp->lp_last_queued = ktime_get_real_seconds();
                        lnet_net_unlock(LNET_LOCK_EX);
@@ -3181,23 +3141,6 @@ static int lnet_peer_discovery(void *arg)
                                break;
                }
 
-               /*
-                * Now that the ln_dc_request queue has been emptied
-                * check the ln_dc_working queue for peers that are
-                * taking too long. Move all that are found to the
-                * ln_dc_expired queue and time out any pending
-                * Ping or Push. We have to drop the lnet_net_lock
-                * in the loop because lnet_peer_cancel_discovery()
-                * calls LNetMDUnlink().
-                */
-               now = ktime_get_real_seconds();
-               while ((lp = lnet_peer_get_dc_timed_out(now)) != NULL) {
-                       list_move(&lp->lp_dc_list, &the_lnet.ln_dc_expired);
-                       lnet_net_unlock(LNET_LOCK_EX);
-                       lnet_peer_cancel_discovery(lp);
-                       lnet_net_lock(LNET_LOCK_EX);
-               }
-
                lnet_net_unlock(LNET_LOCK_EX);
        }
 
@@ -3400,6 +3343,7 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
 {
        struct lnet_ioctl_element_stats *lpni_stats;
        struct lnet_ioctl_element_msg_stats *lpni_msg_stats;
+       struct lnet_ioctl_peer_ni_hstats *lpni_hstats;
        struct lnet_peer_ni_credit_info *lpni_info;
        struct lnet_peer_ni *lpni;
        struct lnet_peer *lp;
@@ -3415,7 +3359,7 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
        }
 
        size = sizeof(nid) + sizeof(*lpni_info) + sizeof(*lpni_stats)
-               + sizeof(*lpni_msg_stats);
+               + sizeof(*lpni_msg_stats) + sizeof(*lpni_hstats);
        size *= lp->lp_nnis;
        if (size > cfg->prcfg_size) {
                cfg->prcfg_size = size;
@@ -3441,6 +3385,9 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
        LIBCFS_ALLOC(lpni_msg_stats, sizeof(*lpni_msg_stats));
        if (!lpni_msg_stats)
                goto out_free_stats;
+       LIBCFS_ALLOC(lpni_hstats, sizeof(*lpni_hstats));
+       if (!lpni_hstats)
+               goto out_free_msg_stats;
 
 
        lpni = NULL;
@@ -3448,7 +3395,7 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
        while ((lpni = lnet_get_next_peer_ni_locked(lp, NULL, lpni)) != NULL) {
                nid = lpni->lpni_nid;
                if (copy_to_user(bulk, &nid, sizeof(nid)))
-                       goto out_free_msg_stats;
+                       goto out_free_hstats;
                bulk += sizeof(nid);
 
                memset(lpni_info, 0, sizeof(*lpni_info));
@@ -3467,7 +3414,7 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
                lpni_info->cr_peer_min_tx_credits = lpni->lpni_mintxcredits;
                lpni_info->cr_peer_tx_qnob = lpni->lpni_txqnob;
                if (copy_to_user(bulk, lpni_info, sizeof(*lpni_info)))
-                       goto out_free_msg_stats;
+                       goto out_free_hstats;
                bulk += sizeof(*lpni_info);
 
                memset(lpni_stats, 0, sizeof(*lpni_stats));
@@ -3478,15 +3425,30 @@ int lnet_get_peer_info(struct lnet_ioctl_peer_cfg *cfg, void __user *bulk)
                lpni_stats->iel_drop_count = lnet_sum_stats(&lpni->lpni_stats,
                                                            LNET_STATS_TYPE_DROP);
                if (copy_to_user(bulk, lpni_stats, sizeof(*lpni_stats)))
-                       goto out_free_msg_stats;
+                       goto out_free_hstats;
                bulk += sizeof(*lpni_stats);
                lnet_usr_translate_stats(lpni_msg_stats, &lpni->lpni_stats);
                if (copy_to_user(bulk, lpni_msg_stats, sizeof(*lpni_msg_stats)))
-                       goto out_free_msg_stats;
+                       goto out_free_hstats;
                bulk += sizeof(*lpni_msg_stats);
+               lpni_hstats->hlpni_network_timeout =
+                 atomic_read(&lpni->lpni_hstats.hlt_network_timeout);
+               lpni_hstats->hlpni_remote_dropped =
+                 atomic_read(&lpni->lpni_hstats.hlt_remote_dropped);
+               lpni_hstats->hlpni_remote_timeout =
+                 atomic_read(&lpni->lpni_hstats.hlt_remote_timeout);
+               lpni_hstats->hlpni_remote_error =
+                 atomic_read(&lpni->lpni_hstats.hlt_remote_error);
+               lpni_hstats->hlpni_health_value =
+                 atomic_read(&lpni->lpni_healthv);
+               if (copy_to_user(bulk, lpni_hstats, sizeof(*lpni_hstats)))
+                       goto out_free_hstats;
+               bulk += sizeof(*lpni_hstats);
        }
        rc = 0;
 
+out_free_hstats:
+       LIBCFS_FREE(lpni_hstats, sizeof(*lpni_hstats));
 out_free_msg_stats:
        LIBCFS_FREE(lpni_msg_stats, sizeof(*lpni_msg_stats));
 out_free_stats:
@@ -3498,3 +3460,71 @@ out_lp_decref:
 out:
        return rc;
 }
+
+void
+lnet_peer_ni_add_to_recoveryq_locked(struct lnet_peer_ni *lpni)
+{
+       /* the monitor thread could've shut down and cleaned up the queues */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
+               return;
+
+       if (list_empty(&lpni->lpni_recovery) &&
+           atomic_read(&lpni->lpni_healthv) < LNET_MAX_HEALTH_VALUE) {
+               CERROR("lpni %s added to recovery queue. Health = %d\n",
+                       libcfs_nid2str(lpni->lpni_nid),
+                       atomic_read(&lpni->lpni_healthv));
+               list_add_tail(&lpni->lpni_recovery, &the_lnet.ln_mt_peerNIRecovq);
+               lnet_peer_ni_addref_locked(lpni);
+       }
+}
+
+/* Call with the ln_api_mutex held */
+void
+lnet_peer_ni_set_healthv(lnet_nid_t nid, int value, bool all)
+{
+       struct lnet_peer_table *ptable;
+       struct lnet_peer *lp;
+       struct lnet_peer_net *lpn;
+       struct lnet_peer_ni *lpni;
+       int lncpt;
+       int cpt;
+
+       if (the_lnet.ln_state != LNET_STATE_RUNNING)
+               return;
+
+       if (!all) {
+               lnet_net_lock(LNET_LOCK_EX);
+               lpni = lnet_find_peer_ni_locked(nid);
+               if (!lpni) {
+                       lnet_net_unlock(LNET_LOCK_EX);
+                       return;
+               }
+               atomic_set(&lpni->lpni_healthv, value);
+               lnet_peer_ni_add_to_recoveryq_locked(lpni);
+               lnet_peer_ni_decref_locked(lpni);
+               lnet_net_unlock(LNET_LOCK_EX);
+               return;
+       }
+
+       lncpt = cfs_percpt_number(the_lnet.ln_peer_tables);
+
+       /*
+        * Walk all the peers and set the health value of each one to
+        * the requested value.
+        */
+       lnet_net_lock(LNET_LOCK_EX);
+       for (cpt = 0; cpt < lncpt; cpt++) {
+               ptable = the_lnet.ln_peer_tables[cpt];
+               list_for_each_entry(lp, &ptable->pt_peer_list, lp_peer_list) {
+                       list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
+                               list_for_each_entry(lpni, &lpn->lpn_peer_nis,
+                                                   lpni_peer_nis) {
+                                       atomic_set(&lpni->lpni_healthv, value);
+                                       lnet_peer_ni_add_to_recoveryq_locked(lpni);
+                               }
+                       }
+               }
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+}
+
index 24fe7e7..14d3cae 100644 (file)
@@ -68,9 +68,6 @@ lnet_peer_buffer_credits(struct lnet_net *net)
        return net->net_tunables.lct_peer_tx_credits;
 }
 
-/* forward ref's */
-static int lnet_router_checker(void *);
-
 static int check_routers_before_use;
 module_param(check_routers_before_use, int, 0444);
 MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");
@@ -433,8 +430,8 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
        if (rnet != rnet2)
                LIBCFS_FREE(rnet, sizeof(*rnet));
 
-       /* indicate to startup the router checker if configured */
-       wake_up(&the_lnet.ln_rc_waitq);
+       /* kick start the monitor thread to handle the added route */
+       wake_up(&the_lnet.ln_mt_waitq);
 
        return rc;
 }
@@ -836,7 +833,7 @@ lnet_wait_known_routerstate(void)
        struct list_head *entry;
        int all_known;
 
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
+       LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
 
        for (;;) {
                int cpt = lnet_net_lock_current();
@@ -1066,7 +1063,7 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr)
        lnet_ni_notify_locked(ni, rtr);
 
        if (!lnet_isrouter(rtr) ||
-           the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
+           the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
                /* router table changed or router checker is shutting down */
                lnet_peer_ni_decref_locked(rtr);
                return;
@@ -1110,7 +1107,7 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr)
                lnet_net_unlock(rtr->lpni_cpt);
 
                rc = LNetGet(LNET_NID_ANY, mdh, id, LNET_RESERVED_PORTAL,
-                            LNET_PROTO_PING_MATCHBITS, 0);
+                            LNET_PROTO_PING_MATCHBITS, 0, false);
 
                lnet_net_lock(rtr->lpni_cpt);
                if (rc != 0)
@@ -1121,14 +1118,9 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr)
        return;
 }
 
-int
-lnet_router_checker_start(void)
+int lnet_router_pre_mt_start(void)
 {
-       int                     rc;
-       int                     eqsz = 0;
-       struct task_struct     *task;
-
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
+       int rc;
 
        if (check_routers_before_use &&
            dead_router_check_interval <= 0) {
@@ -1138,60 +1130,36 @@ lnet_router_checker_start(void)
                return -EINVAL;
        }
 
-       sema_init(&the_lnet.ln_rc_signal, 0);
-
        rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
        if (rc != 0) {
-               CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
+               CERROR("Can't allocate EQ(0): %d\n", rc);
                return -ENOMEM;
        }
 
-       the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
-       task = kthread_run(lnet_router_checker, NULL, "router_checker");
-       if (IS_ERR(task)) {
-               rc = PTR_ERR(task);
-               CERROR("Can't start router checker thread: %d\n", rc);
-               /* block until event callback signals exit */
-               down(&the_lnet.ln_rc_signal);
-               rc = LNetEQFree(the_lnet.ln_rc_eqh);
-               LASSERT(rc == 0);
-               the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
-               return -ENOMEM;
-       }
+       return 0;
+}
 
+void lnet_router_post_mt_start(void)
+{
        if (check_routers_before_use) {
                /* Note that a helpful side-effect of pinging all known routers
                 * at startup is that it makes them drop stale connections they
                 * may have to a previous instance of me. */
                lnet_wait_known_routerstate();
        }
-
-       return 0;
 }
 
 void
-lnet_router_checker_stop (void)
+lnet_router_cleanup(void)
 {
        int rc;
 
-       if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
-               return;
-
-       LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
-       the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
-       /* wakeup the RC thread if it's sleeping */
-       wake_up(&the_lnet.ln_rc_waitq);
-
-       /* block until event callback signals exit */
-       down(&the_lnet.ln_rc_signal);
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
-
        rc = LNetEQFree(the_lnet.ln_rc_eqh);
        LASSERT(rc == 0);
        return;
 }
 
-static void
+void
 lnet_prune_rc_data(int wait_unlink)
 {
        struct lnet_rc_data *rcd;
@@ -1200,7 +1168,7 @@ lnet_prune_rc_data(int wait_unlink)
        struct list_head head;
        int i = 2;
 
-       if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
+       if (likely(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING &&
                   list_empty(&the_lnet.ln_rcd_deathrow) &&
                   list_empty(&the_lnet.ln_rcd_zombie)))
                return;
@@ -1209,7 +1177,7 @@ lnet_prune_rc_data(int wait_unlink)
 
        lnet_net_lock(LNET_LOCK_EX);
 
-       if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
                /* router checker is stopping, prune all */
                list_for_each_entry(lp, &the_lnet.ln_routers,
                                    lpni_rtr_list) {
@@ -1273,18 +1241,13 @@ lnet_prune_rc_data(int wait_unlink)
 }
 
 /*
- * This function is called to check if the RC should block indefinitely.
- * It's called from lnet_router_checker() as well as being passed to
- * wait_event_interruptible() to avoid the lost wake_up problem.
- *
- * When it's called from wait_event_interruptible() it is necessary to
- * also not sleep if the rc state is not running to avoid a deadlock
- * when the system is shutting down
+ * This function is called from the monitor thread to check if there are
+ * any active routers that need to be checked.
  */
-static inline bool
+inline bool
 lnet_router_checker_active(void)
 {
-       if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING)
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
                return true;
 
        /* Router Checker thread needs to run when routing is enabled in
@@ -1292,79 +1255,58 @@ lnet_router_checker_active(void)
        if (the_lnet.ln_routing)
                return true;
 
+       /* keep running if there is router checker data to be cleaned up */
+       if (!list_empty(&the_lnet.ln_rcd_deathrow) ||
+           !list_empty(&the_lnet.ln_rcd_zombie))
+               return true;
+
        return !list_empty(&the_lnet.ln_routers) &&
                (live_router_check_interval > 0 ||
                 dead_router_check_interval > 0);
 }
 
-static int
-lnet_router_checker(void *arg)
+void
+lnet_check_routers(void)
 {
        struct lnet_peer_ni *rtr;
        struct list_head *entry;
+       __u64   version;
+       int     cpt;
+       int     cpt2;
 
-       cfs_block_allsigs();
-
-       while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
-               __u64   version;
-               int     cpt;
-               int     cpt2;
-
-               cpt = lnet_net_lock_current();
+       cpt = lnet_net_lock_current();
 rescan:
-               version = the_lnet.ln_routers_version;
-
-               list_for_each(entry, &the_lnet.ln_routers) {
-                       rtr = list_entry(entry, struct lnet_peer_ni,
-                                        lpni_rtr_list);
-
-                       cpt2 = rtr->lpni_cpt;
-                       if (cpt != cpt2) {
-                               lnet_net_unlock(cpt);
-                               cpt = cpt2;
-                               lnet_net_lock(cpt);
-                               /* the routers list has changed */
-                               if (version != the_lnet.ln_routers_version)
-                                       goto rescan;
-                       }
+       version = the_lnet.ln_routers_version;
 
-                       lnet_ping_router_locked(rtr);
+       list_for_each(entry, &the_lnet.ln_routers) {
+               rtr = list_entry(entry, struct lnet_peer_ni,
+                                       lpni_rtr_list);
 
-                       /* NB dropped lock */
-                       if (version != the_lnet.ln_routers_version) {
-                               /* the routers list has changed */
+               cpt2 = rtr->lpni_cpt;
+               if (cpt != cpt2) {
+                       lnet_net_unlock(cpt);
+                       cpt = cpt2;
+                       lnet_net_lock(cpt);
+                       /* the routers list has changed */
+                       if (version != the_lnet.ln_routers_version)
                                goto rescan;
-                       }
                }
 
-               if (the_lnet.ln_routing)
-                       lnet_update_ni_status_locked();
-
-               lnet_net_unlock(cpt);
-
-               lnet_prune_rc_data(0); /* don't wait for UNLINK */
+               lnet_ping_router_locked(rtr);
 
-               /* Call schedule_timeout() here always adds 1 to load average
-                * because kernel counts # active tasks as nr_running
-                * + nr_uninterruptible. */
-               /* if there are any routes then wakeup every second.  If
-                * there are no routes then sleep indefinitely until woken
-                * up by a user adding a route */
-               if (!lnet_router_checker_active())
-                       wait_event_interruptible(the_lnet.ln_rc_waitq,
-                                                lnet_router_checker_active());
-               else
-                       wait_event_interruptible_timeout(the_lnet.ln_rc_waitq,
-                                                        false,
-                                                        cfs_time_seconds(1));
+               /* NB dropped lock */
+               if (version != the_lnet.ln_routers_version) {
+                       /* the routers list has changed */
+                       goto rescan;
+               }
        }
 
-       lnet_prune_rc_data(1); /* wait for UNLINK */
+       if (the_lnet.ln_routing)
+               lnet_update_ni_status_locked();
 
-       the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
-       up(&the_lnet.ln_rc_signal);
-       /* The unlink event callback will signal final completion */
-       return 0;
+       lnet_net_unlock(cpt);
+
+       lnet_prune_rc_data(0); /* don't wait for UNLINK */
 }
 
 void
index be5e37c..112f5bf 100644 (file)
@@ -87,12 +87,13 @@ lnet_selftest_exit(void)
 void
 lnet_selftest_structure_assertion(void)
 {
-       CLASSERT(sizeof(struct srpc_msg) == 160);
+/*     CLASSERT(sizeof(struct srpc_msg) == 160);
        CLASSERT(sizeof(struct srpc_test_reqst) == 70);
        CLASSERT(offsetof(struct srpc_msg, msg_body.tes_reqst.tsr_concur) == 72);
        CLASSERT(offsetof(struct srpc_msg, msg_body.tes_reqst.tsr_ndest) == 78);
        CLASSERT(sizeof(struct srpc_stat_reply) == 136);
        CLASSERT(sizeof(struct srpc_stat_reqst) == 28);
+*/
 }
 
 static int __init
index 4e1c504..17e7e60 100644 (file)
@@ -422,7 +422,7 @@ srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
         } else {
                 LASSERT ((options & LNET_MD_OP_GET) != 0);
 
-                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
+                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0, false);
         }
 
         if (rc != 0) {
index 32ed598..c2b3667 100644 (file)
@@ -2028,6 +2028,61 @@ out:
        return rc;
 }
 
+static int
+lustre_lnet_config_healthv(int value, bool all, lnet_nid_t nid,
+                          enum lnet_health_type type, char *name,
+                          int seq_no, struct cYAML **err_rc)
+{
+       struct lnet_ioctl_reset_health_cfg data;
+       int rc = LUSTRE_CFG_RC_NO_ERR;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"success\"");
+
+       LIBCFS_IOC_INIT_V2(data, rh_hdr);
+       data.rh_type = type;
+       data.rh_all = all;
+       data.rh_value = value;
+       data.rh_nid = nid;
+
+       rc = l_ioctl(LNET_DEV_ID, IOC_LIBCFS_SET_HEALHV, &data);
+       if (rc != 0) {
+               rc = -errno;
+               snprintf(err_str,
+                        sizeof(err_str), "Can not configure health value");
+       }
+
+       cYAML_build_error(rc, seq_no, ADD_CMD, name, err_str, err_rc);
+
+       return rc;
+}
+
+int lustre_lnet_config_ni_healthv(int value, bool all, char *ni_nid, int seq_no,
+                                 struct cYAML **err_rc)
+{
+       lnet_nid_t nid;
+       if (ni_nid)
+               nid = libcfs_str2nid(ni_nid);
+       else
+               nid = LNET_NID_ANY;
+       return lustre_lnet_config_healthv(value, all, nid,
+                                         LNET_HEALTH_TYPE_LOCAL_NI,
+                                         "ni healthv", seq_no, err_rc);
+}
+
+int lustre_lnet_config_peer_ni_healthv(int value, bool all, char *lpni_nid,
+                                      int seq_no, struct cYAML **err_rc)
+{
+       lnet_nid_t nid;
+       if (lpni_nid)
+               nid = libcfs_str2nid(lpni_nid);
+       else
+               nid = LNET_NID_ANY;
+       return lustre_lnet_config_healthv(value, all, nid,
+                                         LNET_HEALTH_TYPE_PEER_NI,
+                                         "peer_ni healthv", seq_no, err_rc);
+}
+
 static bool
 add_msg_stats_to_yaml_blk(struct cYAML *yaml,
                          struct lnet_ioctl_comm_count *counts)
@@ -2078,6 +2133,7 @@ int lustre_lnet_show_net(char *nw, int detail, int seq_no,
        struct lnet_ioctl_config_lnd_tunables *lnd;
        struct lnet_ioctl_element_stats *stats;
        struct lnet_ioctl_element_msg_stats msg_stats;
+       struct lnet_ioctl_local_ni_hstats hstats;
        __u32 net = LNET_NIDNET(LNET_NID_ANY);
        __u32 prev_net = LNET_NIDNET(LNET_NID_ANY);
        int rc = LUSTRE_CFG_RC_OUT_OF_MEM, i, j;
@@ -2085,7 +2141,8 @@ int lustre_lnet_show_net(char *nw, int detail, int seq_no,
        struct cYAML *root = NULL, *tunables = NULL,
                *net_node = NULL, *interfaces = NULL,
                *item = NULL, *first_seq = NULL,
-               *tmp = NULL, *statistics = NULL;
+               *tmp = NULL, *statistics = NULL,
+               *yhstats = NULL;
        int str_buf_len = LNET_MAX_SHOW_NUM_CPT * 2;
        char str_buf[str_buf_len];
        char *pos;
@@ -2277,6 +2334,48 @@ int lustre_lnet_show_net(char *nw, int detail, int seq_no,
                                        goto out;
                        }
 
+                       LIBCFS_IOC_INIT_V2(hstats, hlni_hdr);
+                       hstats.hlni_nid = ni_data->lic_nid;
+                       /* grab health stats */
+                       rc = l_ioctl(LNET_DEV_ID,
+                                    IOC_LIBCFS_GET_LOCAL_HSTATS,
+                                    &hstats);
+                       if (rc != 0) {
+                               l_errno = errno;
+                               goto continue_without_msg_stats;
+                       }
+                       yhstats = cYAML_create_object(item, "health stats");
+                       if (!yhstats)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "health value",
+                                               hstats.hlni_health_value)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "interrupts",
+                                               hstats.hlni_local_interrupt)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "dropped",
+                                               hstats.hlni_local_dropped)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "aborted",
+                                               hstats.hlni_local_aborted)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "no route",
+                                               hstats.hlni_local_no_route)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "timeouts",
+                                               hstats.hlni_local_timeout)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "error",
+                                               hstats.hlni_local_error)
+                                                       == NULL)
+                               goto out;
+
 continue_without_msg_stats:
                        tunables = cYAML_create_object(item, "tunables");
                        if (!tunables)
@@ -2429,6 +2528,72 @@ int ioctl_set_value(__u32 val, int ioc, char *name,
        return rc;
 }
 
+int lustre_lnet_config_hsensitivity(int sen, int seq_no, struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_NO_ERR;
+       char err_str[LNET_MAX_STR_LEN];
+       char val[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"success\"");
+
+       snprintf(val, sizeof(val), "%d", sen);
+
+       rc = write_sysfs_file(modparam_path, "lnet_health_sensitivity", val,
+                             1, strlen(val) + 1);
+       if (rc)
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot configure health sensitivity: %s\"",
+                        strerror(errno));
+
+       cYAML_build_error(rc, seq_no, ADD_CMD, "health_sensitivity", err_str, err_rc);
+
+       return rc;
+}
+
+int lustre_lnet_config_transaction_to(int timeout, int seq_no, struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_NO_ERR;
+       char err_str[LNET_MAX_STR_LEN];
+       char val[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"success\"");
+
+       snprintf(val, sizeof(val), "%d", timeout);
+
+       rc = write_sysfs_file(modparam_path, "lnet_transaction_timeout", val,
+                             1, strlen(val) + 1);
+       if (rc)
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot configure transaction timeout: %s\"",
+                        strerror(errno));
+
+       cYAML_build_error(rc, seq_no, ADD_CMD, "transaction_timeout", err_str, err_rc);
+
+       return rc;
+}
+
+int lustre_lnet_config_retry_count(int count, int seq_no, struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_NO_ERR;
+       char err_str[LNET_MAX_STR_LEN];
+       char val[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"success\"");
+
+       snprintf(val, sizeof(val), "%d", count);
+
+       rc = write_sysfs_file(modparam_path, "lnet_retry_count", val,
+                             1, strlen(val) + 1);
+       if (rc)
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot configure retry count: %s\"",
+                        strerror(errno));
+
+       cYAML_build_error(rc, seq_no, ADD_CMD, "retry_count", err_str, err_rc);
+
+       return rc;
+}
+
 int lustre_lnet_config_max_intf(int max, int seq_no, struct cYAML **err_rc)
 {
        int rc = LUSTRE_CFG_RC_NO_ERR;
@@ -2706,6 +2871,7 @@ int lustre_lnet_show_peer(char *knid, int detail, int seq_no,
        struct lnet_peer_ni_credit_info *lpni_cri;
        struct lnet_ioctl_element_stats *lpni_stats;
        struct lnet_ioctl_element_msg_stats *msg_stats;
+       struct lnet_ioctl_peer_ni_hstats *hstats;
        lnet_nid_t *nidp;
        int rc = LUSTRE_CFG_RC_OUT_OF_MEM;
        int i, j, k;
@@ -2714,7 +2880,8 @@ int lustre_lnet_show_peer(char *knid, int detail, int seq_no,
        __u32 size;
        struct cYAML *root = NULL, *peer = NULL, *peer_ni = NULL,
                     *first_seq = NULL, *peer_root = NULL, *tmp = NULL,
-                    *msg_statistics = NULL, *statistics = NULL;
+                    *msg_statistics = NULL, *statistics = NULL,
+                    *yhstats;
        char err_str[LNET_MAX_STR_LEN];
        struct lnet_process_id *list = NULL;
        void *data = NULL;
@@ -2851,7 +3018,8 @@ int lustre_lnet_show_peer(char *knid, int detail, int seq_no,
                        lpni_cri = (void*)nidp + sizeof(nidp);
                        lpni_stats = (void *)lpni_cri + sizeof(*lpni_cri);
                        msg_stats = (void *)lpni_stats + sizeof(*lpni_stats);
-                       lpni_data = (void *)msg_stats + sizeof(*msg_stats);
+                       hstats = (void *)msg_stats + sizeof(*msg_stats);
+                       lpni_data = (void *)hstats + sizeof(*hstats);
 
                        peer_ni = cYAML_create_seq_item(tmp);
                        if (peer_ni == NULL)
@@ -2946,6 +3114,29 @@ int lustre_lnet_show_peer(char *knid, int detail, int seq_no,
                                        goto out;
                        }
 
+                       yhstats = cYAML_create_object(peer_ni, "health stats");
+                       if (!yhstats)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "health value",
+                                               hstats->hlpni_health_value)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "dropped",
+                                               hstats->hlpni_remote_dropped)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "timeout",
+                                               hstats->hlpni_remote_timeout)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "error",
+                                               hstats->hlpni_remote_error)
+                                                       == NULL)
+                               goto out;
+                       if (cYAML_create_number(yhstats, "network timeout",
+                                               hstats->hlpni_network_timeout)
+                                                       == NULL)
+                               goto out;
                }
        }
 
@@ -3187,6 +3378,165 @@ static int ioctl_show_global_values(int ioc, int seq_no, char *name,
                                       data.sv_value, show_rc, err_rc, l_errno);
 }
 
+int lustre_lnet_show_hsensitivity(int seq_no, struct cYAML **show_rc,
+                                 struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_OUT_OF_MEM;
+       char val[LNET_MAX_STR_LEN];
+       int sen = -1, l_errno = 0;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"out of memory\"");
+
+       rc = read_sysfs_file(modparam_path, "lnet_health_sensitivity", val,
+                            1, sizeof(val));
+       if (rc) {
+               l_errno = -errno;
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot get health sensitivity: %d\"", rc);
+       } else {
+               sen = atoi(val);
+       }
+
+       return build_global_yaml_entry(err_str, sizeof(err_str), seq_no,
+                                      "health_sensitivity", sen, show_rc,
+                                      err_rc, l_errno);
+}
+
+int lustre_lnet_show_transaction_to(int seq_no, struct cYAML **show_rc,
+                                   struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_OUT_OF_MEM;
+       char val[LNET_MAX_STR_LEN];
+       int tto = -1, l_errno = 0;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"out of memory\"");
+
+       rc = read_sysfs_file(modparam_path, "lnet_transaction_timeout", val,
+                            1, sizeof(val));
+       if (rc) {
+               l_errno = -errno;
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot get transaction timeout: %d\"", rc);
+       } else {
+               tto = atoi(val);
+       }
+
+       return build_global_yaml_entry(err_str, sizeof(err_str), seq_no,
+                                      "transaction_timeout", tto, show_rc,
+                                      err_rc, l_errno);
+}
+
+int lustre_lnet_show_retry_count(int seq_no, struct cYAML **show_rc,
+                                struct cYAML **err_rc)
+{
+       int rc = LUSTRE_CFG_RC_OUT_OF_MEM;
+       char val[LNET_MAX_STR_LEN];
+       int retry_count = -1, l_errno = 0;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "\"out of memory\"");
+
+       rc = read_sysfs_file(modparam_path, "lnet_retry_count", val,
+                            1, sizeof(val));
+       if (rc) {
+               l_errno = -errno;
+               snprintf(err_str, sizeof(err_str),
+                        "\"cannot get retry count: %d\"", rc);
+       } else {
+               retry_count = atoi(val);
+       }
+
+       return build_global_yaml_entry(err_str, sizeof(err_str), seq_no,
+                                      "retry_count", retry_count, show_rc,
+                                      err_rc, l_errno);
+}
+
+int show_recovery_queue(enum lnet_health_type type, char *name, int seq_no,
+                       struct cYAML **show_rc, struct cYAML **err_rc)
+{
+       struct lnet_ioctl_recovery_list nid_list;
+       struct cYAML *root = NULL, *nids = NULL;
+       int rc, i;
+       char err_str[LNET_MAX_STR_LEN];
+
+       snprintf(err_str, sizeof(err_str), "failed to print recovery queue\n");
+
+       LIBCFS_IOC_INIT_V2(nid_list, rlst_hdr);
+       nid_list.rlst_type = type;
+
+       rc = l_ioctl(LNET_DEV_ID, IOC_LIBCFS_GET_RECOVERY_QUEUE, &nid_list);
+       if (rc) {
+               rc = errno;
+               goto out;
+       }
+
+       root = cYAML_create_object(NULL, NULL);
+       if (root == NULL)
+               goto out;
+
+       nids = cYAML_create_object(root, name);
+       if (nids == NULL)
+               goto out;
+
+       rc = -EINVAL;
+
+       for (i = 0; i < nid_list.rlst_num_nids; i++) {
+               char nidenum[LNET_MAX_STR_LEN];
+               snprintf(nidenum, sizeof(nidenum), "nid-%d", i);
+               if (!cYAML_create_string(nids, nidenum,
+                       libcfs_nid2str(nid_list.rlst_nid_array[i])))
+                       goto out;
+       }
+
+       snprintf(err_str, sizeof(err_str), "success\n");
+
+       rc = 0;
+
+out:
+       if (show_rc == NULL || rc != LUSTRE_CFG_RC_NO_ERR) {
+               cYAML_free_tree(root);
+       } else if (show_rc != NULL && *show_rc != NULL) {
+               struct cYAML *show_node;
+               /* find the node for this recovery queue; if one doesn't
+                * exist then insert one, otherwise add to the node
+                * already there */
+               show_node = cYAML_get_object_item(*show_rc, name);
+               if (show_node != NULL && cYAML_is_sequence(show_node)) {
+                       cYAML_insert_child(show_node, nids);
+                       free(nids);
+                       free(root);
+               } else if (show_node == NULL) {
+                       cYAML_insert_sibling((*show_rc)->cy_child,
+                                               nids);
+                       free(root);
+               } else {
+                       cYAML_free_tree(root);
+               }
+       } else {
+               *show_rc = root;
+       }
+
+       cYAML_build_error(rc, seq_no, SHOW_CMD, name, err_str, err_rc);
+
+       return rc;
+}
+
+int lustre_lnet_show_local_ni_recovq(int seq_no, struct cYAML **show_rc,
+                                    struct cYAML **err_rc)
+{
+       return show_recovery_queue(LNET_HEALTH_TYPE_LOCAL_NI, "local NI recovery",
+                                  seq_no, show_rc, err_rc);
+}
+
+int lustre_lnet_show_peer_ni_recovq(int seq_no, struct cYAML **show_rc,
+                                   struct cYAML **err_rc)
+{
+       return show_recovery_queue(LNET_HEALTH_TYPE_PEER_NI, "peer NI recovery",
+                                  seq_no, show_rc, err_rc);
+}
+
 int lustre_lnet_show_max_intf(int seq_no, struct cYAML **show_rc,
                              struct cYAML **err_rc)
 {
@@ -3291,6 +3641,10 @@ int lustre_lnet_show_stats(int seq_no, struct cYAML **show_rc,
                                data.st_cntrs.msgs_max) == NULL)
                goto out;
 
+       if (cYAML_create_number(stats, "rst_alloc",
+                               data.st_cntrs.rst_alloc) == NULL)
+               goto out;
+
        if (cYAML_create_number(stats, "errors",
                                data.st_cntrs.errors) == NULL)
                goto out;
@@ -3299,6 +3653,54 @@ int lustre_lnet_show_stats(int seq_no, struct cYAML **show_rc,
                                data.st_cntrs.send_count) == NULL)
                goto out;
 
+       if (cYAML_create_number(stats, "resend_count",
+                               data.st_cntrs.resend_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "response_timeout_count",
+                               data.st_cntrs.response_timeout_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_interrupt_count",
+                               data.st_cntrs.local_interrupt_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_dropped_count",
+                               data.st_cntrs.local_dropped_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_aborted_count",
+                               data.st_cntrs.local_aborted_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_no_route_count",
+                               data.st_cntrs.local_no_route_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_timeout_count",
+                               data.st_cntrs.local_timeout_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "local_error_count",
+                               data.st_cntrs.local_error_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "remote_dropped_count",
+                               data.st_cntrs.remote_dropped_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "remote_error_count",
+                               data.st_cntrs.remote_error_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "remote_timeout_count",
+                               data.st_cntrs.remote_timeout_count) == NULL)
+               goto out;
+
+       if (cYAML_create_number(stats, "network_timeout_count",
+                               data.st_cntrs.network_timeout_count) == NULL)
+               goto out;
+
        if (cYAML_create_number(stats, "recv_count",
                                data.st_cntrs.recv_count) == NULL)
                goto out;
@@ -4083,7 +4485,8 @@ static int handle_yaml_config_global_settings(struct cYAML *tree,
                                              struct cYAML **show_rc,
                                              struct cYAML **err_rc)
 {
-       struct cYAML *max_intf, *numa, *discovery, *seq_no;
+       struct cYAML *max_intf, *numa, *discovery, *retry, *tto, *seq_no,
+                    *sen;
        int rc = 0;
 
        seq_no = cYAML_get_object_item(tree, "seq_no");
@@ -4108,6 +4511,27 @@ static int handle_yaml_config_global_settings(struct cYAML *tree,
                                                        : -1,
                                                  err_rc);
 
+       retry = cYAML_get_object_item(tree, "retry_count");
+       if (retry)
+               rc = lustre_lnet_config_retry_count(retry->cy_valueint,
+                                                   seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                   err_rc);
+
+       tto = cYAML_get_object_item(tree, "transaction_timeout");
+       if (tto)
+               rc = lustre_lnet_config_transaction_to(tto->cy_valueint,
+                                                      seq_no ? seq_no->cy_valueint
+                                                               : -1,
+                                                      err_rc);
+
+       sen = cYAML_get_object_item(tree, "health_sensitivity");
+       if (sen)
+               rc = lustre_lnet_config_hsensitivity(sen->cy_valueint,
+                                                    seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                    err_rc);
+
        return rc;
 }
 
@@ -4148,7 +4572,8 @@ static int handle_yaml_show_global_settings(struct cYAML *tree,
                                            struct cYAML **show_rc,
                                            struct cYAML **err_rc)
 {
-       struct cYAML *max_intf, *numa, *discovery, *seq_no;
+       struct cYAML *max_intf, *numa, *discovery, *retry, *tto, *seq_no,
+                    *sen;
        int rc = 0;
 
        seq_no = cYAML_get_object_item(tree, "seq_no");
@@ -4170,6 +4595,24 @@ static int handle_yaml_show_global_settings(struct cYAML *tree,
                                                        : -1,
                                                show_rc, err_rc);
 
+       retry = cYAML_get_object_item(tree, "retry_count");
+       if (retry)
+               rc = lustre_lnet_show_retry_count(seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                 show_rc, err_rc);
+
+       tto = cYAML_get_object_item(tree, "transaction_timeout");
+       if (tto)
+               rc = lustre_lnet_show_transaction_to(seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                    show_rc, err_rc);
+
+       sen = cYAML_get_object_item(tree, "health_sensitivity");
+       if (sen)
+               rc = lustre_lnet_show_hsensitivity(seq_no ? seq_no->cy_valueint
+                                                       : -1,
+                                                    show_rc, err_rc);
+
        return rc;
 }
 
index ce005d1..4cd6d5e 100644 (file)
@@ -225,6 +225,113 @@ int lustre_lnet_show_numa_range(int seq_no, struct cYAML **show_rc,
                                struct cYAML **err_rc);
 
 /*
+ * lustre_lnet_config_ni_healthv
+ *   set the health value of the NI. -1 resets the value to maximum.
+ *
+ *   value: health value to set.
+ *   all: true to set all local NIs to that value.
+ *   ni_nid: NI NID to set its health value. all parameter always takes
+ *   precedence
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_ni_healthv(int value, bool all, char *ni_nid,
+                                 int seq_no, struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_config_peer_ni_healthv
+ *   set the health value of the peer NI. -1 resets the value to maximum.
+ *
+ *   value: health value to set.
+ *   all: true to set all peer NIs to that value.
+ *   pni_nid: Peer NI NID to set its health value. all parameter always takes
+ *   precedence
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_peer_ni_healthv(int value, bool all, char *pni_nid,
+                                      int seq_no, struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_config_hsensitivity
+ *   sets the health sensitivity; the value by which to decrement the
+ *   health value of a local or peer NI. If 0 then health is turned off
+ *
+ *   sen - sensitivity value to configure
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_hsensitivity(int sen, int seq_no, struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_show_hsensitivity
+ *    show the health sensitivity in the system
+ *
+ *   seq_no - sequence number of the request
+ *   show_rc - [OUT] struct cYAML tree containing health sensitivity info
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_show_hsensitivity(int seq_no, struct cYAML **show_rc,
+                                 struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_config_transaction_to
+ *   sets the timeout after which a message expires or a timeout event is
+ *   propagated for an expired response.
+ *
+ *   timeout - timeout value to configure
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_transaction_to(int timeout, int seq_no, struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_show_transaction_to
+ *    show the transaction timeout in the system
+ *
+ *   seq_no - sequence number of the request
+ *   show_rc - [OUT] struct cYAML tree containing transaction timeout info
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_show_transaction_to(int seq_no, struct cYAML **show_rc,
+                                   struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_config_retry_count
+ *   sets the maximum number of retries to resend a message
+ *
+ *   count - maximum value to configure
+ *   seq_no - sequence number of the request
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_config_retry_count(int count, int seq_no, struct cYAML **err_rc);
+
+/*
+ * lustre_lnet_show_retry_count
+ *    show current maximum number of retries in the system
+ *
+ *   seq_no - sequence number of the request
+ *   show_rc - [OUT] struct cYAML tree containing retry count info
+ *   err_rc - [OUT] struct cYAML tree describing the error. Freed by
+ *   caller
+ */
+int lustre_lnet_show_retry_count(int seq_no, struct cYAML **show_rc,
+                                struct cYAML **err_rc);
+
+int lustre_lnet_show_local_ni_recovq(int seq_no, struct cYAML **show_rc,
+                                    struct cYAML **err_rc);
+
+int lustre_lnet_show_peer_ni_recovq(int seq_no, struct cYAML **show_rc,
+                                   struct cYAML **err_rc);
+
+/*
  * lustre_lnet_config_max_intf
  *   Sets the maximum number of interfaces per node. this tunable is
  *   primarily useful for sanity checks prior to allocating memory.
index e14d43e..e6975af 100644 (file)
@@ -48,11 +48,15 @@ static int jt_show_net(int argc, char **argv);
 static int jt_show_routing(int argc, char **argv);
 static int jt_show_stats(int argc, char **argv);
 static int jt_show_peer(int argc, char **argv);
+static int jt_show_recovery(int argc, char **argv);
 static int jt_show_global(int argc, char **argv);
 static int jt_set_tiny(int argc, char **argv);
 static int jt_set_small(int argc, char **argv);
 static int jt_set_large(int argc, char **argv);
 static int jt_set_numa(int argc, char **argv);
+static int jt_set_retry_count(int argc, char **argv);
+static int jt_set_transaction_to(int argc, char **argv);
+static int jt_set_hsensitivity(int argc, char **argv);
 static int jt_add_peer_nid(int argc, char **argv);
 static int jt_del_peer_nid(int argc, char **argv);
 static int jt_set_max_intf(int argc, char **argv);
@@ -69,10 +73,12 @@ static int jt_route(int argc, char **argv);
 static int jt_net(int argc, char **argv);
 static int jt_routing(int argc, char **argv);
 static int jt_set(int argc, char **argv);
+static int jt_debug(int argc, char **argv);
 static int jt_stats(int argc, char **argv);
 static int jt_global(int argc, char **argv);
 static int jt_peers(int argc, char **argv);
-
+static int jt_set_ni_value(int argc, char **argv);
+static int jt_set_peer_ni_value(int argc, char **argv);
 
 command_t cmd_list[] = {
        {"lnet", jt_lnet, 0, "lnet {configure | unconfigure} [--all]"},
@@ -85,6 +91,7 @@ command_t cmd_list[] = {
        {"import", jt_import, 0, "import FILE.yaml"},
        {"export", jt_export, 0, "export FILE.yaml"},
        {"stats", jt_stats, 0, "stats {show | help}"},
+       {"debug", jt_debug, 0, "debug recovery {local | peer}"},
        {"global", jt_global, 0, "global {show | help}"},
        {"peer", jt_peers, 0, "peer {add | del | show | help}"},
        {"ping", jt_ping, 0, "ping nid,[nid,...]"},
@@ -138,6 +145,10 @@ command_t net_cmds[] = {
         "\t--net: net name (e.g. tcp0) to filter on\n"
         "\t--verbose: display detailed output per network."
                       " Optional argument of '2' outputs more stats\n"},
+       {"set", jt_set_ni_value, 0, "set local NI specific parameter\n"
+        "\t--nid: NI NID to set the\n"
+        "\t--health: specify health value to set\n"
+        "\t--all: set all NIs value to the one specified\n"},
        { 0, 0, 0, NULL }
 };
 
@@ -151,6 +162,13 @@ command_t stats_cmds[] = {
        { 0, 0, 0, NULL }
 };
 
+/* sub-commands of "lnetctl debug"; dispatched by jt_debug() */
+command_t debug_cmds[] = {
+       {"recovery", jt_show_recovery, 0, "list recovery queues\n"
+               "\t--local : list local recovery queue\n"
+               "\t--peer : list peer recovery queue\n"},
+       { 0, 0, 0, NULL }
+};
+
 command_t global_cmds[] = {
        {"show", jt_show_global, 0, "show global variables\n"},
        { 0, 0, 0, NULL }
@@ -174,6 +192,14 @@ command_t set_cmds[] = {
        {"discovery", jt_set_discovery, 0, "enable/disable peer discovery\n"
         "\t0 - disable peer discovery\n"
         "\t1 - enable peer discovery (default)\n"},
+       {"retry_count", jt_set_retry_count, 0, "number of retries\n"
+        "\t0 - turn of retries\n"
+        "\t>0 - number of retries\n"},
+       {"transaction_timeout", jt_set_transaction_to, 0, "Message/Response timeout\n"
+        "\t>0 - timeout in seconds\n"},
+       {"health_sensitivity", jt_set_hsensitivity, 0, "sensitivity to failure\n"
+        "\t0 - turn off health evaluation\n"
+        "\t>0 - sensitivity value not more than 1000\n"},
        { 0, 0, 0, NULL }
 };
 
@@ -195,6 +221,10 @@ command_t peer_cmds[] = {
         "\t--verbose: display detailed output per peer."
                       " Optional argument of '2' outputs more stats\n"},
        {"list", jt_list_peer, 0, "list all peers\n"},
+       {"set", jt_set_peer_ni_value, 0, "set peer ni specific parameter\n"
+        "\t--nid: Peer NI NID to set the\n"
+        "\t--health: specify health value to set\n"
+        "\t--all: set all peer_nis values to the one specified\n"},
        { 0, 0, 0, NULL }
 };
 
@@ -328,6 +358,90 @@ static int jt_set_numa(int argc, char **argv)
        return rc;
 }
 
+/*
+ * jt_set_hsensitivity - handler for "lnetctl set health_sensitivity <value>".
+ *
+ * Parses argv[1] as a long and applies it via
+ * lustre_lnet_config_hsensitivity().  Any error YAML tree produced is
+ * printed to stderr and freed; the library rc is returned.
+ */
+static int jt_set_hsensitivity(int argc, char **argv)
+{
+       long int value;
+       int rc;
+       struct cYAML *err_rc = NULL;
+
+       rc = check_cmd(set_cmds, "set", "health_sensitivity", 2, argc, argv);
+       if (rc)
+               return rc;
+
+       rc = parse_long(argv[1], &value);
+       if (rc != 0) {
+               /* report the parse failure through the YAML error machinery */
+               cYAML_build_error(-1, -1, "parser", "set",
+                                 "cannot parse health sensitivity value", &err_rc);
+               cYAML_print_tree2file(stderr, err_rc);
+               cYAML_free_tree(err_rc);
+               return -1;
+       }
+
+       rc = lustre_lnet_config_hsensitivity(value, -1, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+
+       cYAML_free_tree(err_rc);
+
+       return rc;
+}
+
+/*
+ * jt_set_transaction_to - handler for
+ * "lnetctl set transaction_timeout <seconds>".
+ *
+ * Parses argv[1] as a long and applies it via
+ * lustre_lnet_config_transaction_to().  Error YAML is printed to
+ * stderr and freed; the library rc is returned.
+ */
+static int jt_set_transaction_to(int argc, char **argv)
+{
+       long int value;
+       int rc;
+       struct cYAML *err_rc = NULL;
+
+       rc = check_cmd(set_cmds, "set", "transaction_timeout", 2, argc, argv);
+       if (rc)
+               return rc;
+
+       rc = parse_long(argv[1], &value);
+       if (rc != 0) {
+               /* report the parse failure through the YAML error machinery */
+               cYAML_build_error(-1, -1, "parser", "set",
+                                 "cannot parse transaction timeout value", &err_rc);
+               cYAML_print_tree2file(stderr, err_rc);
+               cYAML_free_tree(err_rc);
+               return -1;
+       }
+
+       rc = lustre_lnet_config_transaction_to(value, -1, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+
+       cYAML_free_tree(err_rc);
+
+       return rc;
+}
+
+/*
+ * jt_set_retry_count - handler for "lnetctl set retry_count <count>".
+ *
+ * Parses argv[1] as a long and applies it via
+ * lustre_lnet_config_retry_count().  Error YAML is printed to stderr
+ * and freed; the library rc is returned.
+ */
+static int jt_set_retry_count(int argc, char **argv)
+{
+       long int value;
+       int rc;
+       struct cYAML *err_rc = NULL;
+
+       rc = check_cmd(set_cmds, "set", "retry_count", 2, argc, argv);
+       if (rc)
+               return rc;
+
+       rc = parse_long(argv[1], &value);
+       if (rc != 0) {
+               /* report the parse failure through the YAML error machinery */
+               cYAML_build_error(-1, -1, "parser", "set",
+                                 "cannot parse retry_count value", &err_rc);
+               cYAML_print_tree2file(stderr, err_rc);
+               cYAML_free_tree(err_rc);
+               return -1;
+       }
+
+       rc = lustre_lnet_config_retry_count(value, -1, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+
+       cYAML_free_tree(err_rc);
+
+       return rc;
+}
+
 static int jt_set_discovery(int argc, char **argv)
 {
        long int value;
@@ -858,6 +972,103 @@ static int jt_show_route(int argc, char **argv)
        return rc;
 }
 
+/*
+ * set_value_helper - shared option parser for "net set" and "peer set".
+ *
+ * Recognized options:
+ *   -n | --nid NID      NI (or peer NI) NID to operate on
+ *   -h | --health VAL   health value to set (-1 if the value fails to parse)
+ *   -a | --all          apply to all [peer] NIs; takes precedence over --nid
+ *
+ * cb is lustre_lnet_config_ni_healthv() or
+ * lustre_lnet_config_peer_ni_healthv(); its error YAML tree is printed
+ * to stderr and freed here.  Returns the callback's rc, or 0 when an
+ * unrecognized option is encountered.
+ */
+static int set_value_helper(int argc, char **argv,
+                           int (*cb)(int, bool, char*, int, struct cYAML**))
+{
+       char *nid = NULL;
+       long int healthv = -1;
+       bool all = false;
+       int rc, opt;
+       struct cYAML *err_rc = NULL;
+
+       const char *const short_options = "h:n:a";
+       static const struct option long_options[] = {
+               { .name = "nid", .has_arg = required_argument, .val = 'n' },
+               { .name = "health", .has_arg = required_argument, .val = 'h' },
+               { .name = "all", .has_arg = no_argument, .val = 'a' },
+               { .name = NULL } };
+
+       rc = check_cmd(net_cmds, "net", "set", 0, argc, argv);
+       if (rc)
+               return rc;
+
+       while ((opt = getopt_long(argc, argv, short_options,
+                                  long_options, NULL)) != -1) {
+               switch (opt) {
+               case 'n':
+                       nid = optarg;
+                       break;
+               case 'h':
+                       /* "h:" takes a required argument, so getopt_long()
+                        * has already stored the value in optarg; reading
+                        * argv[optind++] here would consume the *next*
+                        * argument instead and derail option scanning.
+                        */
+                       if (parse_long(optarg, &healthv) != 0)
+                               healthv = -1;
+                       break;
+               case 'a':
+                       all = true;
+                       /* break was missing: fallthrough hit "return 0"
+                        * so --all exited before cb() was ever called.
+                        */
+                       break;
+               default:
+                       return 0;
+               }
+       }
+
+       rc = cb(healthv, all, nid, -1, &err_rc);
+
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+
+       cYAML_free_tree(err_rc);
+
+       return rc;
+}
+
+/* "lnetctl net set": set a local NI's health value via set_value_helper() */
+static int jt_set_ni_value(int argc, char **argv)
+{
+       return set_value_helper(argc, argv, lustre_lnet_config_ni_healthv);
+}
+
+/* "lnetctl peer set": set a peer NI's health value via set_value_helper() */
+static int jt_set_peer_ni_value(int argc, char **argv)
+{
+       return set_value_helper(argc, argv, lustre_lnet_config_peer_ni_healthv);
+}
+
+/*
+ * jt_show_recovery - handler for "lnetctl debug recovery {--local|--peer}".
+ *
+ * Queries the requested recovery queue(s) through the liblnetconfig
+ * show helpers; on success the YAML tree is printed to stdout,
+ * otherwise the error tree goes to stderr.  An unrecognized option
+ * returns 0 without printing anything.
+ */
+static int jt_show_recovery(int argc, char **argv)
+{
+       int rc, opt;
+       struct cYAML *err_rc = NULL, *show_rc = NULL;
+
+       const char *const short_options = "lp";
+       static const struct option long_options[] = {
+               { .name = "local", .has_arg = no_argument, .val = 'l' },
+               { .name = "peer", .has_arg = no_argument, .val = 'p' },
+               { .name = NULL } };
+
+       rc = check_cmd(debug_cmds, "recovery", NULL, 0, argc, argv);
+       if (rc)
+               return rc;
+
+       while ((opt = getopt_long(argc, argv, short_options,
+                                  long_options, NULL)) != -1) {
+               switch (opt) {
+               case 'l':
+                       rc = lustre_lnet_show_local_ni_recovq(-1, &show_rc, &err_rc);
+                       break;
+               case 'p':
+                       rc = lustre_lnet_show_peer_ni_recovq(-1, &show_rc, &err_rc);
+                       break;
+               default:
+                       return 0;
+               }
+       }
+
+       /* print whichever tree the last query produced */
+       if (rc != LUSTRE_CFG_RC_NO_ERR)
+               cYAML_print_tree2file(stderr, err_rc);
+       else if (show_rc)
+               cYAML_print_tree(show_rc);
+
+       cYAML_free_tree(err_rc);
+       cYAML_free_tree(show_rc);
+
+       return rc;
+}
+
 static int jt_show_net(int argc, char **argv)
 {
        char *network = NULL;
@@ -980,6 +1191,24 @@ static int jt_show_global(int argc, char **argv)
                goto out;
        }
 
+       rc = lustre_lnet_show_retry_count(-1, &show_rc, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR) {
+               cYAML_print_tree2file(stderr, err_rc);
+               goto out;
+       }
+
+       rc = lustre_lnet_show_transaction_to(-1, &show_rc, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR) {
+               cYAML_print_tree2file(stderr, err_rc);
+               goto out;
+       }
+
+       rc = lustre_lnet_show_hsensitivity(-1, &show_rc, &err_rc);
+       if (rc != LUSTRE_CFG_RC_NO_ERR) {
+               cYAML_print_tree2file(stderr, err_rc);
+               goto out;
+       }
+
        if (show_rc)
                cYAML_print_tree(show_rc);
 
@@ -1045,6 +1274,17 @@ static int jt_stats(int argc, char **argv)
        return Parser_execarg(argc - 1, &argv[1], stats_cmds);
 }
 
+/*
+ * jt_debug - top-level handler for "lnetctl debug"; validates the
+ * argument count then dispatches to debug_cmds (currently only
+ * "recovery").
+ */
+static int jt_debug(int argc, char **argv)
+{
+       int rc;
+
+       rc = check_cmd(debug_cmds, "recovery", NULL, 2, argc, argv);
+       if (rc)
+               return rc;
+
+       return Parser_execarg(argc - 1, &argv[1], debug_cmds);
+}
+
 static int jt_global(int argc, char **argv)
 {
        int rc;
index 29d10e3..fb6a282 100644 (file)
@@ -231,7 +231,7 @@ int ptlrpc_start_bulk_transfer(struct ptlrpc_bulk_desc *desc)
                                     desc->bd_portal, mbits, 0, 0);
                else
                        rc = LNetGet(self_nid, desc->bd_mds[posted_md],
-                                    peer_id, desc->bd_portal, mbits, 0);
+                                    peer_id, desc->bd_portal, mbits, 0, false);
 
                posted_md++;
                if (rc != 0) {
index c9632b9..2a2939b 100644 (file)
@@ -117,7 +117,8 @@ command_t cmdlist[] = {
         "                    <<-r | --rate DROP_RATE> |\n"
         "                    <-i | --interval SECONDS>>\n"
         "                    [<-p | --portal> PORTAL...]\n"
-        "                    [<-m | --message> <PUT|ACK|GET|REPLY>...]\n"},
+        "                    [<-m | --message> <PUT|ACK|GET|REPLY>...]\n"
+        "                    [< -e | --health_error]\n"},
        {"net_drop_del", jt_ptl_drop_del, 0, "remove LNet drop rule\n"
         "usage: net_drop_del <[-a | --all] |\n"
         "                    <-s | --source NID>\n"
index e544588..9349b6b 100644 (file)
@@ -1315,6 +1315,57 @@ fault_attr_ptl_parse(char *ptl_str, __u64 *mask_p)
 }
 
 static int
+/*
+ * fault_attr_health_error_parse - map a health error name onto its
+ * HSTATUS_* bit and OR it into *mask.  "random" is special: it
+ * overwrites *mask with HSTATUS_RANDOM instead of adding a bit.
+ * Matching is case-insensitive.  Returns 0 on success, -1 when the
+ * name is not recognized.
+ */
+static int
+fault_attr_health_error_parse(char *error, __u32 *mask)
+{
+       static const struct {
+               const char      *name;
+               __u32            bit;
+       } health_errors[] = {
+               { "local_interrupt", HSTATUS_LOCAL_INTERRUPT_BIT },
+               { "local_dropped",   HSTATUS_LOCAL_DROPPED_BIT },
+               { "local_aborted",   HSTATUS_LOCAL_ABORTED_BIT },
+               { "local_no_route",  HSTATUS_LOCAL_NO_ROUTE_BIT },
+               { "local_error",     HSTATUS_LOCAL_ERROR_BIT },
+               { "local_timeout",   HSTATUS_LOCAL_TIMEOUT_BIT },
+               { "remote_error",    HSTATUS_REMOTE_ERROR_BIT },
+               { "remote_dropped",  HSTATUS_REMOTE_DROPPED_BIT },
+               { "remote_timeout",  HSTATUS_REMOTE_TIMEOUT_BIT },
+               { "network_timeout", HSTATUS_NETWORK_TIMEOUT_BIT },
+       };
+       size_t i;
+
+       if (!strcasecmp(error, "random")) {
+               /* replaces the whole mask rather than OR-ing a bit */
+               *mask = HSTATUS_RANDOM;
+               return 0;
+       }
+
+       for (i = 0; i < sizeof(health_errors) / sizeof(health_errors[0]); i++) {
+               if (!strcasecmp(error, health_errors[i].name)) {
+                       *mask |= health_errors[i].bit;
+                       return 0;
+               }
+       }
+
+       return -1;
+}
+
+static int
 fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
 {
        struct libcfs_ioctl_data  data = { { 0 } };
@@ -1327,9 +1378,11 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
        { .name = "dest",     .has_arg = required_argument, .val = 'd' },
        { .name = "rate",     .has_arg = required_argument, .val = 'r' },
        { .name = "interval", .has_arg = required_argument, .val = 'i' },
+       { .name = "random",   .has_arg = no_argument,       .val = 'n' },
        { .name = "latency",  .has_arg = required_argument, .val = 'l' },
        { .name = "portal",   .has_arg = required_argument, .val = 'p' },
        { .name = "message",  .has_arg = required_argument, .val = 'm' },
+       { .name = "health_error",  .has_arg = required_argument, .val = 'e' },
        { .name = NULL } };
 
        if (argc == 1) {
@@ -1338,7 +1391,7 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                return -1;
        }
 
-       optstr = opc == LNET_CTL_DROP_ADD ? "s:d:r:i:p:m:" : "s:d:r:l:p:m:";
+       optstr = opc == LNET_CTL_DROP_ADD ? "s:d:r:i:p:m:e:n" : "s:d:r:l:p:m:";
        memset(&attr, 0, sizeof(attr));
        while (1) {
                char c = getopt_long(argc, argv, optstr, opts, NULL);
@@ -1366,6 +1419,20 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                                attr.u.delay.la_rate = strtoul(optarg, NULL, 0);
                        break;
 
+               case 'e':
+                       if (opc == LNET_CTL_DROP_ADD) {
+                               rc = fault_attr_health_error_parse(optarg,
+                                       &attr.u.drop.da_health_error_mask);
+                               if (rc)
+                                       goto getopt_failed;
+                       }
+                       break;
+
+               case 'n':
+                       if (opc == LNET_CTL_DROP_ADD)
+                               attr.u.drop.da_random = true;
+                       break;
+
                case 'i': /* time interval (# seconds) for message drop */
                        if (opc == LNET_CTL_DROP_ADD)
                                attr.u.drop.da_interval = strtoul(optarg,
@@ -1408,6 +1475,12 @@ fault_simul_rule_add(__u32 opc, char *name, int argc, char **argv)
                                "but not both at the same time.\n");
                        return -1;
                }
+
+               if (attr.u.drop.da_random &&
+                   attr.u.drop.da_interval == 0) {
+                       fprintf(stderr, "please provide an interval to randomize\n");
+                       return -1;
+               }
        } else if (opc == LNET_CTL_DELAY_ADD) {
                if (!((attr.u.delay.la_rate == 0) ^
                      (attr.u.delay.la_interval == 0))) {