Whamcloud - gitweb
LU-4006 lnet: abort messages whose MD has been unlinked
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 30a70ae..406e2ed 100644 (file)
@@ -816,26 +816,30 @@ lnet_peer_alive_locked (lnet_peer_t *lp)
         return 0;
 }
 
-int
+/**
+ * \param msg The message to be sent.
+ * \param do_send True if lnet_ni_send() should be called in this function.
+ *       lnet_send() is going to lnet_net_unlock immediately after this, so
+ *       it sets do_send FALSE and I don't do the unlock/send/lock bit.
+ *
+ * \retval 0 If \a msg sent or OK to send.
+ * \retval EAGAIN If \a msg blocked for credit.
+ * \retval EHOSTUNREACH If the next hop of the message appears dead.
+ * \retval ECANCELED If the MD of the message has been unlinked.
+ */
+static int
 lnet_post_send_locked(lnet_msg_t *msg, int do_send)
 {
-       /* lnet_send is going to lnet_net_unlock immediately after this,
-        * so it sets do_send FALSE and I don't do the unlock/send/lock bit.
-        * I return EAGAIN if msg blocked, EHOSTUNREACH if msg_txpeer
-        * appears dead, and 0 if sent or OK to send */
-       struct lnet_peer        *lp = msg->msg_txpeer;
-       struct lnet_ni          *ni = lp->lp_ni;
-       struct lnet_tx_queue    *tq;
-       int                     cpt;
+       lnet_peer_t             *lp = msg->msg_txpeer;
+       lnet_ni_t               *ni = lp->lp_ni;
+       int                     cpt = msg->msg_tx_cpt;
+       struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];
 
        /* non-lnet_send() callers have checked before */
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
        LASSERT(msg->msg_tx_committed);
 
-       cpt = msg->msg_tx_cpt;
-       tq = ni->ni_tx_queues[cpt];
-
        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
            lnet_peer_alive_locked(lp) == 0) {
@@ -852,6 +856,20 @@ lnet_post_send_locked(lnet_msg_t *msg, int do_send)
                return EHOSTUNREACH;
        }
 
+       if (msg->msg_md != NULL &&
+           (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
+               lnet_net_unlock(cpt);
+
+               CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
+                       "called on the MD/ME.\n",
+                       libcfs_id2str(msg->msg_target));
+               if (do_send)
+                       lnet_finalize(ni, msg, -ECANCELED);
+
+               lnet_net_lock(cpt);
+               return ECANCELED;
+       }
+
         if (!msg->msg_peertxcredit) {
                 LASSERT ((lp->lp_txcredits < 0) ==
                          !cfs_list_empty(&lp->lp_txq));
@@ -1388,13 +1406,13 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
         rc = lnet_post_send_locked(msg, 0);
        lnet_net_unlock(cpt);
 
-        if (rc == EHOSTUNREACH)
-                return -EHOSTUNREACH;
+       if (rc == EHOSTUNREACH || rc == ECANCELED)
+               return -rc;
 
-        if (rc == 0)
+       if (rc == 0)
                 lnet_ni_send(src_ni, msg);
 
-        return 0;
+       return 0; /* rc == 0 or EAGAIN */
 }
 
 static void
@@ -2352,7 +2370,6 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
                lnet_res_unlock(cpt);
 
                lnet_msg_free(msg);
-
                 return -ENOENT;
         }
 
@@ -2378,11 +2395,11 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
 
        rc = lnet_send(self, msg, LNET_NID_ANY);
-        if (rc < 0) {
-                CNETERR( "Error sending GET to %s: %d\n",
-                       libcfs_id2str(target), rc);
-                lnet_finalize (NULL, msg, rc);
-        }
+       if (rc < 0) {
+               CNETERR("Error sending GET to %s: %d\n",
+                       libcfs_id2str(target), rc);
+               lnet_finalize(NULL, msg, rc);
+       }
 
         /* completion will be signalled by an event */
         return 0;