LU-9480 lnet: add discovery thread 86/25786/23
author Olaf Weber <olaf@sgi.com>
Fri, 27 Jan 2017 15:32:11 +0000 (16:32 +0100)
committer Amir Shehata <amir.shehata@intel.com>
Tue, 22 Aug 2017 16:25:51 +0000 (16:25 +0000)
Add the discovery thread, which will be used to handle peer
discovery. This change adds the thread and the infrastructure
that starts and stops it. The thread itself does trivial work.
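
The start/stop handshake this patch uses (ln_dc_state plus ln_dc_waitq,
with the stopper sleeping until the thread acknowledges shutdown) is a
stock kernel pattern. A condensed, self-contained sketch of just that
handshake -- hypothetical dc_* names, not the patch's code -- might look
like:

#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/module.h>
#include <linux/wait.h>

/* Hypothetical stand-ins for ln_dc_state and ln_dc_waitq. */
#define DC_STATE_SHUTDOWN	0
#define DC_STATE_RUNNING	1
#define DC_STATE_STOPPING	2

static int dc_state = DC_STATE_SHUTDOWN;
static DECLARE_WAIT_QUEUE_HEAD(dc_waitq);

static int dc_thread(void *arg)
{
	/* Sleep until the stopper flips the state to STOPPING. */
	wait_event(dc_waitq, dc_state == DC_STATE_STOPPING);
	/* Acknowledge shutdown, then wake the thread waiting in stop(). */
	dc_state = DC_STATE_SHUTDOWN;
	wake_up(&dc_waitq);
	return 0;
}

static int __init dc_start(void)
{
	struct task_struct *task;

	dc_state = DC_STATE_RUNNING;
	task = kthread_run(dc_thread, NULL, "dc_thread");
	if (IS_ERR(task)) {
		dc_state = DC_STATE_SHUTDOWN;
		return PTR_ERR(task);
	}
	return 0;
}

static void __exit dc_stop(void)
{
	dc_state = DC_STATE_STOPPING;
	wake_up(&dc_waitq);
	/* Wait for the thread to confirm it has left its loop. */
	wait_event(dc_waitq, dc_state == DC_STATE_SHUTDOWN);
}

module_init(dc_start);
module_exit(dc_stop);
MODULE_LICENSE("GPL");

The real thread adds signal blocking and the per-peer request/working
queues on top of this skeleton.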

Peer Discovery gets its own event queue (ln_dc_eqh), a queue
for peers that are to be discovered (ln_dc_request), a queue
for peers waiting for an event (ln_dc_working), a wait queue
head so the thread can sleep (ln_dc_waitq), and start/stop
state (ln_dc_state).

Peer discovery is started from lnet_select_pathway(), for
GET and PUT messages not sent to the LNET_RESERVED_PORTAL.
This criterion ensures that discovery is not triggered by the
messages that discovery itself uses, nor by an LNet ping.
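
As an illustration of the criterion, the standalone predicate below
condenses the lnet_msg_discovery() check this patch adds in lib-move.c;
the enum, struct, and RESERVED_PORTAL value are simplified stand-ins for
the kernel definitions:

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-ins for the LNet types; not the kernel definitions. */
enum msg_type { MSG_ACK, MSG_PUT, MSG_GET, MSG_REPLY };
#define RESERVED_PORTAL 0

struct msg {
	enum msg_type type;
	unsigned int  portal;	/* only meaningful for PUT and GET */
};

/* Mirrors the lnet_msg_discovery() predicate, with the two
 * PUT/GET branches collapsed into one. */
static bool msg_triggers_discovery(const struct msg *m)
{
	if (m->type == MSG_PUT || m->type == MSG_GET)
		return m->portal != RESERVED_PORTAL;
	return false;	/* ACK and REPLY never trigger discovery */
}

int main(void)
{
	struct msg ping = { MSG_GET, RESERVED_PORTAL };	/* LNet ping */
	struct msg data = { MSG_PUT, 4 };		/* ordinary traffic */

	printf("ping triggers discovery: %d\n", msg_triggers_discovery(&ping));
	printf("data triggers discovery: %d\n", msg_triggers_discovery(&data));
	return 0;
}

An LNet ping is a GET on the reserved portal, so it falls through to
false, as does all ACK and REPLY traffic.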

Test-Parameters: trivial
Signed-off-by: Olaf Weber <olaf@sgi.com>
Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Change-Id: I38a48ab7f61c8ef1b994cd17069729f243912bdf
Reviewed-on: https://review.whamcloud.com/25786
Reviewed-by: Olaf Weber <olaf.weber@hpe.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/lnet/api-ni.c
lnet/lnet/lib-move.c
lnet/lnet/peer.c

diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h
index 0e26973..6725551 100644
@@ -530,6 +530,7 @@ int lnet_lib_init(void);
 void lnet_lib_exit(void);
 
 extern unsigned int lnet_numa_range;
+extern unsigned int lnet_peer_discovery_disabled;
 extern int portal_rotor;
 
 int lnet_notify(struct lnet_ni *ni, lnet_nid_t peer, int alive,
@@ -859,6 +860,9 @@ struct lnet_peer_ni *lnet_nid2peerni_ex(lnet_nid_t nid, int cpt);
 struct lnet_peer_ni *lnet_find_peer_ni_locked(lnet_nid_t nid);
 void lnet_peer_net_added(struct lnet_net *net);
 lnet_nid_t lnet_peer_primary_nid_locked(lnet_nid_t nid);
+int lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt);
+int lnet_peer_discovery_start(void);
+void lnet_peer_discovery_stop(void);
 void lnet_peer_tables_cleanup(struct lnet_net *net);
 void lnet_peer_uninit(void);
 int lnet_peer_tables_create(void);
@@ -950,4 +954,6 @@ lnet_peer_ni_is_primary(struct lnet_peer_ni *lpni)
        return lpni->lpni_nid == lpni->lpni_peer_net->lpn_peer->lp_primary_nid;
 }
 
+bool lnet_peer_is_uptodate(struct lnet_peer *lp);
+
 #endif
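
The new declarations map onto the rest of the patch:
lnet_peer_discovery_start() and lnet_peer_discovery_stop() are called
from LNetNIInit() and LNetNIFini() in api-ni.c, while
lnet_discover_peer_locked() and the lnet_peer_is_uptodate() predicate
are used by lnet_select_pathway() in lib-move.c. All four are
implemented in peer.c.
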
diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h
index 8bfa42e..03cfe48 100644
@@ -543,10 +543,61 @@ struct lnet_peer {
 
        /* peer state flags */
        unsigned                lp_state;
+
+       /* link on discovery-related lists */
+       struct list_head        lp_dc_list;
+
+       /* tasks waiting on discovery of this peer */
+       wait_queue_head_t       lp_dc_waitq;
 };
 
-#define LNET_PEER_MULTI_RAIL   (1 << 0)
-#define LNET_PEER_CONFIGURED   (1 << 1)
+/*
+ * The status flags in lp_state. Their semantics have been chosen so that
+ * lp_state can be zero-initialized.
+ *
+ * A peer is marked MULTI_RAIL in two cases: it was configured using DLC
+ * as multi-rail aware, or the LNET_PING_FEAT_MULTI_RAIL bit was set.
+ *
+ * A peer is marked NO_DISCOVERY if the LNET_PING_FEAT_DISCOVERY bit was
+ * NOT set when the peer was pinged by discovery.
+ */
+#define LNET_PEER_MULTI_RAIL   (1 << 0)        /* Multi-rail aware */
+#define LNET_PEER_NO_DISCOVERY (1 << 1)        /* Peer disabled discovery */
+/*
+ * A peer is marked CONFIGURED if it was configured by DLC.
+ *
+ * In addition, a peer is marked DISCOVERED if it has fully passed
+ * through Peer Discovery.
+ *
+ * When Peer Discovery is disabled, the discovery thread will mark
+ * peers REDISCOVER to indicate that they should be re-examined if
+ * discovery is (re)enabled on the node.
+ *
+ * A peer that was created as the result of inbound traffic will not
+ * be marked at all.
+ */
+#define LNET_PEER_CONFIGURED   (1 << 2)        /* Configured via DLC */
+#define LNET_PEER_DISCOVERED   (1 << 3)        /* Peer was discovered */
+#define LNET_PEER_REDISCOVER   (1 << 4)        /* Discovery was disabled */
+/*
+ * A peer is marked DISCOVERING when discovery is in progress.
+ * The other flags below correspond to stages of discovery.
+ */
+#define LNET_PEER_DISCOVERING  (1 << 5)        /* Discovering */
+#define LNET_PEER_DATA_PRESENT (1 << 6)        /* Remote peer data present */
+#define LNET_PEER_NIDS_UPTODATE        (1 << 7)        /* Remote peer info uptodate */
+#define LNET_PEER_PING_SENT    (1 << 8)        /* Waiting for REPLY to Ping */
+#define LNET_PEER_PUSH_SENT    (1 << 9)        /* Waiting for ACK of Push */
+#define LNET_PEER_PING_FAILED  (1 << 10)       /* Ping send failure */
+#define LNET_PEER_PUSH_FAILED  (1 << 11)       /* Push send failure */
+/*
+ * A ping can be forced as a way to fix up state, or as a manual
+ * intervention by an admin.
+ * A push can be forced in circumstances that would normally not
+ * allow for one to happen.
+ */
+#define LNET_PEER_FORCE_PING   (1 << 12)       /* Forced Ping */
+#define LNET_PEER_FORCE_PUSH   (1 << 13)       /* Forced Push */
 
 struct lnet_peer_net {
        /* chain on lp_peer_nets */
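
Because the flags are chosen so that zero means "nothing known", a peer
created by inbound traffic starts with lp_state == 0: it is neither
DISCOVERING, REDISCOVER, nor DISCOVERED, so lnet_peer_is_uptodate()
(added in peer.c below) returns false and the first eligible message
sent to it will queue the peer for discovery.
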
@@ -767,6 +818,11 @@ struct lnet_msg_container {
        void                    **msc_finalizers;
 };
 
+/* Peer Discovery states */
+#define LNET_DC_STATE_SHUTDOWN         0       /* not started */
+#define LNET_DC_STATE_RUNNING          1       /* started up OK */
+#define LNET_DC_STATE_STOPPING         2       /* telling thread to stop */
+
 /* Router Checker states */
 #define LNET_RC_STATE_SHUTDOWN         0       /* not started */
 #define LNET_RC_STATE_RUNNING          1       /* started up OK */
@@ -844,6 +900,17 @@ typedef struct lnet {
        struct lnet_ping_buffer         *ln_ping_target;
        atomic_t                        ln_ping_target_seqno;
 
+       /* discovery event queue handle */
+       lnet_handle_eq_t                ln_dc_eqh;
+       /* discovery requests */
+       struct list_head                ln_dc_request;
+       /* discovery working list */
+       struct list_head                ln_dc_working;
+       /* discovery thread wait queue */
+       wait_queue_head_t               ln_dc_waitq;
+       /* discovery startup/shutdown state */
+       int                             ln_dc_state;
+
        /* router checker startup/shutdown state */
        int                             ln_rc_state;
        /* router checker's event queue */
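
The ln_dc_* fields and the LNET_DC_STATE_* values parallel the router
checker's ln_rc_state machinery defined alongside them, so the discovery
thread's startup and shutdown follow an already established pattern in
this file.
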
diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c
index ee36085..f5acbc9 100644
@@ -75,6 +75,13 @@ module_param_call(lnet_interfaces_max, intf_max_set, param_get_int,
 MODULE_PARM_DESC(lnet_interfaces_max,
                "Maximum number of interfaces in a node.");
 
+unsigned lnet_peer_discovery_disabled = 0;
+static int discovery_set(const char *val, struct kernel_param *kp);
+module_param_call(lnet_peer_discovery_disabled, discovery_set, param_get_int,
+                 &lnet_peer_discovery_disabled, S_IRUGO|S_IWUSR);
+MODULE_PARM_DESC(lnet_peer_discovery_disabled,
+               "Set to 1 to disable peer discovery on this node.");
+
 /*
  * This sequence number keeps track of how many times DLC was used to
  * update the local NIs. It is incremented when a NI is added or
@@ -88,6 +95,23 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout,
                     struct lnet_process_id __user *ids, int n_ids);
 
 static int
+discovery_set(const char *val, struct kernel_param *kp)
+{
+       int rc;
+       unsigned long value;
+
+       rc = kstrtoul(val, 0, &value);
+       if (rc) {
+               CERROR("Invalid module parameter value for 'lnet_peer_discovery_disabled'\n");
+               return rc;
+       }
+
+       *(unsigned *)kp->arg = (value) ? 1 : 0;
+
+       return 0;
+}
+
+static int
 intf_max_set(const char *val, struct kernel_param *kp)
 {
        int value, rc;
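
Since the parameter is registered with S_IWUSR, root can toggle it at
runtime via /sys/module/lnet/parameters/lnet_peer_discovery_disabled as
well as set it at load time (e.g. modprobe lnet
lnet_peer_discovery_disabled=1); discovery_set() normalizes any
non-zero value to 1.
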
@@ -1987,6 +2011,10 @@ LNetNIInit(lnet_pid_t requested_pid)
        if (rc != 0)
                goto err_stop_ping;
 
+       rc = lnet_peer_discovery_start();
+       if (rc != 0)
+               goto err_stop_router_checker;
+
        lnet_fault_init();
        lnet_proc_init();
 
@@ -1994,6 +2022,8 @@ LNetNIInit(lnet_pid_t requested_pid)
 
        return 0;
 
+err_stop_router_checker:
+       lnet_router_checker_stop();
 err_stop_ping:
        lnet_ping_target_fini();
 err_acceptor_stop:
@@ -2043,6 +2073,7 @@ LNetNIFini()
                lnet_fault_fini();
 
                lnet_proc_fini();
+               lnet_peer_discovery_stop();
                lnet_router_checker_stop();
                lnet_ping_target_fini();
 
diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c
index 19757ca..65a4635 100644
@@ -1389,6 +1389,27 @@ lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
        return best_ni;
 }
 
+/*
+ * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
+ * because such traffic is required to perform discovery. We therefore
+ * exclude all GET and PUT on that portal. We also exclude all ACK and
+ * REPLY traffic, but that is because the portal is not tracked in the
+ * message structure for these message types. We could restrict this
+ * further by also checking for LNET_PROTO_PING_MATCHBITS.
+ */
+static bool
+lnet_msg_discovery(struct lnet_msg *msg)
+{
+       if (msg->msg_type == LNET_MSG_PUT) {
+               if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
+                       return true;
+       } else if (msg->msg_type == LNET_MSG_GET) {
+               if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
+                       return true;
+       }
+       return false;
+}
+
 static int
 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
                    struct lnet_msg *msg, lnet_nid_t rtr_nid)
@@ -1401,7 +1422,6 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
        struct lnet_peer        *peer;
        struct lnet_peer_net    *peer_net;
        struct lnet_net         *local_net;
-       __u32                   seq;
        int                     cpt, cpt2, rc;
        bool                    routing;
        bool                    routing2;
@@ -1436,13 +1456,6 @@ again:
        routing2 = false;
        local_found = false;
 
-       seq = lnet_get_dlc_seq_locked();
-
-       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
-               lnet_net_unlock(cpt);
-               return -ESHUTDOWN;
-       }
-
        /*
         * lnet_nid2peerni_locked() is the path that will find an
         * existing peer_ni, or create one and mark it as having been
@@ -1453,7 +1466,22 @@ again:
                lnet_net_unlock(cpt);
                return PTR_ERR(lpni);
        }
+       /*
+        * Now that we have a peer_ni, check if we want to discover
+        * the peer. Traffic to the LNET_RESERVED_PORTAL should not
+        * trigger discovery.
+        */
        peer = lpni->lpni_peer_net->lpn_peer;
+       if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
+               rc = lnet_discover_peer_locked(lpni, cpt);
+               if (rc) {
+                       lnet_peer_ni_decref_locked(lpni);
+                       lnet_net_unlock(cpt);
+                       return rc;
+               }
+               /* The peer may have changed. */
+               peer = lpni->lpni_peer_net->lpn_peer;
+       }
        lnet_peer_ni_decref_locked(lpni);
 
        /* If peer is not healthy then can not send anything to it */
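
lnet_discover_peer_locked() can drop and retake the net lock while it
sleeps, and discovery may tear down and rebuild the lnet_peer in the
meantime, so the lpn_peer pointer is re-read after the call rather than
reusing the value fetched before it.
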
@@ -1881,6 +1909,7 @@ send:
         */
        cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
        if (cpt != cpt2) {
+               __u32 seq = lnet_get_dlc_seq_locked();
                lnet_net_unlock(cpt);
                cpt = cpt2;
                lnet_net_lock(cpt);
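
Two simplifications ride along here: the DLC sequence number is now
read only where it is consumed (the cpt-switch recheck above), and the
shutdown check is dropped from the top of the selection loop because
lnet_nid2peerni_locked() now performs it itself after re-acquiring the
lock (see peer.c below).
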
diff --git a/lnet/lnet/peer.c b/lnet/lnet/peer.c
index 5bb85ba..b2f73e9 100644
@@ -237,6 +237,8 @@ lnet_peer_alloc(lnet_nid_t nid)
 
        INIT_LIST_HEAD(&lp->lp_peer_list);
        INIT_LIST_HEAD(&lp->lp_peer_nets);
+       INIT_LIST_HEAD(&lp->lp_dc_list);
+       init_waitqueue_head(&lp->lp_dc_waitq);
        spin_lock_init(&lp->lp_lock);
        lp->lp_primary_nid = nid;
        lp->lp_cpt = lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
@@ -1461,6 +1463,10 @@ out_net_relock:
        return lpni;
 }
 
+/*
+ * Get a peer_ni for the given nid, create it if necessary. Takes a
+ * hold on the peer_ni.
+ */
 struct lnet_peer_ni *
 lnet_nid2peerni_locked(lnet_nid_t nid, lnet_nid_t pref, int cpt)
 {
@@ -1514,9 +1520,327 @@ out_mutex_unlock:
        mutex_unlock(&the_lnet.ln_api_mutex);
        lnet_net_lock(cpt);
 
+       /* Lock has been dropped, check again for shutdown. */
+       if (the_lnet.ln_state != LNET_STATE_RUNNING) {
+               if (!IS_ERR(lpni))
+                       lnet_peer_ni_decref_locked(lpni);
+               lpni = ERR_PTR(-ESHUTDOWN);
+       }
+
        return lpni;
 }
 
+/*
+ * Peer Discovery
+ */
+
+/*
+ * Is a peer uptodate from the point of view of discovery?
+ *
+ * If it is currently being processed, obviously not.
+ * A forced Ping or Push is also handled by the discovery thread.
+ *
+ * Otherwise look at whether the peer needs rediscovering.
+ */
+bool
+lnet_peer_is_uptodate(struct lnet_peer *lp)
+{
+       bool rc;
+
+       spin_lock(&lp->lp_lock);
+       if (lp->lp_state & (LNET_PEER_DISCOVERING |
+                           LNET_PEER_FORCE_PING |
+                           LNET_PEER_FORCE_PUSH)) {
+               rc = false;
+       } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
+               if (lnet_peer_discovery_disabled)
+                       rc = true;
+               else
+                       rc = false;
+       } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
+               if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
+                       rc = true;
+               else
+                       rc = false;
+       } else {
+               rc = false;
+       }
+       spin_unlock(&lp->lp_lock);
+
+       return rc;
+}
+
+/*
+ * Queue a peer for the attention of the discovery thread.  Call with
+ * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
+ * -EALREADY if the peer was already queued.
+ */
+static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
+{
+       int rc;
+
+       spin_lock(&lp->lp_lock);
+       if (!(lp->lp_state & LNET_PEER_DISCOVERING))
+               lp->lp_state |= LNET_PEER_DISCOVERING;
+       spin_unlock(&lp->lp_lock);
+       if (list_empty(&lp->lp_dc_list)) {
+               lnet_peer_addref_locked(lp);
+               list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
+               wake_up(&the_lnet.ln_dc_waitq);
+               rc = 0;
+       } else {
+               rc = -EALREADY;
+       }
+
+       return rc;
+}
+
+/*
+ * Discovery of a peer is complete. Wake all waiters on the peer.
+ * Call with lnet_net_lock/EX held.
+ */
+static void lnet_peer_discovery_complete(struct lnet_peer *lp)
+{
+       list_del_init(&lp->lp_dc_list);
+       wake_up_all(&lp->lp_dc_waitq);
+       lnet_peer_decref_locked(lp);
+}
+
+/*
+ * Peer discovery slow path. The ln_api_mutex is held on entry, and
+ * dropped/retaken within this function. An lnet_peer_ni is passed in
+ * because discovery could tear down an lnet_peer.
+ */
+int
+lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
+{
+       DEFINE_WAIT(wait);
+       struct lnet_peer *lp;
+       int rc = 0;
+
+again:
+       lnet_net_unlock(cpt);
+       lnet_net_lock(LNET_LOCK_EX);
+
+       /* We're willing to be interrupted. */
+       for (;;) {
+               lp = lpni->lpni_peer_net->lpn_peer;
+               prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
+               if (signal_pending(current))
+                       break;
+               if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
+                       break;
+               if (lnet_peer_is_uptodate(lp))
+                       break;
+               lnet_peer_queue_for_discovery(lp);
+               lnet_peer_addref_locked(lp);
+               lnet_net_unlock(LNET_LOCK_EX);
+               schedule();
+               finish_wait(&lp->lp_dc_waitq, &wait);
+               lnet_net_lock(LNET_LOCK_EX);
+               lnet_peer_decref_locked(lp);
+               /* Do not use lp beyond this point. */
+       }
+       finish_wait(&lp->lp_dc_waitq, &wait);
+
+       lnet_net_unlock(LNET_LOCK_EX);
+       lnet_net_lock(cpt);
+
+       if (signal_pending(current))
+               rc = -EINTR;
+       else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
+               rc = -ESHUTDOWN;
+       else if (!lnet_peer_is_uptodate(lp))
+               goto again;
+
+       return rc;
+}
+
+/*
+ * Event handler for the discovery EQ.
+ *
+ * Called with lnet_res_lock(cpt) held. The cpt is the
+ * lnet_cpt_of_cookie() of the md handle cookie.
+ */
+static void lnet_discovery_event_handler(lnet_event_t *event)
+{
+       wake_up(&the_lnet.ln_dc_waitq);
+}
+
+/*
+ * Wait for work to be queued or some other change that must be
+ * attended to. Returns non-zero if the discovery thread should shut
+ * down.
+ */
+static int lnet_peer_discovery_wait_for_work(void)
+{
+       int cpt;
+       int rc = 0;
+
+       DEFINE_WAIT(wait);
+
+       cpt = lnet_net_lock_current();
+       for (;;) {
+               prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
+                               TASK_INTERRUPTIBLE);
+               if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+                       break;
+               if (!list_empty(&the_lnet.ln_dc_request))
+                       break;
+               lnet_net_unlock(cpt);
+               schedule();
+               finish_wait(&the_lnet.ln_dc_waitq, &wait);
+               cpt = lnet_net_lock_current();
+       }
+       finish_wait(&the_lnet.ln_dc_waitq, &wait);
+
+       if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+               rc = -ESHUTDOWN;
+
+       lnet_net_unlock(cpt);
+
+       CDEBUG(D_NET, "woken: %d\n", rc);
+
+       return rc;
+}
+
+/* The discovery thread. */
+static int lnet_peer_discovery(void *arg)
+{
+       struct lnet_peer *lp;
+
+       CDEBUG(D_NET, "started\n");
+       cfs_block_allsigs();
+
+       for (;;) {
+               if (lnet_peer_discovery_wait_for_work())
+                       break;
+
+               lnet_net_lock(LNET_LOCK_EX);
+               if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+                       break;
+               while (!list_empty(&the_lnet.ln_dc_request)) {
+                       lp = list_first_entry(&the_lnet.ln_dc_request,
+                                             struct lnet_peer, lp_dc_list);
+                       list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
+                       lnet_net_unlock(LNET_LOCK_EX);
+
+                       /* Just tag and release for now. */
+                       spin_lock(&lp->lp_lock);
+                       if (lnet_peer_discovery_disabled) {
+                               lp->lp_state |= LNET_PEER_REDISCOVER;
+                               lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+                                                 LNET_PEER_NIDS_UPTODATE |
+                                                 LNET_PEER_DISCOVERING);
+                       } else {
+                               lp->lp_state |= (LNET_PEER_DISCOVERED |
+                                                LNET_PEER_NIDS_UPTODATE);
+                               lp->lp_state &= ~(LNET_PEER_REDISCOVER |
+                                                 LNET_PEER_DISCOVERING);
+                       }
+                       spin_unlock(&lp->lp_lock);
+
+                       lnet_net_lock(LNET_LOCK_EX);
+                       if (!(lp->lp_state & LNET_PEER_DISCOVERING))
+                               lnet_peer_discovery_complete(lp);
+                       if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+                               break;
+               }
+               lnet_net_unlock(LNET_LOCK_EX);
+       }
+
+       CDEBUG(D_NET, "stopping\n");
+       /*
+        * Clean up before telling lnet_peer_discovery_stop() that
+        * we're done. Use wake_up() below to somewhat reduce the
+        * size of the thundering herd if there are multiple threads
+        * waiting on discovery of a single peer.
+        */
+       LNetEQFree(the_lnet.ln_dc_eqh);
+       LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+
+       lnet_net_lock(LNET_LOCK_EX);
+       list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
+               spin_lock(&lp->lp_lock);
+               lp->lp_state |= LNET_PEER_REDISCOVER;
+               lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+                                 LNET_PEER_DISCOVERING |
+                                 LNET_PEER_NIDS_UPTODATE);
+               spin_unlock(&lp->lp_lock);
+               lnet_peer_discovery_complete(lp);
+       }
+       list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
+               spin_lock(&lp->lp_lock);
+               lp->lp_state |= LNET_PEER_REDISCOVER;
+               lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+                                 LNET_PEER_DISCOVERING |
+                                 LNET_PEER_NIDS_UPTODATE);
+               spin_unlock(&lp->lp_lock);
+               lnet_peer_discovery_complete(lp);
+       }
+       lnet_net_unlock(LNET_LOCK_EX);
+
+       the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
+       wake_up(&the_lnet.ln_dc_waitq);
+
+       CDEBUG(D_NET, "stopped\n");
+
+       return 0;
+}
+
+/* ln_api_mutex is held on entry. */
+int lnet_peer_discovery_start(void)
+{
+       struct task_struct *task;
+       int rc;
+
+       if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
+               return -EALREADY;
+
+       INIT_LIST_HEAD(&the_lnet.ln_dc_request);
+       INIT_LIST_HEAD(&the_lnet.ln_dc_working);
+       init_waitqueue_head(&the_lnet.ln_dc_waitq);
+
+       rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
+       if (rc != 0) {
+               CERROR("Can't allocate discovery EQ: %d\n", rc);
+               return rc;
+       }
+
+       the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
+       task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
+       if (IS_ERR(task)) {
+               rc = PTR_ERR(task);
+               CERROR("Can't start peer discovery thread: %d\n", rc);
+
+               LNetEQFree(the_lnet.ln_dc_eqh);
+               LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+
+               the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
+       }
+
+       return rc;
+}
+
+/* ln_api_mutex is held on entry. */
+void lnet_peer_discovery_stop(void)
+{
+       if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
+               return;
+
+       LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
+       the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
+       wake_up(&the_lnet.ln_dc_waitq);
+
+       wait_event(the_lnet.ln_dc_waitq,
+                  the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
+
+       LASSERT(list_empty(&the_lnet.ln_dc_request));
+       LASSERT(list_empty(&the_lnet.ln_dc_working));
+}
+
+/* Debugging */
+
 void
 lnet_debug_peer(lnet_nid_t nid)
 {
@@ -1548,6 +1872,8 @@ lnet_debug_peer(lnet_nid_t nid)
        lnet_net_unlock(cpt);
 }
 
+/* Gathering information for userspace. */
+
 int lnet_get_peer_ni_info(__u32 peer_index, __u64 *nid,
                          char aliveness[LNET_MAX_STR_LEN],
                          __u32 *cpt_iter, __u32 *refcount,
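
As the commit message says, the thread's work is still trivial: peers
pulled off ln_dc_request are simply tagged DISCOVERED and NIDS_UPTODATE
(or REDISCOVER when discovery is disabled) and released, and the
PING_SENT/PUSH_SENT/DATA_PRESENT stages defined in lib-types.h are not
yet exercised. The actual Ping and Push handling presumably arrives in
follow-on changes in this series.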