From: Amir Shehata Date: Thu, 31 May 2018 00:20:10 +0000 (-0700) Subject: LU-9120 lnet: add monitor thread X-Git-Tag: 2.11.55~65^2^2~23 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=b01e6fce1c988139b5fe59484c7568362992f37b LU-9120 lnet: add monitor thread Refactored the router checker thread to be the monitor thread. The monitor thread will check router aliveness, expire messages on the active list, recover local and remote NIs and resend messages. In this patch it only checks router aliveness. A deadline is also added to each message to keep track of when that message should expire. Test-Parameters: forbuildonly Signed-off-by: Amir Shehata Change-Id: I712cad13d55328400ce61749967979673c4d673f Reviewed-on: https://review.whamcloud.com/32763 Tested-by: Jenkins Reviewed-by: Sonia Sharma Reviewed-by: Olaf Weber Reviewed-by: Chris Horn --- diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index 2fd3292..f4868e0 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -823,8 +823,15 @@ int lnet_sock_connect(struct socket **sockp, int *fatal, int lnet_peers_start_down(void); int lnet_peer_buffer_credits(struct lnet_net *net); -int lnet_router_checker_start(void); -void lnet_router_checker_stop(void); +int lnet_monitor_thr_start(void); +void lnet_monitor_thr_stop(void); + +bool lnet_router_checker_active(void); +void lnet_check_routers(void); +int lnet_router_pre_mt_start(void); +void lnet_router_post_mt_start(void); +void lnet_prune_rc_data(int wait_unlink); +void lnet_router_cleanup(void); void lnet_router_ni_update_locked(struct lnet_peer_ni *gw, __u32 net); void lnet_swap_pinginfo(struct lnet_ping_buffer *pbuf); diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 7528c3c..3817bcd 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -82,6 +82,12 @@ struct lnet_msg { lnet_nid_t msg_src_nid_param; lnet_nid_t msg_rtr_nid_param; + /* 
+ * Deadline for the message after which it will be finalized if it + * has not completed. + */ + ktime_t msg_deadline; + /* committed for sending */ unsigned int msg_tx_committed:1; /* CPT # this message committed for sending */ @@ -892,9 +898,9 @@ struct lnet_msg_container { #define LNET_DC_STATE_STOPPING 2 /* telling thread to stop */ /* Router Checker states */ -#define LNET_RC_STATE_SHUTDOWN 0 /* not started */ -#define LNET_RC_STATE_RUNNING 1 /* started up OK */ -#define LNET_RC_STATE_STOPPING 2 /* telling thread to stop */ +#define LNET_MT_STATE_SHUTDOWN 0 /* not started */ +#define LNET_MT_STATE_RUNNING 1 /* started up OK */ +#define LNET_MT_STATE_STOPPING 2 /* telling thread to stop */ /* LNet states */ #define LNET_STATE_SHUTDOWN 0 /* not started */ @@ -998,8 +1004,8 @@ struct lnet { /* discovery startup/shutdown state */ int ln_dc_state; - /* router checker startup/shutdown state */ - int ln_rc_state; + /* monitor thread startup/shutdown state */ + int ln_mt_state; /* router checker's event queue */ struct lnet_handle_eq ln_rc_eqh; /* rcd still pending on net */ @@ -1007,7 +1013,7 @@ struct lnet { /* rcd ready for free */ struct list_head ln_rcd_zombie; /* serialise startup/shutdown */ - struct semaphore ln_rc_signal; + struct semaphore ln_mt_signal; struct mutex ln_api_mutex; struct mutex ln_lnd_mutex; @@ -1035,10 +1041,11 @@ struct lnet { */ bool ln_nis_from_mod_params; - /* waitq for router checker. As long as there are no routes in - * the list, the router checker will sleep on this queue. when - * routes are added the thread will wake up */ - wait_queue_head_t ln_rc_waitq; + /* + * waitq for the monitor thread. The monitor thread takes care of + * checking routes, timedout messages and resending messages. 
+ */ + wait_queue_head_t ln_mt_waitq; }; #endif diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c index 8b3dbad..80e0aeb 100644 --- a/lnet/lnet/api-ni.c +++ b/lnet/lnet/api-ni.c @@ -302,7 +302,7 @@ lnet_init_locks(void) spin_lock_init(&the_lnet.ln_eq_wait_lock); spin_lock_init(&the_lnet.ln_msg_resend_lock); init_waitqueue_head(&the_lnet.ln_eq_waitq); - init_waitqueue_head(&the_lnet.ln_rc_waitq); + init_waitqueue_head(&the_lnet.ln_mt_waitq); mutex_init(&the_lnet.ln_lnd_mutex); } @@ -2304,13 +2304,13 @@ LNetNIInit(lnet_pid_t requested_pid) lnet_ping_target_update(pbuf, ping_mdh); - rc = lnet_router_checker_start(); + rc = lnet_monitor_thr_start(); if (rc != 0) goto err_stop_ping; rc = lnet_push_target_init(); if (rc != 0) - goto err_stop_router_checker; + goto err_stop_monitor_thr; rc = lnet_peer_discovery_start(); if (rc != 0) @@ -2325,8 +2325,8 @@ LNetNIInit(lnet_pid_t requested_pid) err_destroy_push_target: lnet_push_target_fini(); -err_stop_router_checker: - lnet_router_checker_stop(); +err_stop_monitor_thr: + lnet_monitor_thr_stop(); err_stop_ping: lnet_ping_target_fini(); err_acceptor_stop: @@ -2378,7 +2378,7 @@ LNetNIFini() lnet_router_debugfs_init(); lnet_peer_discovery_stop(); lnet_push_target_fini(); - lnet_router_checker_stop(); + lnet_monitor_thr_stop(); lnet_ping_target_fini(); /* Teardown fns that use my own API functions BEFORE here */ diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index c23c9ab5..c82c516 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -1006,6 +1006,9 @@ lnet_post_send_locked(struct lnet_msg *msg, int do_send) } } + /* unset the msg_tx_delayed flag as we're going to send it now */ + msg->msg_tx_delayed = 0; + if (do_send) { lnet_net_unlock(cpt); lnet_ni_send(ni, msg); @@ -1101,6 +1104,9 @@ lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv) msg->msg_niov = rbp->rbp_npages; msg->msg_kiov = &rb->rb_kiov[0]; + /* unset the msg_rx_delayed flag since we're receiving the message */ + msg->msg_rx_delayed = 
0; + if (do_recv) { int cpt = msg->msg_rx_cpt; @@ -2616,6 +2622,104 @@ lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid) return 0; } +static int +lnet_monitor_thread(void *arg) +{ + /* + * The monitor thread takes care of the following: + * 1. Checks the aliveness of routers + * 2. Checks if there are messages on the resend queue to resend + * them. + * 3. Check if there are any NIs on the local recovery queue and + * pings them + * 4. Checks if there are any NIs on the remote recovery queue + * and pings them. + */ + cfs_block_allsigs(); + + while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) { + if (lnet_router_checker_active()) + lnet_check_routers(); + + /* + * TODO do we need to check if we should sleep without + * timeout? Technically, an active system will always + * have messages in flight so this check will always + * evaluate to false. And on an idle system do we care + * if we wake up every 1 second? Although, we've seen + * cases where we get a complaint that an idle thread + * is waking up unnecessarily. 
+ */ + wait_event_interruptible_timeout(the_lnet.ln_mt_waitq, + false, + cfs_time_seconds(1)); + } + + /* clean up the router checker */ + lnet_prune_rc_data(1); + + /* Shutting down */ + the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; + + /* signal that the monitor thread is exiting */ + up(&the_lnet.ln_mt_signal); + + return 0; +} + +int lnet_monitor_thr_start(void) +{ + int rc; + struct task_struct *task; + + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN); + + sema_init(&the_lnet.ln_mt_signal, 0); + + /* Pre monitor thread start processing; non-zero rc means it failed */ + rc = lnet_router_pre_mt_start(); + if (rc) + return rc; + + the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING; + task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("Can't start monitor thread: %d\n", rc); + /* NOTE(review): thread never ran; confirm something posts ln_mt_signal */ + down(&the_lnet.ln_mt_signal); + + /* clean up */ + lnet_router_cleanup(); + the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; + return -ENOMEM; + } + + /* post monitor thread start processing */ + lnet_router_post_mt_start(); + + return 0; +} + +void lnet_monitor_thr_stop(void) +{ + if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) + return; + + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING); + the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING; + + /* tell the monitor thread that we're shutting down */ + wake_up(&the_lnet.ln_mt_waitq); + + /* block until monitor thread signals that it's done */ + down(&the_lnet.ln_mt_signal); + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN); + + lnet_router_cleanup(); + return; +} + void lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob, __u32 msg_type) diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 383232d..2393ed5 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -143,13 +143,17 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt) { struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt]; 
struct lnet_counters *counters = the_lnet.ln_counters[cpt]; + s64 timeout_ns; + + /* set the message deadline */ + timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC; + msg->msg_deadline = ktime_add_ns(ktime_get(), timeout_ns); /* routed message can be committed for both receiving and sending */ LASSERT(!msg->msg_tx_committed); if (msg->msg_sending) { LASSERT(!msg->msg_receiving); - msg->msg_tx_cpt = cpt; msg->msg_tx_committed = 1; if (msg->msg_rx_committed) { /* routed message REPLY */ @@ -163,8 +167,9 @@ lnet_msg_commit(struct lnet_msg *msg, int cpt) } LASSERT(!msg->msg_onactivelist); + msg->msg_onactivelist = 1; - list_add(&msg->msg_activelist, &container->msc_active); + list_add_tail(&msg->msg_activelist, &container->msc_active); counters->msgs_alloc++; if (counters->msgs_alloc > counters->msgs_max) diff --git a/lnet/lnet/router.c b/lnet/lnet/router.c index 24fe7e7..c682ddf 100644 --- a/lnet/lnet/router.c +++ b/lnet/lnet/router.c @@ -68,9 +68,6 @@ lnet_peer_buffer_credits(struct lnet_net *net) return net->net_tunables.lct_peer_tx_credits; } -/* forward ref's */ -static int lnet_router_checker(void *); - static int check_routers_before_use; module_param(check_routers_before_use, int, 0444); MODULE_PARM_DESC(check_routers_before_use, "Assume routers are down and ping them before use"); @@ -433,8 +430,8 @@ lnet_add_route(__u32 net, __u32 hops, lnet_nid_t gateway, if (rnet != rnet2) LIBCFS_FREE(rnet, sizeof(*rnet)); - /* indicate to startup the router checker if configured */ - wake_up(&the_lnet.ln_rc_waitq); + /* kick start the monitor thread to handle the added route */ + wake_up(&the_lnet.ln_mt_waitq); return rc; } @@ -836,7 +833,7 @@ lnet_wait_known_routerstate(void) struct list_head *entry; int all_known; - LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING); + LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING); for (;;) { int cpt = lnet_net_lock_current(); @@ -1066,7 +1063,7 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr) 
lnet_ni_notify_locked(ni, rtr); if (!lnet_isrouter(rtr) || - the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) { + the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { /* router table changed or router checker is shutting down */ lnet_peer_ni_decref_locked(rtr); return; @@ -1121,14 +1118,9 @@ lnet_ping_router_locked(struct lnet_peer_ni *rtr) return; } -int -lnet_router_checker_start(void) +int lnet_router_pre_mt_start(void) { - int rc; - int eqsz = 0; - struct task_struct *task; - - LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN); + int rc; if (check_routers_before_use && dead_router_check_interval <= 0) { @@ -1138,60 +1130,36 @@ lnet_router_checker_start(void) return -EINVAL; } - sema_init(&the_lnet.ln_rc_signal, 0); - rc = LNetEQAlloc(0, lnet_router_checker_event, &the_lnet.ln_rc_eqh); if (rc != 0) { - CERROR("Can't allocate EQ(%d): %d\n", eqsz, rc); + CERROR("Can't allocate EQ(0): %d\n", rc); return -ENOMEM; } - the_lnet.ln_rc_state = LNET_RC_STATE_RUNNING; - task = kthread_run(lnet_router_checker, NULL, "router_checker"); - if (IS_ERR(task)) { - rc = PTR_ERR(task); - CERROR("Can't start router checker thread: %d\n", rc); - /* block until event callback signals exit */ - down(&the_lnet.ln_rc_signal); - rc = LNetEQFree(the_lnet.ln_rc_eqh); - LASSERT(rc == 0); - the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN; - return -ENOMEM; - } + return 0; +} +void lnet_router_post_mt_start(void) +{ if (check_routers_before_use) { /* Note that a helpful side-effect of pinging all known routers * at startup is that it makes them drop stale connections they * may have to a previous instance of me. 
*/ lnet_wait_known_routerstate(); } - - return 0; } void -lnet_router_checker_stop (void) +lnet_router_cleanup(void) { int rc; - if (the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN) - return; - - LASSERT (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING); - the_lnet.ln_rc_state = LNET_RC_STATE_STOPPING; - /* wakeup the RC thread if it's sleeping */ - wake_up(&the_lnet.ln_rc_waitq); - - /* block until event callback signals exit */ - down(&the_lnet.ln_rc_signal); - LASSERT(the_lnet.ln_rc_state == LNET_RC_STATE_SHUTDOWN); - rc = LNetEQFree(the_lnet.ln_rc_eqh); LASSERT(rc == 0); return; } -static void +void lnet_prune_rc_data(int wait_unlink) { struct lnet_rc_data *rcd; @@ -1200,7 +1168,7 @@ lnet_prune_rc_data(int wait_unlink) struct list_head head; int i = 2; - if (likely(the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING && + if (likely(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING && list_empty(&the_lnet.ln_rcd_deathrow) && list_empty(&the_lnet.ln_rcd_zombie))) return; @@ -1209,7 +1177,7 @@ lnet_prune_rc_data(int wait_unlink) lnet_net_lock(LNET_LOCK_EX); - if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) { + if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) { /* router checker is stopping, prune all */ list_for_each_entry(lp, &the_lnet.ln_routers, lpni_rtr_list) { @@ -1273,18 +1241,13 @@ lnet_prune_rc_data(int wait_unlink) } /* - * This function is called to check if the RC should block indefinitely. - * It's called from lnet_router_checker() as well as being passed to - * wait_event_interruptible() to avoid the lost wake_up problem. - * - * When it's called from wait_event_interruptible() it is necessary to - * also not sleep if the rc state is not running to avoid a deadlock - * when the system is shutting down + * This function is called from the monitor thread to check if there are + * any active routers that need to be checked. 
*/ -static inline bool +inline bool lnet_router_checker_active(void) { - if (the_lnet.ln_rc_state != LNET_RC_STATE_RUNNING) + if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) return true; /* Router Checker thread needs to run when routing is enabled in @@ -1292,79 +1255,58 @@ lnet_router_checker_active(void) if (the_lnet.ln_routing) return true; + /* if there are routers that need to be cleaned up then do so */ + if (!list_empty(&the_lnet.ln_rcd_deathrow) || + !list_empty(&the_lnet.ln_rcd_zombie)) + return true; + return !list_empty(&the_lnet.ln_routers) && (live_router_check_interval > 0 || dead_router_check_interval > 0); } -static int -lnet_router_checker(void *arg) +void +lnet_check_routers(void) { struct lnet_peer_ni *rtr; struct list_head *entry; + __u64 version; + int cpt; + int cpt2; - cfs_block_allsigs(); - - while (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING) { - __u64 version; - int cpt; - int cpt2; - - cpt = lnet_net_lock_current(); + cpt = lnet_net_lock_current(); rescan: - version = the_lnet.ln_routers_version; - - list_for_each(entry, &the_lnet.ln_routers) { - rtr = list_entry(entry, struct lnet_peer_ni, - lpni_rtr_list); - - cpt2 = rtr->lpni_cpt; - if (cpt != cpt2) { - lnet_net_unlock(cpt); - cpt = cpt2; - lnet_net_lock(cpt); - /* the routers list has changed */ - if (version != the_lnet.ln_routers_version) - goto rescan; - } + version = the_lnet.ln_routers_version; - lnet_ping_router_locked(rtr); + list_for_each(entry, &the_lnet.ln_routers) { + rtr = list_entry(entry, struct lnet_peer_ni, + lpni_rtr_list); - /* NB dropped lock */ - if (version != the_lnet.ln_routers_version) { - /* the routers list has changed */ + cpt2 = rtr->lpni_cpt; + if (cpt != cpt2) { + lnet_net_unlock(cpt); + cpt = cpt2; + lnet_net_lock(cpt); + /* the routers list has changed */ + if (version != the_lnet.ln_routers_version) goto rescan; - } } - if (the_lnet.ln_routing) - lnet_update_ni_status_locked(); - - lnet_net_unlock(cpt); - - lnet_prune_rc_data(0); /* don't wait 
for UNLINK */ + lnet_ping_router_locked(rtr); - /* Call schedule_timeout() here always adds 1 to load average - * because kernel counts # active tasks as nr_running - * + nr_uninterruptible. */ - /* if there are any routes then wakeup every second. If - * there are no routes then sleep indefinitely until woken - * up by a user adding a route */ - if (!lnet_router_checker_active()) - wait_event_interruptible(the_lnet.ln_rc_waitq, - lnet_router_checker_active()); - else - wait_event_interruptible_timeout(the_lnet.ln_rc_waitq, - false, - cfs_time_seconds(1)); + /* NB dropped lock */ + if (version != the_lnet.ln_routers_version) { + /* the routers list has changed */ + goto rescan; + } } - lnet_prune_rc_data(1); /* wait for UNLINK */ + if (the_lnet.ln_routing) + lnet_update_ni_status_locked(); - the_lnet.ln_rc_state = LNET_RC_STATE_SHUTDOWN; - up(&the_lnet.ln_rc_signal); - /* The unlink event callback will signal final completion */ - return 0; + lnet_net_unlock(cpt); + + lnet_prune_rc_data(0); /* don't wait for UNLINK */ } void