LU-12222 lnet: Check if we're sending to ourselves
diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c
index c1d4e84..262e432 100644
@@ -42,8 +42,6 @@
 #include <linux/nsproxy.h>
 #include <net/net_namespace.h>
 
-extern unsigned int lnet_current_net_count;
-
 static int local_nid_dist_zero = 1;
 module_param(local_nid_dist_zero, int, 0444);
 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
@@ -1910,8 +1908,11 @@ lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd)
  * Local Destination
  * MR Peer
  *
- * Run the selection algorithm on the peer NIs unless we're sending
- * a response, in this case just send to the destination
+ * Don't run the selection algorithm on the peer NIs. By specifying the
+ * local NID, we're also saying that we should always use the destination NID
+ * provided. This handles the case where we should be using the same
+ * destination NID for all the messages which belong to the same RPC
+ * request.
  */
 static int
 lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
@@ -1924,17 +1925,6 @@ lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
                return -EINVAL;
        }
 
-       /*
-        * only run the selection algorithm to pick the peer_ni if we're
-        * sending a GET or a PUT. Responses are sent to the same
-        * destination NID provided.
-        */
-       if (!(sd->sd_send_case & SND_RESP)) {
-               sd->sd_best_lpni =
-                 lnet_find_best_lpni_on_net(sd, sd->sd_peer,
-                                            sd->sd_best_ni->ni_net->net_id);
-       }
-
        if (sd->sd_best_lpni &&
            sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
                return lnet_handle_lo_send(sd);
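
With both endpoints pinned, the surviving logic reduces to the condensed
sketch below (a paraphrase, not the verbatim post-patch body;
"spec_local_mr_dst_sketch" is a hypothetical name). The point is that no
peer-NI selection runs at all, so every message belonging to one RPC exchange
keeps the peer NID the caller supplied.

    static int
    spec_local_mr_dst_sketch(struct lnet_send_data *sd)
    {
            /* resolve the local NI from the caller-supplied src_nid */
            sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
            if (!sd->sd_best_ni)
                    return -EINVAL;

            /* dst_nid was resolved by the caller; use it unmodified */
            if (sd->sd_best_lpni &&
                sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
                    return lnet_handle_lo_send(sd); /* sending to ourselves */
            else if (sd->sd_best_lpni)
                    return lnet_handle_send(sd);

            return -EFAULT; /* no peer_ni resolved for dst_nid */
    }
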
@@ -2009,15 +1999,21 @@ lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
        }
        /* The peer may have changed. */
        peer = lpni->lpni_peer_net->lpn_peer;
+       spin_lock(&peer->lp_lock);
+       if (lnet_peer_is_uptodate_locked(peer)) {
+               spin_unlock(&peer->lp_lock);
+               lnet_peer_ni_decref_locked(lpni);
+               return 0;
+       }
        /* queue message and return */
        msg->msg_rtr_nid_param = rtr_nid;
        msg->msg_sending = 0;
        msg->msg_txpeer = NULL;
-       spin_lock(&peer->lp_lock);
        list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
+       primary_nid = peer->lp_primary_nid;
        spin_unlock(&peer->lp_lock);
+
        lnet_peer_ni_decref_locked(lpni);
-       primary_nid = peer->lp_primary_nid;
 
        CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
                msg, libcfs_nid2str(primary_nid));
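
The reordering above closes a race: previously the peer could complete
discovery (and drain lp_dc_pendq) between the caller's up-to-date check and
the list_add_tail(), leaving the message stranded on the queue. Re-testing
under the same lock that protects lp_dc_pendq makes the check and the
queueing atomic; reading lp_primary_nid before unlocking also keeps the
CDEBUG below consistent with the peer the message was actually queued on.
The pattern, in outline:

    spin_lock(&peer->lp_lock);
    if (lnet_peer_is_uptodate_locked(peer)) {
            /* discovery finished while we were getting here;
             * nothing to queue, send immediately */
            spin_unlock(&peer->lp_lock);
            lnet_peer_ni_decref_locked(lpni);
            return 0;
    }
    /* still undiscovered: park the message where the discovery
     * completion path will find and resend it */
    list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
    primary_nid = peer->lp_primary_nid; /* stable while lp_lock is held */
    spin_unlock(&peer->lp_lock);
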
@@ -2638,13 +2634,20 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
 again:
 
        /*
-        * If we're being asked to send to the loopback interface, there
-        * is no need to go through any selection. We can just shortcut
-        * the entire process and send over lolnd
+        * If we're sending to ourselves then there is no need to go through
+        * any selection. We can shortcut the entire process and send over
+        * lolnd.
+        *
+        * However, we make two exceptions to this rule:
+        * 1. If the src_nid is specified then our API defines that we must send
+        *    via that interface.
+        * 2. Recovery messages must be sent to the lnet_ni that is being
+        *    recovered.
         */
        send_data.sd_msg = msg;
        send_data.sd_cpt = cpt;
-       if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
+       if (src_nid == LNET_NID_ANY && !msg->msg_recovery &&
+           lnet_nid2ni_locked(dst_nid, cpt)) {
                rc = lnet_handle_lo_send(&send_data);
                lnet_net_unlock(cpt);
                return rc;
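
The old test only short-circuited messages explicitly addressed to the
loopback network (0@lo); a message addressed to one of our own regular NIDs
still walked the full selection path. The new condition amounts to the
predicate below ("lnet_send_to_self_sketch" is a hypothetical name, for
illustration only):

    /* True when the send can be short-circuited over lolnd.
     * Caller holds lnet_net_lock(cpt), as at the call site above. */
    static bool
    lnet_send_to_self_sketch(lnet_nid_t src_nid, lnet_nid_t dst_nid,
                             struct lnet_msg *msg, int cpt)
    {
            return src_nid == LNET_NID_ANY &&       /* exception 1 above */
                   !msg->msg_recovery &&            /* exception 2 above */
                   lnet_nid2ni_locked(dst_nid, cpt) != NULL; /* dst is ours */
    }
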
@@ -2671,11 +2674,10 @@ again:
        msg->msg_src_nid_param = src_nid;
 
        /*
-        * Now that we have a peer_ni, check if we want to discover
-        * the peer. Traffic to the LNET_RESERVED_PORTAL should not
-        * trigger discovery.
+        * If necessary, perform discovery on the peer that owns this peer_ni.
+        * Note that this can result in the ownership of this peer_ni
+        * changing to another peer object.
         */
-       peer = lpni->lpni_peer_net->lpn_peer;
        rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
        if (rc) {
                lnet_peer_ni_decref_locked(lpni);
@@ -2684,6 +2686,8 @@ again:
        }
        lnet_peer_ni_decref_locked(lpni);
 
+       peer = lpni->lpni_peer_net->lpn_peer;
+
        /*
         * Identify the different send cases
         */
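
Moving the peer assignment below the discovery call is the subtle part of
these two hunks: lnet_initiate_peer_discovery() can re-parent the peer_ni
onto a different peer object, so a struct lnet_peer pointer cached before
the call may be stale afterwards. In outline:

    /* pre-patch: pointer may be stale after discovery re-parents lpni */
    peer = lpni->lpni_peer_net->lpn_peer;
    rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);

    /* post-patch: read the owner only after discovery has run */
    rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
    peer = lpni->lpni_peer_net->lpn_peer;
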
@@ -2804,25 +2808,57 @@ lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
                return;
 
        rspt = md->md_rspt_ptr;
-       md->md_rspt_ptr = NULL;
 
        /* debug code */
        LASSERT(rspt->rspt_cpt == cpt);
 
-       /*
-        * invalidate the handle to indicate that a response has been
-        * received, which will then lead the monitor thread to clean up
-        * the rspt block.
-        */
-       LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       md->md_rspt_ptr = NULL;
+
+       if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
+               /*
+                * The monitor thread has invalidated this handle because the
+                * response timed out, but it failed to look up the MD. That
+                * means this response tracker is on the zombie list. We can
+                * safely remove it under the resource lock (held by caller) and
+                * free the response tracker block.
+                */
+               list_del(&rspt->rspt_on_list);
+               lnet_rspt_free(rspt, cpt);
+       } else {
+               /*
+                * invalidate the handle to indicate that a response has been
+                * received, which will then lead the monitor thread to clean up
+                * the rspt block.
+                */
+               LNetInvalidateMDHandle(&rspt->rspt_mdh);
+       }
+}
+
+void
+lnet_clean_zombie_rstqs(void)
+{
+       struct lnet_rsp_tracker *rspt, *tmp;
+       int i;
+
+       cfs_cpt_for_each(i, lnet_cpt_table()) {
+               list_for_each_entry_safe(rspt, tmp,
+                                        the_lnet.ln_mt_zombie_rstqs[i],
+                                        rspt_on_list) {
+                       list_del(&rspt->rspt_on_list);
+                       lnet_rspt_free(rspt, i);
+               }
+       }
+
+       cfs_percpt_free(the_lnet.ln_mt_zombie_rstqs);
 }
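
lnet_clean_zombie_rstqs() frees the_lnet.ln_mt_zombie_rstqs, so that per-CPT
array must be allocated and initialized during monitor-thread startup,
outside this file's hunks. Presumably the allocation mirrors the
lnet_create_array_of_queues() helper removed further down; a sketch under
that assumption ("lnet_zombie_rstqs_create_sketch" is a hypothetical name):

    static int
    lnet_zombie_rstqs_create_sketch(void)
    {
            struct list_head *q;
            int i;

            the_lnet.ln_mt_zombie_rstqs =
                    cfs_percpt_alloc(lnet_cpt_table(),
                                     sizeof(struct list_head));
            if (!the_lnet.ln_mt_zombie_rstqs)
                    return -ENOMEM;

            cfs_percpt_for_each(q, i, the_lnet.ln_mt_zombie_rstqs)
                    INIT_LIST_HEAD(q);

            return 0;
    }
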
 
 static void
-lnet_finalize_expired_responses(bool force)
+lnet_finalize_expired_responses(void)
 {
        struct lnet_libmd *md;
        struct list_head local_queue;
        struct lnet_rsp_tracker *rspt, *tmp;
+       ktime_t now;
        int i;
 
        if (the_lnet.ln_mt_rstq == NULL)
@@ -2839,6 +2875,8 @@ lnet_finalize_expired_responses(bool force)
                list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
                lnet_net_unlock(i);
 
+               now = ktime_get();
+
                list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
                        /*
                         * The rspt mdh will be invalidated when a response
@@ -2854,41 +2892,73 @@ lnet_finalize_expired_responses(bool force)
                        lnet_res_lock(i);
                        if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
                                lnet_res_unlock(i);
-                               list_del_init(&rspt->rspt_on_list);
+                               list_del(&rspt->rspt_on_list);
                                lnet_rspt_free(rspt, i);
                                continue;
                        }
 
-                       if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
-                           force) {
+                       if (ktime_compare(now, rspt->rspt_deadline) >= 0 ||
+                           the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN) {
                                struct lnet_peer_ni *lpni;
                                lnet_nid_t nid;
 
                                md = lnet_handle2md(&rspt->rspt_mdh);
                                if (!md) {
+                                       /* MD has been queued for unlink, but
+                                        * rspt hasn't been detached (Note we've
+                                        * checked above that the rspt_mdh is
+                                        * valid). Since we cannot lookup the MD
+                                        * we're unable to detach the rspt
+                                        * ourselves. Thus, move the rspt to the
+                                        * zombie list where we'll wait for
+                                        * either:
+                                        *   1. The remaining operations on the
+                                        *   MD to complete. In this case the
+                                        *   final operation will result in
+                                        *   lnet_msg_detach_md()->
+                                        *   lnet_detach_rsp_tracker() where
+                                        *   we will clean up this response
+                                        *   tracker.
+                                        *   2. LNet to shutdown. In this case
+                                        *   we'll wait until after all LND Nets
+                                        *   have shutdown and then we can
+                                        *   safely free any remaining response
+                                        *   tracker blocks on the zombie list.
+                                        * Note: We need to hold the resource
+                                        * lock when adding to the zombie list
+                                        * because we may have concurrent access
+                                        * with lnet_detach_rsp_tracker().
+                                        */
                                        LNetInvalidateMDHandle(&rspt->rspt_mdh);
+                                       list_move(&rspt->rspt_on_list,
+                                                 the_lnet.ln_mt_zombie_rstqs[i]);
                                        lnet_res_unlock(i);
-                                       list_del_init(&rspt->rspt_on_list);
-                                       lnet_rspt_free(rspt, i);
                                        continue;
                                }
                                LASSERT(md->md_rspt_ptr == rspt);
                                md->md_rspt_ptr = NULL;
                                lnet_res_unlock(i);
 
+                               LNetMDUnlink(rspt->rspt_mdh);
+
+                               nid = rspt->rspt_next_hop_nid;
+
+                               list_del(&rspt->rspt_on_list);
+                               lnet_rspt_free(rspt, i);
+
+                               /* If we're shutting down we just want to clean
+                                * up the rspt blocks
+                                */
+                               if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
+                                       continue;
+
                                lnet_net_lock(i);
                                the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
                                lnet_net_unlock(i);
 
-                               list_del_init(&rspt->rspt_on_list);
-
-                               nid = rspt->rspt_next_hop_nid;
-
                                CDEBUG(D_NET,
                                       "Response timeout: md = %p: nid = %s\n",
                                       md, libcfs_nid2str(nid));
-                               LNetMDUnlink(rspt->rspt_mdh);
-                               lnet_rspt_free(rspt, i);
 
                                /*
                                 * If there is a timeout on the response
@@ -2908,10 +2978,11 @@ lnet_finalize_expired_responses(bool force)
                        }
                }
 
-               lnet_net_lock(i);
-               if (!list_empty(&local_queue))
+               if (!list_empty(&local_queue)) {
+                       lnet_net_lock(i);
                        list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
-               lnet_net_unlock(i);
+                       lnet_net_unlock(i);
+               }
        }
 }
 
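The deadline test in this function assumes rspt_deadline was armed when the
tracker was attached to its MD; that attach path is outside this diff. A
minimal sketch of the arming, under that assumption ("lnet_rspt_arm_sketch"
is a hypothetical name):

    static void
    lnet_rspt_arm_sketch(struct lnet_rsp_tracker *rspt, lnet_nid_t next_hop)
    {
            /* expire lnet_transaction_timeout seconds from now */
            rspt->rspt_deadline = ktime_add(ktime_get(),
                                            ktime_set(lnet_transaction_timeout,
                                                      0));
            rspt->rspt_next_hop_nid = next_hop;
    }
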
@@ -3184,26 +3255,6 @@ lnet_recover_local_nis(void)
        lnet_net_unlock(0);
 }
 
-static struct list_head **
-lnet_create_array_of_queues(void)
-{
-       struct list_head **qs;
-       struct list_head *q;
-       int i;
-
-       qs = cfs_percpt_alloc(lnet_cpt_table(),
-                             sizeof(struct list_head));
-       if (!qs) {
-               CERROR("Failed to allocate queues\n");
-               return NULL;
-       }
-
-       cfs_percpt_for_each(q, i, qs)
-               INIT_LIST_HEAD(q);
-
-       return qs;
-}
-
 static int
 lnet_resendqs_create(void)
 {
@@ -3468,7 +3519,7 @@ lnet_monitor_thread(void *arg)
                lnet_resend_pending_msgs();
 
                if (now >= rsp_timeout) {
-                       lnet_finalize_expired_responses(false);
+                       lnet_finalize_expired_responses();
                        rsp_timeout = now + (lnet_transaction_timeout / 2);
                }
 
@@ -3495,9 +3546,13 @@ lnet_monitor_thread(void *arg)
                               min((unsigned int) alive_router_check_interval /
                                        lnet_current_net_count,
                                   lnet_transaction_timeout / 2));
-               wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
-                                               false,
-                                               cfs_time_seconds(interval));
+               wait_for_completion_interruptible_timeout(
+                       &the_lnet.ln_mt_wait_complete,
+                       cfs_time_seconds(interval));
+               /* Must re-init the completion before testing anything,
+                * including ln_mt_state.
+                */
+               reinit_completion(&the_lnet.ln_mt_wait_complete);
        }
 
        /* Shutting down */
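
The switch from a wait queue to a completion fixes a lost wake-up:
wait_event_interruptible_timeout() with a constant-false condition goes
straight back to sleep after any wake_up(), so the shutdown signal from
lnet_monitor_thr_stop() could be delayed by up to a full polling interval.
A completion latches the signal instead. The essential pattern:

    /* waker, e.g. lnet_monitor_thr_stop() below: */
    complete(&the_lnet.ln_mt_wait_complete);

    /* monitor loop: a complete() issued while the thread was busy is
     * remembered, so the wait returns immediately rather than sleeping
     * the full interval */
    wait_for_completion_interruptible_timeout(&the_lnet.ln_mt_wait_complete,
                                              cfs_time_seconds(interval));
    /* re-arm before re-checking ln_mt_state; a complete() landing in
     * this gap stays latched for the next wait instead of being lost */
    reinit_completion(&the_lnet.ln_mt_wait_complete);
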
@@ -3652,6 +3707,7 @@ lnet_mt_event_handler(struct lnet_event *event)
        case LNET_EVENT_UNLINK:
                CDEBUG(D_NET, "%s recovery ping unlinked\n",
                       libcfs_nid2str(ev_info->mt_nid));
+               /* fallthrough */
        case LNET_EVENT_REPLY:
                lnet_handle_recovery_reply(ev_info, event->status,
                                           event->type == LNET_EVENT_UNLINK);
@@ -3690,7 +3746,7 @@ lnet_rsp_tracker_create(void)
 static void
 lnet_rsp_tracker_clean(void)
 {
-       lnet_finalize_expired_responses(true);
+       lnet_finalize_expired_responses();
 
        cfs_percpt_free(the_lnet.ln_mt_rstq);
        the_lnet.ln_mt_rstq = NULL;
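
The force flag could be dropped because this cleanup path only runs during
shutdown, when ln_mt_state is already LNET_MT_STATE_SHUTDOWN and
lnet_finalize_expired_responses() therefore expires everything
unconditionally (and skips the health counters). Trackers whose MDs can no
longer be looked up are parked on the zombie queues and freed last. The
assumed overall ordering, as a condensed and hypothetical sketch (the real
call sites live outside this section):

    the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN; /* under lnet_net_lock */
    lnet_monitor_thr_stop();   /* complete() the waiter, join the thread */
    lnet_rsp_tracker_clean();  /* expires/frees every tracker it can look up */
    /* ... LND Nets shut down: no MD operations remain in flight ... */
    lnet_clean_zombie_rstqs(); /* free trackers parked with unresolvable MDs */
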
@@ -3761,7 +3817,7 @@ void lnet_monitor_thr_stop(void)
        lnet_net_unlock(LNET_LOCK_EX);
 
        /* tell the monitor thread that we're shutting down */
-       wake_up(&the_lnet.ln_mt_waitq);
+       complete(&the_lnet.ln_mt_wait_complete);
 
        /* block until monitor thread signals that it's done */
        down(&the_lnet.ln_mt_signal);
@@ -3772,8 +3828,6 @@ void lnet_monitor_thr_stop(void)
        lnet_clean_local_ni_recoveryq();
        lnet_clean_peer_ni_recoveryq();
        lnet_clean_resendqs();
-
-       return;
 }
 
 void
@@ -5046,9 +5100,9 @@ LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
                         * current net namespace.
                         * If not, assign order above 0xffff0000,
                         * to make this ni not a priority. */
-                       if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
-                               order += 0xffff0000;
-
+                       if (current->nsproxy &&
+                           !net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
+                               order += 0xffff0000;
                        if (srcnidp != NULL)
                                *srcnidp = ni->ni_nid;
                        if (orderp != NULL)