Whamcloud - gitweb
LU-1617 build: skip generated files in .gitignore
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index f2ea470..fa12deb 100644 (file)
@@ -356,7 +356,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
        msg->msg_md = NULL;
 }
 
-void
+static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
         lnet_handle_wire_t ack_wmd;
@@ -389,9 +389,19 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is commited for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either
+                * because CPT for sending can be different with CPT for
+                * receiving, so we should return back to lnet_finalize()
+                * to make sure we are locking the correct partition.
+                */
+               return rc;
 
-               if (rc == 0)
-                       return;
        } else if (status == 0 &&       /* OK so far */
                   (msg->msg_routing && !msg->msg_sending)) {
                /* not forwarded */
@@ -401,13 +411,25 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
-
-               if (rc == 0)
-                       return;
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is commited for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either:
+                * - The rule is message must decommit for sending first if
+                *   the it's committed for both sending and receiving
+                * - CPT for sending can be different with CPT for receiving,
+                *   so we should return back to lnet_finalize() to make
+                *   sure we are locking the correct partition.
+                */
+               return rc;
        }
 
        lnet_msg_decommit(msg, cpt, status);
        lnet_msg_free_locked(msg);
+       return 0;
 }
 
 void
@@ -416,6 +438,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        struct lnet_msg_container       *container;
        int                             my_slot;
        int                             cpt;
+       int                             rc;
        int                             i;
 
         LASSERT (!cfs_in_interrupt ());
@@ -449,6 +472,8 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                lnet_res_unlock(cpt);
        }
 
+ again:
+       rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
                /* not commited to network yet */
                LASSERT(!msg->msg_onactivelist);
@@ -474,20 +499,24 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
                if (container->msc_finalizers[i] == cfs_current())
-                       goto out;
+                       break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
                        my_slot = i;
        }
 
-       if (my_slot < 0)
-               goto out;
+       if (i < container->msc_nfinalizers || my_slot < 0) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        container->msc_finalizers[my_slot] = cfs_current();
 #else
        LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL)
-               goto out;
+       if (container->msc_finalizers[0] != NULL) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        my_slot = i = 0;
        container->msc_finalizers[0] = (struct lnet_msg_container *)1;
@@ -501,13 +530,18 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
-               lnet_complete_msg_locked(msg, cpt);
+               rc = lnet_complete_msg_locked(msg, cpt);
+               if (rc != 0)
+                       break;
        }
 
        container->msc_finalizers[my_slot] = NULL;
- out:
        lnet_net_unlock(cpt);
+
+       if (rc != 0)
+               goto again;
 }
+EXPORT_SYMBOL(lnet_finalize);
 
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)