Whamcloud - gitweb
LU-9120 lnet: add monitor thread 63/32763/11
authorAmir Shehata <amir.shehata@intel.com>
Thu, 31 May 2018 00:20:10 +0000 (17:20 -0700)
committerAmir Shehata <ashehata@whamcloud.com>
Fri, 17 Aug 2018 19:53:34 +0000 (19:53 +0000)
Refactored the router checker thread to be the monitor thread.
The monitor thread will check router aliveness, expire messages
on the active list, recover local and remote NIs and resend messages.

In this patch it only checks router aliveness.

A deadline on the message is also added to keep track of when this
message should expire.

Test-Parameters: forbuildonly
Signed-off-by: Amir Shehata <ashehata@whamcloud.com>
Change-Id: I712cad13d55328400ce61749967979673c4d673f
Reviewed-on: https://review.whamcloud.com/32763
Tested-by: Jenkins
Reviewed-by: Sonia Sharma <sharmaso@whamcloud.com>
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
Reviewed-by: Chris Horn <hornc@cray.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/router.c

index 2fd3292..f4868e0 100644 (file)
@@ -823,8 +823,15 @@ int lnet_sock_connect(struct socket **sockp, int *fatal,
 int lnet_peers_start_down(void);
 int lnet_peer_buffer_credits(struct lnet_net *net);
 
-int lnet_router_checker_start(void);
-void lnet_router_checker_stop(void);
+int lnet_monitor_thr_start(void);
+void lnet_monitor_thr_stop(void);
+
+bool lnet_router_checker_active(void);
+void lnet_check_routers(void);
+int lnet_router_pre_mt_start(void);
+void lnet_router_post_mt_start(void);
+void lnet_prune_rc_data(int wait_unlink);
+void lnet_router_cleanup(void);
 void lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net);
 void lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf);
 
index 7528c3c..3817bcd 100644 (file)
@@ -82,6 +82,12 @@ struct lnet_msg {
        lnet_nid_t              msg_src_nid_param;
        lnet_nid_t              msg_rtr_nid_param;
 
+       /*
+        * Deadline for the message after which it will be finalized if it
+        * has not completed.
+        */
+       ktime_t                 msg_deadline;
+
        /* committed for sending */
        unsigned int            msg_tx_committed:1;
        /* CPT # this message committed for sending */
@@ -892,9 +898,9 @@ struct lnet_msg_container {
 #define LNET_DC_STATE_STOPPING         2       /* telling thread to stop */
 
 /* Router Checker states */
-#define LNET_RC_STATE_SHUTDOWN         0       /* not started */
-#define LNET_RC_STATE_RUNNING          1       /* started up OK */
-#define LNET_RC_STATE_STOPPING         2       /* telling thread to stop */
+#define LNET_MT_STATE_SHUTDOWN         0       /* not started */
+#define LNET_MT_STATE_RUNNING          1       /* started up OK */
+#define LNET_MT_STATE_STOPPING         2       /* telling thread to stop */
 
 /* LNet states */
 #define LNET_STATE_SHUTDOWN            0       /* not started */
@@ -998,8 +1004,8 @@ struct lnet {
        /* discovery startup/shutdown state */
        int                             ln_dc_state;
 
-       /* router checker startup/shutdown state */
-       int                             ln_rc_state;
+       /* monitor thread startup/shutdown state */
+       int                             ln_mt_state;
        /* router checker's event queue */
        struct lnet_handle_eq           ln_rc_eqh;
        /* rcd still pending on net */
@@ -1007,7 +1013,7 @@ struct lnet {
        /* rcd ready for free */
        struct list_head                ln_rcd_zombie;
        /* serialise startup/shutdown */
-       struct semaphore                ln_rc_signal;
+       struct semaphore                ln_mt_signal;
 
        struct mutex                    ln_api_mutex;
        struct mutex                    ln_lnd_mutex;
@@ -1035,10 +1041,11 @@ struct lnet {
         */
        bool                            ln_nis_from_mod_params;
 
-       /* waitq for router checker.  As long as there are no routes in
-        * the list, the router checker will sleep on this queue.  when
-        * routes are added the thread will wake up */
-       wait_queue_head_t               ln_rc_waitq;
+       /*
+        * waitq for the monitor thread. The monitor thread takes care of
+        * checking routes, timed-out messages and resending messages.
+        */
+       wait_queue_head_t               ln_mt_waitq;
 };
 
 #endif
index 8b3dbad..80e0aeb 100644 (file)
@@ -302,7 +302,7 @@ lnet_init_locks(void)
        spin_lock_init(&the_lnet.ln_eq_wait_lock);
        spin_lock_init(&the_lnet.ln_msg_resend_lock);
        init_waitqueue_head(&the_lnet.ln_eq_waitq);
-       init_waitqueue_head(&the_lnet.ln_rc_waitq);
+       init_waitqueue_head(&the_lnet.ln_mt_waitq);
        mutex_init(&the_lnet.ln_lnd_mutex);
 }
 
@@ -2304,13 +2304,13 @@ LNetNIInit(lnet_pid_t requested_pid)
 
        lnet_ping_target_update(pbuf, ping_mdh);
 
-       rc = lnet_router_checker_start();
+       rc = lnet_monitor_thr_start();
        if (rc != 0)
                goto err_stop_ping;
 
        rc = lnet_push_target_init();
        if (rc != 0)
-               goto err_stop_router_checker;
+               goto err_stop_monitor_thr;
 
        rc = lnet_peer_discovery_start();
        if (rc != 0)
@@ -2325,8 +2325,8 @@ LNetNIInit(lnet_pid_t requested_pid)
 
 err_destroy_push_target:
        lnet_push_target_fini();
-err_stop_router_checker:
-       lnet_router_checker_stop();
+err_stop_monitor_thr:
+       lnet_monitor_thr_stop();
 err_stop_ping:
        lnet_ping_target_fini();
 err_acceptor_stop:
@@ -2378,7 +2378,7 @@ LNetNIFini()
                lnet_router_debugfs_init();
                lnet_peer_discovery_stop();
                lnet_push_target_fini();
-               lnet_router_checker_stop();
+               lnet_monitor_thr_stop();
                lnet_ping_target_fini();
 
                /* Teardown fns that use my own API functions BEFORE here */
index c23c9ab..c82c516 100644 (file)
@@ -1006,6 +1006,9 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send)
                }
        }
 
+       /* unset the msg_tx_delayed flag as we're going to send it now */
+       msg->msg_tx_delayed = 0;
+
        if (do_send) {
                lnet_net_unlock(cpt);
                lnet_ni_send(ni, msg);
@@ -1101,6 +1104,9 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
        msg->msg_niov = rbp->rbp_npages;
        msg->msg_kiov = &rb->rb_kiov[0];
 
+       /* unset the msg_rx_delayed flag since we're receiving the message */
+       msg->msg_rx_delayed = 0;
+
        if (do_recv) {
                int cpt = msg->msg_rx_cpt;
 
@@ -2616,6 +2622,104 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
        return 0;
 }
 
+/*
+ * Body of the LNet monitor thread.
+ *
+ * Loops for as long as the_lnet.ln_mt_state is LNET_MT_STATE_RUNNING.
+ * Each pass checks the routers (when the router checker is active) and
+ * then sleeps on ln_mt_waitq for at most one second.  Once the state
+ * leaves RUNNING the thread prunes the router checker data, marks the
+ * monitor thread as SHUTDOWN and ups ln_mt_signal.
+ *
+ * \param arg  unused
+ *
+ * \retval 0   always
+ */
+static int
+lnet_monitor_thread(void *arg)
+{
+       /*
+        * The monitor thread is intended to take care of the following:
+        *  1. Checks the aliveness of routers
+        *  2. Checks if there are messages on the resend queue to resend
+        *     them.
+        *  3. Check if there are any NIs on the local recovery queue and
+        *     pings them
+        *  4. Checks if there are any NIs on the remote recovery queue
+        *     and pings them.
+        *
+        * Only step 1 is performed by the loop below.
+        */
+       cfs_block_allsigs();
+
+       while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
+               if (lnet_router_checker_active())
+                       lnet_check_routers();
+
+               /*
+                * TODO do we need to check if we should sleep without
+                * timeout?  Technically, an active system will always
+                * have messages in flight so this check will always
+                * evaluate to false. And on an idle system do we care
+                * if we wake up every 1 second? Although, we've seen
+                * cases where we get a complaint that an idle thread
+                * is waking up unnecessarily.
+                *
+                * The wait condition is the shutdown request itself: a
+                * constant-false condition would make the wake_up()
+                * issued by lnet_monitor_thr_stop() a no-op and delay
+                * shutdown until the timeout expires.
+                */
+               wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
+                                               the_lnet.ln_mt_state !=
+                                               LNET_MT_STATE_RUNNING,
+                                               cfs_time_seconds(1));
+       }
+
+       /* clean up the router checker, waiting for pending unlinks */
+       lnet_prune_rc_data(1);
+
+       /* Shutting down */
+       the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+
+       /* signal that the monitor thread is exiting */
+       up(&the_lnet.ln_mt_signal);
+
+       return 0;
+}
+
+/*
+ * Start the LNet monitor thread.
+ *
+ * Must be called with the monitor thread in LNET_MT_STATE_SHUTDOWN.
+ * Runs the router checker pre-start step, spawns the thread and then
+ * runs the router checker post-start step.
+ *
+ * \retval 0        the thread was started
+ * \retval -ENOMEM  the thread could not be created
+ * \retval other    error returned by lnet_router_pre_mt_start()
+ */
+int lnet_monitor_thr_start(void)
+{
+       int rc;
+       struct task_struct *task;
+
+       LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
+
+       sema_init(&the_lnet.ln_mt_signal, 0);
+
+       /*
+        * Pre monitor thread start processing.  A non-zero rc means the
+        * router checker could not be set up, so bail out before the
+        * thread is spawned; rc == 0 is the success case and must fall
+        * through to start the thread.
+        */
+       rc = lnet_router_pre_mt_start();
+       if (rc)
+               return rc;
+
+       the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
+       task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("Can't start monitor thread: %d\n", rc);
+               /*
+                * block until exit is signalled
+                * NOTE(review): the thread never ran on this path and no
+                * other up() of ln_mt_signal is visible here - confirm
+                * that this down() cannot block forever.
+                */
+               down(&the_lnet.ln_mt_signal);
+
+               /* clean up */
+               lnet_router_cleanup();
+               the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+               return -ENOMEM;
+       }
+
+       /* post monitor thread start processing */
+       lnet_router_post_mt_start();
+
+       return 0;
+}
+
+/*
+ * Stop the LNet monitor thread and release the router checker
+ * resources.  Does nothing when the thread is already shut down.
+ */
+void lnet_monitor_thr_stop(void)
+{
+       /* only a thread that was started needs tearing down */
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN) {
+               LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
+
+               /* flag the shutdown first, then poke the thread's waitq */
+               the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
+               wake_up(&the_lnet.ln_mt_waitq);
+
+               /* the thread ups ln_mt_signal right before it returns */
+               down(&the_lnet.ln_mt_signal);
+               LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
+
+               /* the thread is gone; free the router checker's EQ */
+               lnet_router_cleanup();
+       }
+}
+
 void
 lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
                  __u32 msg_type)
index 383232d..2393ed5 100644 (file)
@@ -143,13 +143,17 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt)
 {
        struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
        struct lnet_counters *counters = the_lnet.ln_counters[cpt];
+       s64 timeout_ns;
+
+       /* set the message deadline */
+       timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
+       msg->msg_deadline = ktime_add_ns(ktime_get(), timeout_ns);
 
        /* routed message can be committed for both receiving and sending */
        LASSERT(!msg->msg_tx_committed);
 
        if (msg->msg_sending) {
                LASSERT(!msg->msg_receiving);
-
                msg->msg_tx_cpt = cpt;
                msg->msg_tx_committed = 1;
                if (msg->msg_rx_committed) { /* routed message REPLY */
@@ -163,8 +167,9 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt)
        }
 
        LASSERT(!msg->msg_onactivelist);
+
        msg->msg_onactivelist = 1;
-       list_add(&msg->msg_activelist, &container->msc_active);
+       list_add_tail(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
index 24fe7e7..c682ddf 100644 (file)
@@ -68,9 +68,6 @@ lnet_peer_buffer_credits(struct lnet_net *net)
        return net->net_tunables.lct_peer_tx_credits;
 }
 
-/* forward ref's */
-static int lnet_router_checker(void *);
-
 static int check_routers_before_use;
 module_param(check_routers_before_use, int, 0444);
 MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use");
@@ -433,8 +430,8 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway,
        if (rnet != rnet2)
                LIBCFS_FREE(rnet, sizeof(*rnet));
 
-       /* indicate to startup the router checker if configured */
-       wake_up(&the_lnet.ln_rc_waitq);
+       /* kick start the monitor thread to handle the added route */
+       wake_up(&the_lnet.ln_mt_waitq);
 
        return rc;
 }
@@ -836,7 +833,7 @@ lnet_wait_known_routerstate(void)
        struct list_head *entry;
        int all_known;
 
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
+       LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
 
        for (;;) {
                int cpt = lnet_net_lock_current();
@@ -1066,7 +1063,7 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr)
        lnet_ni_notify_locked(ni, rtr);
 
        if (!lnet_isrouter(rtr) ||
-           the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
+           the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
                /* router table changed or router checker is shutting down */
                lnet_peer_ni_decref_locked(rtr);
                return;
@@ -1121,14 +1118,9 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr)
        return;
 }
 
-int
-lnet_router_checker_start(void)
+int lnet_router_pre_mt_start(void)
 {
-       int                     rc;
-       int                     eqsz = 0;
-       struct task_struct     *task;
-
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
+       int rc;
 
        if (check_routers_before_use &&
            dead_router_check_interval <= 0) {
@@ -1138,60 +1130,36 @@ lnet_router_checker_start(void)
                return -EINVAL;
        }
 
-       sema_init(&the_lnet.ln_rc_signal, 0);
-
        rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh);
        if (rc != 0) {
-               CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc);
+               CERROR("Can't allocate EQ(0): %d\n", rc);
                return -ENOMEM;
        }
 
-       the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING;
-       task = kthread_run(lnet_router_checker, NULL, "router_checker");
-       if (IS_ERR(task)) {
-               rc = PTR_ERR(task);
-               CERROR("Can't start router checker thread: %d\n", rc);
-               /* block until event callback signals exit */
-               down(&the_lnet.ln_rc_signal);
-               rc = LNetEQFree(the_lnet.ln_rc_eqh);
-               LASSERT(rc == 0);
-               the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
-               return -ENOMEM;
-       }
+       return 0;
+}
 
+void lnet_router_post_mt_start(void)
+{
        if (check_routers_before_use) {
                /* Note that a helpful side-effect of pinging all known routers
                 * at startup is that it makes them drop stale connections they
                 * may have to a previous instance of me. */
                lnet_wait_known_routerstate();
        }
-
-       return 0;
 }
 
 void
-lnet_router_checker_stop (void)
+lnet_router_cleanup(void)
 {
        int rc;
 
-       if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN)
-               return;
-
-       LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING);
-       the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING;
-       /* wakeup the RC thread if it's sleeping */
-       wake_up(&the_lnet.ln_rc_waitq);
-
-       /* block until event callback signals exit */
-       down(&the_lnet.ln_rc_signal);
-       LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN);
-
        rc = LNetEQFree(the_lnet.ln_rc_eqh);
        LASSERT(rc == 0);
        return;
 }
 
-static void
+void
 lnet_prune_rc_data(int wait_unlink)
 {
        struct lnet_rc_data *rcd;
@@ -1200,7 +1168,7 @@ lnet_prune_rc_data(int wait_unlink)
        struct list_head head;
        int i = 2;
 
-       if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
+       if (likely(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING &&
                   list_empty(&the_lnet.ln_rcd_deathrow) &&
                   list_empty(&the_lnet.ln_rcd_zombie)))
                return;
@@ -1209,7 +1177,7 @@ lnet_prune_rc_data(int wait_unlink)
 
        lnet_net_lock(LNET_LOCK_EX);
 
-       if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) {
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
                /* router checker is stopping, prune all */
                list_for_each_entry(lp, &the_lnet.ln_routers,
                                    lpni_rtr_list) {
@@ -1273,18 +1241,13 @@ lnet_prune_rc_data(int wait_unlink)
 }
 
 /*
- * This function is called to check if the RC should block indefinitely.
- * It's called from lnet_router_checker() as well as being passed to
- * wait_event_interruptible() to avoid the lost wake_up problem.
- *
- * When it's called from wait_event_interruptible() it is necessary to
- * also not sleep if the rc state is not running to avoid a deadlock
- * when the system is shutting down
+ * This function is called from the monitor thread to check if there are
+ * any active routers that need to be checked.
  */
-static inline bool
+inline bool
 lnet_router_checker_active(void)
 {
-       if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING)
+       if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
                return true;
 
        /* Router Checker thread needs to run when routing is enabled in
@@ -1292,79 +1255,58 @@ lnet_router_checker_active(void)
        if (the_lnet.ln_routing)
                return true;
 
+       /* if there are routers that need to be cleaned up then do so */
+       if (!list_empty(&the_lnet.ln_rcd_deathrow) ||
+           !list_empty(&the_lnet.ln_rcd_zombie))
+               return true;
+
        return !list_empty(&the_lnet.ln_routers) &&
                (live_router_check_interval > 0 ||
                 dead_router_check_interval > 0);
 }
 
-static int
-lnet_router_checker(void *arg)
+void
+lnet_check_routers(void)
 {
        struct lnet_peer_ni *rtr;
        struct list_head *entry;
+       __u64   version;
+       int     cpt;
+       int     cpt2;
 
-       cfs_block_allsigs();
-
-       while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) {
-               __u64   version;
-               int     cpt;
-               int     cpt2;
-
-               cpt = lnet_net_lock_current();
+       cpt = lnet_net_lock_current();
 rescan:
-               version = the_lnet.ln_routers_version;
-
-               list_for_each(entry, &the_lnet.ln_routers) {
-                       rtr = list_entry(entry, struct lnet_peer_ni,
-                                        lpni_rtr_list);
-
-                       cpt2 = rtr->lpni_cpt;
-                       if (cpt != cpt2) {
-                               lnet_net_unlock(cpt);
-                               cpt = cpt2;
-                               lnet_net_lock(cpt);
-                               /* the routers list has changed */
-                               if (version != the_lnet.ln_routers_version)
-                                       goto rescan;
-                       }
+       version = the_lnet.ln_routers_version;
 
-                       lnet_ping_router_locked(rtr);
+       list_for_each(entry, &the_lnet.ln_routers) {
+               rtr = list_entry(entry, struct lnet_peer_ni,
+                                       lpni_rtr_list);
 
-                       /* NB dropped lock */
-                       if (version != the_lnet.ln_routers_version) {
-                               /* the routers list has changed */
+               cpt2 = rtr->lpni_cpt;
+               if (cpt != cpt2) {
+                       lnet_net_unlock(cpt);
+                       cpt = cpt2;
+                       lnet_net_lock(cpt);
+                       /* the routers list has changed */
+                       if (version != the_lnet.ln_routers_version)
                                goto rescan;
-                       }
                }
 
-               if (the_lnet.ln_routing)
-                       lnet_update_ni_status_locked();
-
-               lnet_net_unlock(cpt);
-
-               lnet_prune_rc_data(0); /* don't wait for UNLINK */
+               lnet_ping_router_locked(rtr);
 
-               /* Call schedule_timeout() here always adds 1 to load average
-                * because kernel counts # active tasks as nr_running
-                * + nr_uninterruptible. */
-               /* if there are any routes then wakeup every second.  If
-                * there are no routes then sleep indefinitely until woken
-                * up by a user adding a route */
-               if (!lnet_router_checker_active())
-                       wait_event_interruptible(the_lnet.ln_rc_waitq,
-                                                lnet_router_checker_active());
-               else
-                       wait_event_interruptible_timeout(the_lnet.ln_rc_waitq,
-                                                        false,
-                                                        cfs_time_seconds(1));
+               /* NB dropped lock */
+               if (version != the_lnet.ln_routers_version) {
+                       /* the routers list has changed */
+                       goto rescan;
+               }
        }
 
-       lnet_prune_rc_data(1); /* wait for UNLINK */
+       if (the_lnet.ln_routing)
+               lnet_update_ni_status_locked();
 
-       the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN;
-       up(&the_lnet.ln_rc_signal);
-       /* The unlink event callback will signal final completion */
-       return 0;
+       lnet_net_unlock(cpt);
+
+       lnet_prune_rc_data(0); /* don't wait for UNLINK */
 }
 
 void