+/* called with res_lock held */
+void
+lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
+{
+ struct lnet_rsp_tracker *rspt;
+
+ /*
+ * msg has a refcount on the MD so the MD is not going away.
+ * The rspt queue for the cpt is protected by
+ * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
+ */
+ if (!md->md_rspt_ptr)
+ return;
+
+ rspt = md->md_rspt_ptr;
+
+ /* debug code */
+ LASSERT(rspt->rspt_cpt == cpt);
+
+ md->md_rspt_ptr = NULL;
+
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ /*
+ * The monitor thread has invalidated this handle because the
+ * response timed out, but it failed to lookup the MD. That
+ * means this response tracker is on the zombie list. We can
+ * safely remove it under the resource lock (held by caller) and
+ * free the response tracker block.
+ */
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, cpt);
+ } else {
+ /*
+ * invalidate the handle to indicate that a response has been
+ * received, which will then lead the monitor thread to clean up
+ * the rspt block.
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ }
+}
+
+void
+lnet_clean_zombie_rstqs(void)
+{
+ struct lnet_rsp_tracker *rspt, *tmp;
+ int i;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ list_for_each_entry_safe(rspt, tmp,
+ the_lnet.ln_mt_zombie_rstqs[i],
+ rspt_on_list) {
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ }
+ }
+
+ cfs_percpt_free(the_lnet.ln_mt_zombie_rstqs);
+}
+
+static void
+lnet_finalize_expired_responses(void)
+{
+ struct lnet_libmd *md;
+ struct list_head local_queue;
+ struct lnet_rsp_tracker *rspt, *tmp;
+ ktime_t now;
+ int i;
+
+ if (the_lnet.ln_mt_rstq == NULL)
+ return;
+
+ cfs_cpt_for_each(i, lnet_cpt_table()) {
+ INIT_LIST_HEAD(&local_queue);
+
+ lnet_net_lock(i);
+ if (!the_lnet.ln_mt_rstq[i]) {
+ lnet_net_unlock(i);
+ continue;
+ }
+ list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
+ lnet_net_unlock(i);
+
+ now = ktime_get();
+
+ list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
+ /*
+ * The rspt mdh will be invalidated when a response
+ * is received or whenever we want to discard the
+ * block the monitor thread will walk the queue
+ * and clean up any rsts with an invalid mdh.
+ * The monitor thread will walk the queue until
+ * the first unexpired rspt block. This means that
+ * some rspt blocks which received their
+ * corresponding responses will linger in the
+ * queue until they are cleaned up eventually.
+ */
+ lnet_res_lock(i);
+ if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+ lnet_res_unlock(i);
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+ continue;
+ }
+
+ if (ktime_compare(now, rspt->rspt_deadline) >= 0 ||
+ the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) {
+ struct lnet_peer_ni *lpni;
+ lnet_nid_t nid;
+
+ md = lnet_handle2md(&rspt->rspt_mdh);
+ if (!md) {
+ /* MD has been queued for unlink, but
+ * rspt hasn't been detached (Note we've
+ * checked above that the rspt_mdh is
+ * valid). Since we cannot lookup the MD
+ * we're unable to detach the rspt
+ * ourselves. Thus, move the rspt to the
+ * zombie list where we'll wait for
+ * either:
+ * 1. The remaining operations on the
+ * MD to complete. In this case the
+ * final operation will result in
+ * lnet_msg_detach_md()->
+ * lnet_detach_rsp_tracker() where
+ * we will clean up this response
+ * tracker.
+ * 2. LNet to shutdown. In this case
+ * we'll wait until after all LND Nets
+ * have shutdown and then we can
+ * safely free any remaining response
+ * tracker blocks on the zombie list.
+ * Note: We need to hold the resource
+ * lock when adding to the zombie list
+ * because we may have concurrent access
+ * with lnet_detach_rsp_tracker().
+ */
+ LNetInvalidateMDHandle(&rspt->rspt_mdh);
+ list_move(&rspt->rspt_on_list,
+ the_lnet.ln_mt_zombie_rstqs[i]);
+ lnet_res_unlock(i);
+ continue;
+ }
+ LASSERT(md->md_rspt_ptr == rspt);
+ md->md_rspt_ptr = NULL;
+ lnet_res_unlock(i);
+
+ LNetMDUnlink(rspt->rspt_mdh);
+
+ nid = rspt->rspt_next_hop_nid;
+
+ list_del(&rspt->rspt_on_list);
+ lnet_rspt_free(rspt, i);
+
+ /* If we're shutting down we just want to clean
+ * up the rspt blocks
+ */
+ if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+ continue;
+
+ lnet_net_lock(i);
+ the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
+ lnet_net_unlock(i);
+
+ CDEBUG(D_NET,
+ "Response timeout: md = %p: nid = %s\n",
+ md, libcfs_nid2str(nid));
+
+ /*
+ * If there is a timeout on the response
+ * from the next hop decrement its health
+ * value so that we don't use it
+ */
+ lnet_net_lock(0);
+ lpni = lnet_find_peer_ni_locked(nid);
+ if (lpni) {
+ lnet_handle_remote_failure_locked(lpni);
+ lnet_peer_ni_decref_locked(lpni);
+ }
+ lnet_net_unlock(0);
+ } else {
+ lnet_res_unlock(i);
+ break;
+ }
+ }
+
+ if (!list_empty(&local_queue)) {
+ lnet_net_lock(i);
+ list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
+ lnet_net_unlock(i);
+ }
+ }
+}
+