+static int
+lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
+ struct lnet_msg *msg, lnet_nid_t rtr_nid)
+{
+ struct lnet_peer_ni *lpni;
+ struct lnet_peer *peer;
+ struct lnet_send_data send_data;
+ int cpt, rc;
+ int md_cpt;
+ __u32 send_case = 0;
+
+ memset(&send_data, 0, sizeof(send_data));
+
+ /*
+ * Get an initial CPT to use for locking. The idea is to avoid
+ * serializing calls to select_pathway, so that as many operations
+ * as possible can run concurrently. To do that we use the CPT
+ * where this call is being executed. Later on, when we determine
+ * the CPT to use in lnet_message_commit, we switch the lock and
+ * check whether there was any configuration change. If there was
+ * none we proceed; otherwise we restart the operation.
+ */
+ cpt = lnet_net_lock_current();
+
+ md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
+ if (md_cpt == CFS_CPT_ANY)
+ md_cpt = cpt;
+
+again:
+
+ /*
+ * If we're being asked to send to the loopback interface, there
+ * is no need to go through any selection. We can just shortcut
+ * the entire process and send over lolnd
+ */
+ send_data.sd_msg = msg;
+ send_data.sd_cpt = cpt;
+ if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
+ rc = lnet_handle_lo_send(&send_data);
+ lnet_net_unlock(cpt);
+ return rc;
+ }
+
+ /*
+ * find an existing peer_ni, or create one and mark it as having been
+ * created due to network traffic. This call will create the
+ * peer->peer_net->peer_ni tree.
+ */
+ lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
+ if (IS_ERR(lpni)) {
+ lnet_net_unlock(cpt);
+ return PTR_ERR(lpni);
+ }
+
+ /*
+ * Cache the original src_nid. If we need to resend the message
+ * then we'll need to know whether the src_nid was originally
+ * specified for this message. If it was originally specified,
+ * then we need to keep using the same src_nid since it's
+ * continuing the same sequence of messages.
+ */
+ msg->msg_src_nid_param = src_nid;
+
+ /*
+ * If necessary, perform discovery on the peer that owns this peer_ni.
+ * Note, this can result in the ownership of this peer_ni changing
+ * to another peer object.
+ */
+ rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
+ if (rc) {
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(cpt);
+ return rc;
+ }
+ lnet_peer_ni_decref_locked(lpni);
+
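+ /*
+ * The reference from lnet_nid2peerni_locked() has been dropped,
+ * but the lpni remains valid here because the net lock is still
+ * held.
+ */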
+ peer = lpni->lpni_peer_net->lpn_peer;
+
+ /*
+ * Identify the different send cases. These flags classify the
+ * message by source (specified or any), destination (local or
+ * remote), peer type (MR or non-MR) and whether it is a response,
+ * and they select the handler used by
+ * lnet_handle_send_case_locked().
+ */
+ if (src_nid == LNET_NID_ANY)
+ send_case |= SRC_ANY;
+ else
+ send_case |= SRC_SPEC;
+
+ if (lnet_get_net_locked(LNET_NIDNET(dst_nid)))
+ send_case |= LOCAL_DST;
+ else
+ send_case |= REMOTE_DST;
+
+ /*
+ * If this is a non-MR peer, or if we're recovering a peer NI,
+ * treat it as an NMR case so that we target the destination
+ * NID directly.
+ */
+ if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
+ send_case |= NMR_DST;
+ else
+ send_case |= MR_DST;
+
+ if (msg->msg_type == LNET_MSG_REPLY ||
+ msg->msg_type == LNET_MSG_ACK)
+ send_case |= SND_RESP;
+
+ /* assign parameters to the send_data */
+ send_data.sd_rtr_nid = rtr_nid;
+ send_data.sd_src_nid = src_nid;
+ send_data.sd_dst_nid = dst_nid;
+ send_data.sd_best_lpni = lpni;
+ /*
+ * keep a pointer to the final destination in case the message
+ * is routed, since we'll need to access it later
+ */
+ send_data.sd_final_dst_lpni = lpni;
+ send_data.sd_peer = peer;
+ send_data.sd_md_cpt = md_cpt;
+ send_data.sd_send_case = send_case;
+
+ rc = lnet_handle_send_case_locked(&send_data);
+
+ /*
+ * Update the local cpt since send_data.sd_cpt might've been
+ * updated as a result of calling lnet_handle_send_case_locked().
+ */
+ cpt = send_data.sd_cpt;
+
+ if (rc == REPEAT_SEND)
+ goto again;
+
+ lnet_net_unlock(cpt);
+
+ return rc;
+}
+
+int
+lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
+{
+ lnet_nid_t dst_nid = msg->msg_target.nid;
+ int rc;
+
+ /*
+ * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
+ * but we might want to use a pre-determined router for ACK/REPLY
+ * in the future
+ */
+ /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
+ LASSERT(!msg->msg_txpeer);
+ LASSERT(!msg->msg_txni);
+ LASSERT(!msg->msg_sending);
+ LASSERT(!msg->msg_target_is_router);
+ LASSERT(!msg->msg_receiving);
+
+ msg->msg_sending = 1;
+
+ LASSERT(!msg->msg_tx_committed);
+
+ rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
+ if (rc < 0) {
+ if (rc == -EHOSTUNREACH)
+ msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
+ else
+ msg->msg_health_status = LNET_MSG_STATUS_LOCAL_ERROR;
+ return rc;
+ }
+
+ if (rc == LNET_CREDIT_OK)
+ lnet_ni_send(msg->msg_txni, msg);
+
+ /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
+ return 0;
+}
+
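+/*
+ * Describes the target of a monitor thread ping event: either a
+ * local NI or a peer NI, identified by its NID.
+ */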
+enum lnet_mt_event_type {
+ MT_TYPE_LOCAL_NI = 0,
+ MT_TYPE_PEER_NI
+};
+
+struct lnet_mt_event_info {
+ enum lnet_mt_event_type mt_type;
+ lnet_nid_t mt_nid;
+};
+
+/* called with res_lock held */
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+ struct lnet_rsp_tracker *rspt;
+
+ /*
+ * msg has a refcount on the MD so the MD is not going away.
+ * The rspt queue for the cpt is protected by
+ * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
+ */
+ if (!md->md_rspt_ptr)
+ return;
+
+ rspt = md->md_rspt_ptr;
+
+ /* sanity: the tracker must belong to the CPT of the MD cookie */
+ LASSERT(rspt->rspt_cpt == cpt);
+
+ md->md_rspt_ptr = NULL;
+
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ /*
+ * The monitor thread has invalidated this handle because the
+ * response timed out, but it failed to look up the MD. That
+ * means this response tracker is on the zombie list. We can
+ * safely remove it under the resource lock (held by the caller)
+ * and free the response tracker block.
+ */
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, cpt);
+ } else {
+ /*
+ * invalidate the handle to indicate that a response has been
+ * received, which will then lead the monitor thread to clean up
+ * the rspt block.
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ }
+}
+
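+/*
+ * Free any response trackers still on the per-CPT zombie lists and
+ * release the lists themselves. Called at shutdown, after all LND
+ * nets have been torn down, so no MD can still reference these
+ * trackers.
+ */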
+void
+lnet_clean_zombie_rstqs(void)
+{
+ struct lnet_rsp_tracker *rspt, *tmp;
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ list_for_each_entry_safe(rspt, tmp,
+ the_lnet.ln_mt_zombie_rstqs[i],
+ rspt_on_list) {
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ }
+ }
+
+ cfs_percpt_free(the_lnet.ln_mt_zombie_rstqs);
+}
+
+static void
+lnet_finalize_expired_responses(void)
+{
+ struct lnet_libmd *md;
+ struct lnet_rsp_tracker *rspt, *tmp;
+ ktime_t now;
+ int i;
+
+ if (!the_lnet.ln_mt_rstq)
+ return;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ LIST_HEAD(local_queue);
+
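+ /*
+ * Splice this CPT's tracker queue onto a local list so it
+ * can be walked without holding the net lock.
+ */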
+ lnet_net_lock(i);
+ if (!the_lnet.ln_mt_rstq[i]) {
+ lnet_net_unlock(i);
+ continue;
+ }
+ list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+ lnet_net_unlock(i);
+
+ now = ktime_get();
+
+ list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+ /*
+ * The rspt mdh will be invalidated when a response is
+ * received or whenever we want to discard the block. The
+ * monitor thread will walk the queue and clean up any
+ * rspts with an invalid mdh. It walks the queue only up
+ * to the first unexpired rspt block, which means that
+ * rspt blocks which have already received their
+ * corresponding responses may linger in the queue until
+ * they are eventually cleaned up.
+ */
+ lnet_res_lock(i);
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ lnet_res_unlock(i);
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ continue;
+ }
+
+ if (ktime_compare(now, rspt->rspt_deadline) >= 0 ||
+ the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) {
+ struct lnet_peer_ni *lpni;
+ lnet_nid_t nid;
+
+ md = lnet_handle2md(&rspt->rspt_mdh);
+ if (!md) {
+ /* MD has been queued for unlink, but
+ * rspt hasn't been detached (Note we've
+ * checked above that the rspt_mdh is
+ * valid). Since we cannot lookup the MD
+ * we're unable to detach the rspt
+ * ourselves. Thus, move the rspt to the
+ * zombie list where we'll wait for
+ * either:
+ * 1. The remaining operations on the
+ * MD to complete. In this case the
+ * final operation will result in
+ * lnet_msg_detach_md()->
+ * lnet_detach_rsp_tracker() where
+ * we will clean up this response
+ * tracker.
+ * 2. LNet to shutdown. In this case
+ * we'll wait until after all LND Nets
+ * have shutdown and then we can
+ * safely free any remaining response
+ * tracker blocks on the zombie list.
+ * Note: We need to hold the resource
+ * lock when adding to the zombie list
+ * because we may have concurrent access
+ * with lnet_detach_rsp_tracker().
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ list_move(&rspt->rspt_on_list,
+ the_lnet.ln_mt_zombie_rstqs[i]);
+ lnet_res_unlock(i);
+ continue;
+ }
+ LASSERT(md->md_rspt_ptr == rspt);
+ md->md_rspt_ptr = NULL;
+ lnet_res_unlock(i);
+
+ LNetMDUnlink(rspt->rspt_mdh);
+
+ nid = rspt->rspt_next_hop_nid;
+
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+
+ /* If we're shutting down we just want to clean
+ * up the rspt blocks
+ */
+ if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+ continue;
+
+ lnet_net_lock(i);
+ the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
+ lnet_net_unlock(i);
+
+ CDEBUG(D_NET,
+ "Response timeout: md = %p: nid = %s\n",
+ md, libcfs_nid2str(nid));
+
+ /*
+ * If there is a timeout on the response from the next
+ * hop, decrement its health value so that we avoid
+ * using it.
+ */
+ lnet_net_lock(0);
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (lpni) {
+ lnet_handle_remote_failure_locked(lpni);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+ lnet_net_unlock(0);
+ } else {
+ lnet_res_unlock(i);
+ break;
+ }
+ }
+
+ if (!list_empty(&local_queue)) {
+ lnet_net_lock(i);
+ list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+ lnet_net_unlock(i);
+ }
+ }
+}
+
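+/*
+ * Drain the given resend queue. The net lock for this cpt is held
+ * on entry and on exit, but is dropped around the lnet_finalize()
+ * and lnet_send() calls.
+ */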
+static void
+lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
+{
+ struct lnet_msg *msg;
+
+ while (!list_empty(resendq)) {
+ struct lnet_peer_ni *lpni;
+
+ msg = list_entry(resendq->next, struct lnet_msg,
+ msg_list);
+
+ list_del_init(&msg->msg_list);
+
+ lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid);
+ if (!lpni) {
+ lnet_net_unlock(cpt);
+ CERROR("Expected that a peer is already created for %s\n",
+ libcfs_nid2str(msg->msg_hdr.dest_nid));
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, -EFAULT);
+ lnet_net_lock(cpt);
+ } else {
+ struct lnet_peer *peer;
+ int rc;
+ lnet_nid_t src_nid = LNET_NID_ANY;
+
+ /*
+ * If this message is not being routed and the peer is
+ * non-MR, then we must use the same src_nid that was
+ * used in the original send. Otherwise, if we're
+ * routing the message (i.e. we're a router), we can
+ * use any of our local interfaces; it doesn't matter
+ * to the final destination.
+ */
+ peer = lpni->lpni_peer_net->lpn_peer;
+ if (!msg->msg_routing &&
+ !lnet_peer_is_multi_rail(peer))
+ src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
+
+ /*
+ * If we originally specified a src NID, then we
+ * must attempt to reuse it in the resend as well.
+ */
+ if (msg->msg_src_nid_param != LNET_NID_ANY)
+ src_nid = msg->msg_src_nid_param;
+ lnet_peer_ni_decref_locked(lpni);
+
+ lnet_net_unlock(cpt);
+ CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
+ libcfs_nid2str(src_nid),
+ libcfs_id2str(msg->msg_target),
+ lnet_msgtyp2str(msg->msg_type),
+ msg->msg_recovery,
+ msg->msg_retry_count);
+ rc = lnet_send(src_nid, msg, LNET_NID_ANY);
+ if (rc) {
+ CERROR("Error sending %s to %s: %d\n",
+ lnet_msgtyp2str(msg->msg_type),
+ libcfs_id2str(msg->msg_target), rc);
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, rc);
+ }
+ lnet_net_lock(cpt);
+ if (!rc)
+ the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
+ }
+ }
+}
+
+static void
+lnet_resend_pending_msgs(void)
+{
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ lnet_net_lock(i);
+ lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i);
+ lnet_net_unlock(i);
+ }
+}
+
+/* called with cpt and ni_lock held */
+static void
+lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
+{
+ struct lnet_handle_md recovery_mdh;
+
+ LNetInvalidateMDHandle(&recovery_mdh);
+
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
+ force) {
+ recovery_mdh = ni->ni_ping_mdh;
+ LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+ }
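+ /*
+ * Drop the NI and net locks before unlinking; LNetMDUnlink()
+ * takes the resource lock and may invoke the MD's event
+ * handler, so it must not be called with these locks held.
+ */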
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(cpt);
+ if (!LNetMDHandleIsInvalid(recovery_mdh))
+ LNetMDUnlink(recovery_mdh);
+ lnet_net_lock(cpt);
+ lnet_ni_lock(ni);
+}
+
+static void
+lnet_recover_local_nis(void)
+{
+ struct lnet_mt_event_info *ev_info;
+ LIST_HEAD(processed_list);
+ LIST_HEAD(local_queue);
+ struct lnet_handle_md mdh;
+ struct lnet_ni *tmp;
+ struct lnet_ni *ni;
+ lnet_nid_t nid;
+ int healthv;
+ int rc;
+
+ /*
+ * Splice the recovery queue onto a local queue. We will iterate
+ * through the local queue and update it as needed. Once we're
+ * done with the traversal, we'll splice the local queue back
+ * onto the head of ln_mt_localNIRecovq. Any newly added local
+ * NIs will be traversed on the next iteration.
+ */
+ lnet_net_lock(0);
+ list_splice_init(&the_lnet.ln_mt_localNIRecovq,
+ &local_queue);
+ lnet_net_unlock(0);
+
+ list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
+ /*
+ * if an NI is being deleted or it is now healthy, there
+ * is no need to keep it around in the recovery queue.
+ * The monitor thread is the only thread responsible for
+ * removing the NI from the recovery queue.
+ * Multiple threads can be adding NIs to the recovery
+ * queue.
+ */
+ healthv = atomic_read(&ni->ni_healthv);
+
+ lnet_net_lock(0);
+ lnet_ni_lock(ni);
+ if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
+ healthv == LNET_MAX_HEALTH_VALUE) {
+ list_del_init(&ni->ni_recovery);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
+ lnet_ni_unlock(ni);
+ lnet_ni_decref_locked(ni, 0);
+ lnet_net_unlock(0);
+ continue;
+ }
+
+ /*
+ * If the local NI failed recovery we must unlink the MD, but
+ * we want to keep the local NI on the recovery queue so we
+ * can continue the attempts to recover it.
+ */
+ if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
+ }
+
+ lnet_ni_unlock(ni);
+ lnet_net_unlock(0);
+
+ CDEBUG(D_NET, "attempting to recover local ni: %s\n",
+ libcfs_nid2str(ni->ni_nid));
+
+ lnet_ni_lock(ni);
+ if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
+ ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+
+ LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(ni->ni_nid));
+ lnet_ni_lock(ni);
+ ni->ni_recovery_state &=
+ ~LNET_NI_RECOVERY_PENDING;
+ lnet_ni_unlock(ni);
+ continue;
+ }
+
+ mdh = ni->ni_ping_mdh;
+ /*
+ * Invalidate the ni mdh in case the NI gets deleted while
+ * we're unlocked; in that case we unlink the mdh below.
+ */
+ LNetInvalidateMDHandle(&ni->ni_ping_mdh);
+ nid = ni->ni_nid;
+
+ /*
+ * Remove the NI from the local queue and drop the reference
+ * count to it while we're recovering it. The reason is that
+ * the NI could be deleted, and the way the code is
+ * structured, if we don't drop the NI then the deletion code
+ * will enter a loop waiting for the reference count to be
+ * removed while holding the ln_mutex_lock(). When we look up
+ * the peer to send to in lnet_select_pathway() we will try
+ * to take the ln_mutex_lock() as well, leading to a
+ * deadlock. By dropping the refcount and removing the NI
+ * from the list, we allow it to be removed; then we use the
+ * cached NID to look it up again. If it's gone, we just
+ * continue examining the rest of the queue.
+ */
+ lnet_net_lock(0);
+ list_del_init(&ni->ni_recovery);
+ lnet_ni_decref_locked(ni, 0);
+ lnet_net_unlock(0);
+
+ ev_info->mt_type = MT_TYPE_LOCAL_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
+ /* lookup the nid again */
+ lnet_net_lock(0);
+ ni = lnet_nid2ni_locked(nid, 0);
+ if (!ni) {
+ /*
+ * the NI was deleted while we had dropped
+ * the ref count
+ */
+ lnet_net_unlock(0);
+ LNetMDUnlink(mdh);
+ continue;
+ }
+ /*
+ * Same note as in lnet_recover_peer_nis(). When
+ * we're sending the ping, the NI is free to be
+ * deleted or manipulated. By this point it
+ * could've been added back on the recovery queue,
+ * and a refcount taken on it.
+ * So we can't just add it blindly again or we'll
+ * corrupt the queue. We must check under lock if
+ * it's not on any list and if not then add it
+ * to the processed list, which will eventually be
+ * spliced back on to the recovery queue.
+ */
+ ni->ni_ping_mdh = mdh;
+ if (list_empty(&ni->ni_recovery)) {
+ list_add_tail(&ni->ni_recovery, &processed_list);
+ lnet_ni_addref_locked(ni, 0);
+ }
+ lnet_net_unlock(0);
+
+ lnet_ni_lock(ni);
+ if (rc)
+ ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
+ }
+ lnet_ni_unlock(ni);
+ }
+
+ /*
+ * put back the remaining NIs on the ln_mt_localNIRecovq to be
+ * reexamined in the next iteration.
+ */
+ list_splice_init(&processed_list, &local_queue);
+ lnet_net_lock(0);
+ list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
+ lnet_net_unlock(0);
+}
+
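+/*
+ * Allocate the per-CPT resend queues and publish them under the
+ * exclusive net lock.
+ */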
+static int
+lnet_resendqs_create(void)
+{
+ struct list_head **resendqs;
+
+ resendqs = lnet_create_array_of_queues();
+ if (!resendqs)
+ return -ENOMEM;
+
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_mt_resendqs = resendqs;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ return 0;
+}
+
+static void
+lnet_clean_local_ni_recoveryq(void)
+{
+ struct lnet_ni *ni;
+
+ /* This is only called when the monitor thread has stopped */
+ lnet_net_lock(0);
+
+ while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
+ ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
+ struct lnet_ni, ni_recovery);
+ list_del_init(&ni->ni_recovery);
+ lnet_ni_lock(ni);
+ lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
+ lnet_ni_unlock(ni);
+ lnet_ni_decref_locked(ni, 0);
+ }
+
+ lnet_net_unlock(0);
+}
+
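+/* called with cpt lock and lpni_lock held */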
+static void
+lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
+ bool force)
+{
+ struct lnet_handle_md recovery_mdh;
+
+ LNetInvalidateMDHandle(&recovery_mdh);
+
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
+ recovery_mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ }
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(cpt);
+ if (!LNetMDHandleIsInvalid(recovery_mdh))
+ LNetMDUnlink(recovery_mdh);
+ lnet_net_lock(cpt);
+ spin_lock(&lpni->lpni_lock);
+}
+
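+/*
+ * Drain ln_mt_peerNIRecovq. Like lnet_clean_local_ni_recoveryq(),
+ * this is expected to run only once the monitor thread has
+ * stopped.
+ */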
+static void
+lnet_clean_peer_ni_recoveryq(void)
+{
+ struct lnet_peer_ni *lpni, *tmp;
+
+ lnet_net_lock(LNET_LOCK_EX);
+
+ list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
+ lpni_recovery) {
+ list_del_init(&lpni->lpni_recovery);
+ spin_lock(&lpni->lpni_lock);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+
+ lnet_net_unlock(LNET_LOCK_EX);
+}
+
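+/*
+ * Fail any messages still on the per-CPT resend queues with
+ * -ESHUTDOWN and free the queue array.
+ */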
+static void
+lnet_clean_resendqs(void)
+{
+ struct lnet_msg *msg, *tmp;
+ LIST_HEAD(msgs);
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ lnet_net_lock(i);
+ list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
+ lnet_net_unlock(i);
+ list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
+ list_del_init(&msg->msg_list);
+ msg->msg_no_resend = true;
+ lnet_finalize(msg, -ESHUTDOWN);
+ }
+ }
+
+ cfs_percpt_free(the_lnet.ln_mt_resendqs);
+}
+
+static void
+lnet_recover_peer_nis(void)
+{
+ struct lnet_mt_event_info *ev_info;
+ LIST_HEAD(processed_list);
+ LIST_HEAD(local_queue);
+ struct lnet_handle_md mdh;
+ struct lnet_peer_ni *lpni;
+ struct lnet_peer_ni *tmp;
+ lnet_nid_t nid;
+ int healthv;
+ int rc;
+
+ /*
+ * Always use cpt 0 for locking across all interactions with
+ * ln_mt_peerNIRecovq
+ */
+ lnet_net_lock(0);
+ list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
+ &local_queue);
+ lnet_net_unlock(0);
+
+ list_for_each_entry_safe(lpni, tmp, &local_queue,
+ lpni_recovery) {
+ /*
+ * The same protection strategy is used here as in the
+ * local recovery case.
+ */
+ lnet_net_lock(0);
+ healthv = atomic_read(&lpni->lpni_healthv);
+ spin_lock(&lpni->lpni_lock);
+ if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
+ healthv == LNET_MAX_HEALTH_VALUE) {
+ list_del_init(&lpni->lpni_recovery);
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
+ spin_unlock(&lpni->lpni_lock);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+ continue;
+ }
+
+ /*
+ * If the peer NI has failed recovery we must unlink the
+ * md. But we want to keep the peer ni on the recovery
+ * queue so we can try to continue recovering it
+ */
+ if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
+ lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
+ }
+
+ spin_unlock(&lpni->lpni_lock);
+ lnet_net_unlock(0);
+
+ /*
+ * NOTE: we're racing with peer deletion from user space.
+ * It's possible that a peer is deleted after we check its
+ * state. In this case the recovery can create a new peer.
+ */
+ spin_lock(&lpni->lpni_lock);
+ if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
+ !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
+ lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+
+ LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
+ if (!ev_info) {
+ CERROR("out of memory. Can't recover %s\n",
+ libcfs_nid2str(lpni->lpni_nid));
+ spin_lock(&lpni->lpni_lock);
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ spin_unlock(&lpni->lpni_lock);
+ continue;
+ }
+
+ /* look at the comments in lnet_recover_local_nis() */
+ mdh = lpni->lpni_recovery_ping_mdh;
+ LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
+ nid = lpni->lpni_nid;
+ lnet_net_lock(0);
+ list_del_init(&lpni->lpni_recovery);
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
+ ev_info->mt_type = MT_TYPE_PEER_NI;
+ ev_info->mt_nid = nid;
+ rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
+ ev_info, the_lnet.ln_mt_eqh, true);
+ lnet_net_lock(0);
+ /*
+ * lnet_find_peer_ni_locked() grabs a refcount for
+ * us. No need to take it explicitly.
+ */
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (!lpni) {
+ lnet_net_unlock(0);
+ LNetMDUnlink(mdh);
+ continue;
+ }
+
+ lpni->lpni_recovery_ping_mdh = mdh;
+ /*
+ * While we were unlocked the lpni could have been
+ * re-added to the recovery queue. In that case we
+ * don't need to add it to the local queue, since
+ * it's already on there, and the thread that added
+ * it will have incremented the refcount on the
+ * peer, which means we need to drop the refcount
+ * that was implicitly grabbed by
+ * lnet_find_peer_ni_locked(). Otherwise, if the
+ * lpni is still not on the recovery queue, we add
+ * it to the processed list.
+ */
+ if (list_empty(&lpni->lpni_recovery))
+ list_add_tail(&lpni->lpni_recovery, &processed_list);
+ else
+ lnet_peer_ni_decref_locked(lpni);
+ lnet_net_unlock(0);
+
+ spin_lock(&lpni->lpni_lock);
+ if (rc)
+ lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
+ }
+ spin_unlock(&lpni->lpni_lock);
+ }
+
+ list_splice_init(&processed_list, &local_queue);
+ lnet_net_lock(0);
+ list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
+ lnet_net_unlock(0);
+}
+
+static int
+lnet_monitor_thread(void *arg)
+{
+ time64_t recovery_timeout = 0;
+ time64_t rsp_timeout = 0;
+ int interval;
+ time64_t now;
+
+ wait_for_completion(&the_lnet.ln_started);
+ /*
+ * The monitor thread takes care of the following:
+ * 1. Checks the aliveness of routers.
+ * 2. Checks if there are messages on the resend queue and
+ * resends them.
+ * 3. Checks if there are any NIs on the local recovery queue
+ * and pings them.
+ * 4. Checks if there are any NIs on the remote recovery queue
+ * and pings them.
+ */
+ cfs_block_allsigs();
+
+ while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
+ now = ktime_get_real_seconds();
+
+ if (lnet_router_checker_active())
+ lnet_check_routers();
+
+ lnet_resend_pending_msgs();
+
+ if (now >= rsp_timeout) {
+ lnet_finalize_expired_responses();
+ rsp_timeout = now + (lnet_transaction_timeout / 2);
+ }
+
+ if (now >= recovery_timeout) {
+ lnet_recover_local_nis();
+ lnet_recover_peer_nis();
+ recovery_timeout = now + lnet_recovery_interval;
+ }
+
+ /*
+ * TODO: do we need to check if we should sleep without
+ * timeout? Technically, an active system will always
+ * have messages in flight so this check will always
+ * evaluate to false. And on an idle system do we care
+ * if we wake up every second? Although we have seen
+ * complaints that an idle thread wakes up
+ * unnecessarily.
+ *
+ * Take into account the current net_count when waking
+ * up for alive router checking, since we need to check
+ * possibly as many networks as we have configured.
+ */
+ interval = min(lnet_recovery_interval,
+ min((unsigned int) alive_router_check_interval /
+ lnet_current_net_count,
+ lnet_transaction_timeout / 2));
+ wait_for_completion_interruptible_timeout(
+ &the_lnet.ln_mt_wait_complete,
+ cfs_time_seconds(interval));
+ /* Must re-init the completion before testing anything,
+ * including ln_mt_state.
+ */
+ reinit_completion(&the_lnet.ln_mt_wait_complete);
+ }
+
+ /* Shutting down */
+ lnet_net_lock(LNET_LOCK_EX);
+ the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
+ lnet_net_unlock(LNET_LOCK_EX);
+
+ /* signal that the monitor thread is exiting */
+ up(&the_lnet.ln_mt_signal);
+
+ return 0;
+}
+
+/*
+ * lnet_send_ping
+ * Sends a ping to the given destination NID.
+ * Returns == 0 on success.
+ * Returns > 0 if LNetMDBind or an earlier step fails.
+ * Returns < 0 if LNetGet fails.
+ */
+int
+lnet_send_ping(lnet_nid_t dest_nid,
+ struct lnet_handle_md *mdh, int nnis,
+ void *user_data, struct lnet_handle_eq eqh, bool recovery)