+/*
+ * Peer Discovery
+ */
+
+/*
+ * Is a peer uptodate from the point of view of discovery?
+ *
+ * If it is currently being processed, obviously not.
+ * A forced Ping or Push is also handled by the discovery thread.
+ *
+ * Otherwise look at whether the peer needs rediscovering.
+ */
+bool
+lnet_peer_is_uptodate(struct lnet_peer *lp)
+{
+ bool rc;
+
+ spin_lock(&lp->lp_lock);
+ if (lp->lp_state & (LNET_PEER_DISCOVERING |
+ LNET_PEER_FORCE_PING |
+ LNET_PEER_FORCE_PUSH)) {
+ rc = false;
+ } else if (lp->lp_state & LNET_PEER_REDISCOVER) {
+ if (lnet_peer_discovery_disabled)
+ rc = true;
+ else
+ rc = false;
+ } else if (lp->lp_state & LNET_PEER_DISCOVERED) {
+ if (lp->lp_state & LNET_PEER_NIDS_UPTODATE)
+ rc = true;
+ else
+ rc = false;
+ } else {
+ rc = false;
+ }
+ spin_unlock(&lp->lp_lock);
+
+ return rc;
+}
+
+/*
+ * Queue a peer for the attention of the discovery thread. Call with
+ * lnet_net_lock/EX held. Returns 0 if the peer was queued, and
+ * -EALREADY if the peer was already queued.
+ */
+static int lnet_peer_queue_for_discovery(struct lnet_peer *lp)
+{
+ int rc;
+
+ spin_lock(&lp->lp_lock);
+ if (!(lp->lp_state & LNET_PEER_DISCOVERING))
+ lp->lp_state |= LNET_PEER_DISCOVERING;
+ spin_unlock(&lp->lp_lock);
+ if (list_empty(&lp->lp_dc_list)) {
+ lnet_peer_addref_locked(lp);
+ list_add_tail(&lp->lp_dc_list, &the_lnet.ln_dc_request);
+ wake_up(&the_lnet.ln_dc_waitq);
+ rc = 0;
+ } else {
+ rc = -EALREADY;
+ }
+
+ return rc;
+}
+
+/*
+ * Discovery of a peer is complete. Wake all waiters on the peer.
+ * Call with lnet_net_lock/EX held.
+ */
+static void lnet_peer_discovery_complete(struct lnet_peer *lp)
+{
+ list_del_init(&lp->lp_dc_list);
+ wake_up_all(&lp->lp_dc_waitq);
+ lnet_peer_decref_locked(lp);
+}
+
+/*
+ * Peer discovery slow path. The ln_api_mutex is held on entry, and
+ * dropped/retaken within this function. An lnet_peer_ni is passed in
+ * because discovery could tear down an lnet_peer.
+ */
+int
+lnet_discover_peer_locked(struct lnet_peer_ni *lpni, int cpt)
+{
+ DEFINE_WAIT(wait);
+ struct lnet_peer *lp;
+ int rc = 0;
+
+again:
+ lnet_net_unlock(cpt);
+ lnet_net_lock(LNET_LOCK_EX);
+
+ /* We're willing to be interrupted. */
+ for (;;) {
+ lp = lpni->lpni_peer_net->lpn_peer;
+ prepare_to_wait(&lp->lp_dc_waitq, &wait, TASK_INTERRUPTIBLE);
+ if (signal_pending(current))
+ break;
+ if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
+ break;
+ if (lnet_peer_is_uptodate(lp))
+ break;
+ lnet_peer_queue_for_discovery(lp);
+ lnet_peer_addref_locked(lp);
+ lnet_net_unlock(LNET_LOCK_EX);
+ schedule();
+ finish_wait(&lp->lp_dc_waitq, &wait);
+ lnet_net_lock(LNET_LOCK_EX);
+ lnet_peer_decref_locked(lp);
+ /* Do not use lp beyond this point. */
+ }
+ finish_wait(&lp->lp_dc_waitq, &wait);
+
+ lnet_net_unlock(LNET_LOCK_EX);
+ lnet_net_lock(cpt);
+
+ if (signal_pending(current))
+ rc = -EINTR;
+ else if (the_lnet.ln_dc_state != LNET_DC_STATE_RUNNING)
+ rc = -ESHUTDOWN;
+ else if (!lnet_peer_is_uptodate(lp))
+ goto again;
+
+ return rc;
+}
+
+/*
+ * Event handler for the discovery EQ.
+ *
+ * Called with lnet_res_lock(cpt) held. The cpt is the
+ * lnet_cpt_of_cookie() of the md handle cookie.
+ */
+static void lnet_discovery_event_handler(lnet_event_t *event)
+{
+ wake_up(&the_lnet.ln_dc_waitq);
+}
+
+/*
+ * Wait for work to be queued or some other change that must be
+ * attended to. Returns non-zero if the discovery thread should shut
+ * down.
+ */
+static int lnet_peer_discovery_wait_for_work(void)
+{
+ int cpt;
+ int rc = 0;
+
+ DEFINE_WAIT(wait);
+
+ cpt = lnet_net_lock_current();
+ for (;;) {
+ prepare_to_wait(&the_lnet.ln_dc_waitq, &wait,
+ TASK_INTERRUPTIBLE);
+ if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+ break;
+ if (!list_empty(&the_lnet.ln_dc_request))
+ break;
+ lnet_net_unlock(cpt);
+ schedule();
+ finish_wait(&the_lnet.ln_dc_waitq, &wait);
+ cpt = lnet_net_lock_current();
+ }
+ finish_wait(&the_lnet.ln_dc_waitq, &wait);
+
+ if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+ rc = -ESHUTDOWN;
+
+ lnet_net_unlock(cpt);
+
+ CDEBUG(D_NET, "woken: %d\n", rc);
+
+ return rc;
+}
+
+/* The discovery thread. */
+static int lnet_peer_discovery(void *arg)
+{
+ struct lnet_peer *lp;
+
+ CDEBUG(D_NET, "started\n");
+ cfs_block_allsigs();
+
+ for (;;) {
+ if (lnet_peer_discovery_wait_for_work())
+ break;
+
+ lnet_net_lock(LNET_LOCK_EX);
+ if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+ break;
+ while (!list_empty(&the_lnet.ln_dc_request)) {
+ lp = list_first_entry(&the_lnet.ln_dc_request,
+ struct lnet_peer, lp_dc_list);
+ list_move(&lp->lp_dc_list, &the_lnet.ln_dc_working);
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ /* Just tag and release for now. */
+ spin_lock(&lp->lp_lock);
+ if (lnet_peer_discovery_disabled) {
+ lp->lp_state |= LNET_PEER_REDISCOVER;
+ lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+ LNET_PEER_NIDS_UPTODATE |
+ LNET_PEER_DISCOVERING);
+ } else {
+ lp->lp_state |= (LNET_PEER_DISCOVERED |
+ LNET_PEER_NIDS_UPTODATE);
+ lp->lp_state &= ~(LNET_PEER_REDISCOVER |
+ LNET_PEER_DISCOVERING);
+ }
+ spin_unlock(&lp->lp_lock);
+
+ lnet_net_lock(LNET_LOCK_EX);
+ if (!(lp->lp_state & LNET_PEER_DISCOVERING))
+ lnet_peer_discovery_complete(lp);
+ if (the_lnet.ln_dc_state == LNET_DC_STATE_STOPPING)
+ break;
+ }
+ lnet_net_unlock(LNET_LOCK_EX);
+ }
+
+ CDEBUG(D_NET, "stopping\n");
+ /*
+ * Clean up before telling lnet_peer_discovery_stop() that
+ * we're done. Use wake_up() below to somewhat reduce the
+ * size of the thundering herd if there are multiple threads
+ * waiting on discovery of a single peer.
+ */
+ LNetEQFree(the_lnet.ln_dc_eqh);
+ LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+
+ lnet_net_lock(LNET_LOCK_EX);
+ list_for_each_entry(lp, &the_lnet.ln_dc_request, lp_dc_list) {
+ spin_lock(&lp->lp_lock);
+ lp->lp_state |= LNET_PEER_REDISCOVER;
+ lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+ LNET_PEER_DISCOVERING |
+ LNET_PEER_NIDS_UPTODATE);
+ spin_unlock(&lp->lp_lock);
+ lnet_peer_discovery_complete(lp);
+ }
+ list_for_each_entry(lp, &the_lnet.ln_dc_working, lp_dc_list) {
+ spin_lock(&lp->lp_lock);
+ lp->lp_state |= LNET_PEER_REDISCOVER;
+ lp->lp_state &= ~(LNET_PEER_DISCOVERED |
+ LNET_PEER_DISCOVERING |
+ LNET_PEER_NIDS_UPTODATE);
+ spin_unlock(&lp->lp_lock);
+ lnet_peer_discovery_complete(lp);
+ }
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
+ wake_up(&the_lnet.ln_dc_waitq);
+
+ CDEBUG(D_NET, "stopped\n");
+
+ return 0;
+}
+
+/* ln_api_mutex is held on entry. */
+int lnet_peer_discovery_start(void)
+{
+ struct task_struct *task;
+ int rc;
+
+ if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN)
+ return -EALREADY;
+
+ INIT_LIST_HEAD(&the_lnet.ln_dc_request);
+ INIT_LIST_HEAD(&the_lnet.ln_dc_working);
+ init_waitqueue_head(&the_lnet.ln_dc_waitq);
+
+ rc = LNetEQAlloc(0, lnet_discovery_event_handler, &the_lnet.ln_dc_eqh);
+ if (rc != 0) {
+ CERROR("Can't allocate discovery EQ: %d\n", rc);
+ return rc;
+ }
+
+ the_lnet.ln_dc_state = LNET_DC_STATE_RUNNING;
+ task = kthread_run(lnet_peer_discovery, NULL, "lnet_discovery");
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ CERROR("Can't start peer discovery thread: %d\n", rc);
+
+ LNetEQFree(the_lnet.ln_dc_eqh);
+ LNetInvalidateEQHandle(&the_lnet.ln_dc_eqh);
+
+ the_lnet.ln_dc_state = LNET_DC_STATE_SHUTDOWN;
+ }
+
+ return rc;
+}
+
+/* ln_api_mutex is held on entry. */
+void lnet_peer_discovery_stop(void)
+{
+ if (the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN)
+ return;
+
+ LASSERT(the_lnet.ln_dc_state == LNET_DC_STATE_RUNNING);
+ the_lnet.ln_dc_state = LNET_DC_STATE_STOPPING;
+ wake_up(&the_lnet.ln_dc_waitq);
+
+ wait_event(the_lnet.ln_dc_waitq,
+ the_lnet.ln_dc_state == LNET_DC_STATE_SHUTDOWN);
+
+ LASSERT(list_empty(&the_lnet.ln_dc_request));
+ LASSERT(list_empty(&the_lnet.ln_dc_working));
+}
+
+/* Debugging */
+