Whamcloud - gitweb
LU-13502 lnet: Ensure LNet pings and pushes are always tracked
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index cefc4b6..8c297f9 100644 (file)
@@ -338,54 +338,6 @@ lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
 }
 EXPORT_SYMBOL(lnet_copy_iov2iov);
 
-int
-lnet_extract_iov(int dst_niov, struct kvec *dst,
-                int src_niov, struct kvec *src,
-                unsigned int offset, unsigned int len)
-{
-       /* Initialise 'dst' to the subset of 'src' starting at 'offset',
-        * for exactly 'len' bytes, and return the number of entries.
-        * NB not destructive to 'src' */
-       unsigned int    frag_len;
-       unsigned int    niov;
-
-       if (len == 0)                           /* no data => */
-               return (0);                     /* no frags */
-
-       LASSERT(src_niov > 0);
-       while (offset >= src->iov_len) {      /* skip initial frags */
-               offset -= src->iov_len;
-               src_niov--;
-               src++;
-               LASSERT(src_niov > 0);
-       }
-
-       niov = 1;
-       for (;;) {
-               LASSERT(src_niov > 0);
-               LASSERT((int)niov <= dst_niov);
-
-               frag_len = src->iov_len - offset;
-               dst->iov_base = ((char *)src->iov_base) + offset;
-
-               if (len <= frag_len) {
-                       dst->iov_len = len;
-                       return (niov);
-               }
-
-               dst->iov_len = frag_len;
-
-               len -= frag_len;
-               dst++;
-               src++;
-               niov++;
-               src_niov--;
-               offset = 0;
-       }
-}
-EXPORT_SYMBOL(lnet_extract_iov);
-
-
 unsigned int
 lnet_kiov_nob(unsigned int niov, struct bio_vec *kiov)
 {
@@ -2029,6 +1981,7 @@ lnet_handle_find_routed_path(struct lnet_send_data *sd,
        struct lnet_route *last_route = NULL;
        struct lnet_peer_ni *lpni = NULL;
        struct lnet_peer_ni *gwni = NULL;
+       bool route_found = false;
        lnet_nid_t src_nid = (sd->sd_src_nid != LNET_NID_ANY) ? sd->sd_src_nid :
                (sd->sd_best_ni != NULL) ? sd->sd_best_ni->ni_nid :
                LNET_NID_ANY;
@@ -2042,15 +1995,20 @@ lnet_handle_find_routed_path(struct lnet_send_data *sd,
         */
        if (sd->sd_rtr_nid != LNET_NID_ANY) {
                gwni = lnet_find_peer_ni_locked(sd->sd_rtr_nid);
-               if (!gwni) {
-                       CERROR("No peer NI for gateway %s\n",
+               if (gwni) {
+                       gw = gwni->lpni_peer_net->lpn_peer;
+                       lnet_peer_ni_decref_locked(gwni);
+                       if (gw->lp_rtr_refcount) {
+                               local_lnet = LNET_NIDNET(sd->sd_rtr_nid);
+                               route_found = true;
+                       }
+               } else {
+                       CWARN("No peer NI for gateway %s. Attempting to find an alternative route.\n",
                               libcfs_nid2str(sd->sd_rtr_nid));
-                       return -EHOSTUNREACH;
                }
-               gw = gwni->lpni_peer_net->lpn_peer;
-               lnet_peer_ni_decref_locked(gwni);
-               local_lnet = LNET_NIDNET(sd->sd_rtr_nid);
-       } else {
+       }
+
+       if (!route_found) {
                /* we've already looked up the initial lpni using dst_nid */
                lpni = sd->sd_best_lpni;
                /* the peer tree must be in existence */
@@ -2662,6 +2620,8 @@ lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
        int cpt, rc;
        int md_cpt;
        __u32 send_case = 0;
+       bool final_hop;
+       bool mr_forwarding_allowed;
 
        memset(&send_data, 0, sizeof(send_data));
 
@@ -2745,17 +2705,49 @@ again:
        else
                send_case |= REMOTE_DST;
 
+       final_hop = false;
+       if (msg->msg_routing && (send_case & LOCAL_DST))
+               final_hop = true;
+
+       /* Determine whether to allow MR forwarding for this message.
+        * NB: MR forwarding is allowed if the message originator and the
+        * destination are both MR capable, and the destination lpni that was
+        * originally chosen by the originator is unhealthy or down.
+        * We check the MR capability of the destination further below
+        */
+       mr_forwarding_allowed = false;
+       if (final_hop) {
+               struct lnet_peer *src_lp;
+               struct lnet_peer_ni *src_lpni;
+
+               src_lpni = lnet_nid2peerni_locked(msg->msg_hdr.src_nid,
+                                                 LNET_NID_ANY, cpt);
+               /* We don't fail the send if we hit any errors here. We'll just
+                * try to send it via non-multi-rail criteria
+                */
+               if (!IS_ERR(src_lpni)) {
+                       src_lp = lpni->lpni_peer_net->lpn_peer;
+                       if (lnet_peer_is_multi_rail(src_lp) &&
+                           !lnet_is_peer_ni_alive(lpni))
+                               mr_forwarding_allowed = true;
+
+               }
+               CDEBUG(D_NET, "msg %p MR forwarding %s\n", msg,
+                      mr_forwarding_allowed ? "allowed" : "not allowed");
+       }
+
        /*
         * Deal with the peer as NMR in the following cases:
         * 1. the peer is NMR
         * 2. We're trying to recover a specific peer NI
-        * 3. I'm a router sending to the final destination
+        * 3. I'm a router sending to the final destination and MR forwarding is
+        *    not allowed for this message (as determined above).
         *    In this case the source of the message would've
         *    already selected the final destination so my job
         *    is to honor the selection.
         */
        if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery ||
-           (msg->msg_routing && (send_case & LOCAL_DST)))
+           (final_hop && !mr_forwarding_allowed))
                send_case |= NMR_DST;
        else
                send_case |= MR_DST;
@@ -3617,11 +3609,11 @@ lnet_send_ping(lnet_nid_t dest_nid,
        md.length    = LNET_PING_INFO_SIZE(nnis);
        md.threshold = 2; /* GET/REPLY */
        md.max_size  = 0;
-       md.options   = LNET_MD_TRUNCATE;
+       md.options   = LNET_MD_TRUNCATE | LNET_MD_TRACK_RESPONSE;
        md.user_ptr  = user_data;
        md.handler   = handler;
 
-       rc = LNetMDBind(md, LNET_UNLINK, mdh);
+       rc = LNetMDBind(&md, LNET_UNLINK, mdh);
        if (rc) {
                lnet_ping_buffer_decref(pbuf);
                CERROR("Can't bind MD: %d\n", rc);