Whamcloud - gitweb
LU-2673 procfs: call lprocfs_free_xxx_stats() later
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index 6c705f0..a252718 100644 (file)
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -74,7 +76,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
                ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
-               ev->sender        = LNET_NID_ANY;
+               ev->sender        = LNET_NID_ANY;
 
        } else {
                /* event for passive message */
@@ -83,7 +85,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
                ev->initiator.pid = hdr->src_pid;
                ev->initiator.nid = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
-               ev->sender        = msg->msg_from;
+               ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
                ev->offset        = msg->msg_offset;
        }
@@ -204,11 +206,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 
        case LNET_EVENT_GET:
                LASSERT(msg->msg_rx_committed);
-               /* overwritten while sending reply */
+               /* overwritten while sending reply, we should never be
+                * here for optimized GET */
                LASSERT(msg->msg_type == LNET_MSG_REPLY);
-
                msg->msg_type = LNET_MSG_GET; /* fix type */
-               counters->send_length += msg->msg_len;
                break;
        }
 
@@ -230,6 +231,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        if (status != 0)
                goto out;
 
+       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        switch (ev->type) {
        default:
                LASSERT(ev->type == 0);
@@ -241,7 +243,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
                break;
 
        case LNET_EVENT_GET:
-               LASSERT(msg->msg_type == LNET_MSG_GET);
+               /* type is "REPLY" if it's an optimized GET on passive side,
+                * because optimized GET will never be committed for sending,
+                * so message type wouldn't be changed back to "GET" by
+                * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
+               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+                       msg->msg_type == LNET_MSG_GET);
+               counters->send_length += msg->msg_wanted;
                break;
 
        case LNET_EVENT_PUT:
@@ -249,12 +257,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
                break;
 
        case LNET_EVENT_REPLY:
-               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
-                       msg->msg_type == LNET_MSG_GET); /* optimized GET */
+               /* type is "GET" if it's an optimized GET on active side,
+                * see details in lnet_create_reply_msg() */
+               LASSERT(msg->msg_type == LNET_MSG_GET ||
+                       msg->msg_type == LNET_MSG_REPLY);
                break;
        }
 
-       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        counters->recv_count++;
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
@@ -349,7 +358,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
        msg->msg_md = NULL;
 }
 
-void
+static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
         lnet_handle_wire_t ack_wmd;
@@ -382,9 +391,19 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is commited for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either
+                * because CPT for sending can be different with CPT for
+                * receiving, so we should return back to lnet_finalize()
+                * to make sure we are locking the correct partition.
+                */
+               return rc;
 
-               if (rc == 0)
-                       return;
        } else if (status == 0 &&       /* OK so far */
                   (msg->msg_routing && !msg->msg_sending)) {
                /* not forwarded */
@@ -394,13 +413,25 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
 
                lnet_net_lock(cpt);
-
-               if (rc == 0)
-                       return;
+               /*
+                * NB: message is committed for sending, we should return
+                * on success because LND will finalize this message later.
+                *
+                * Also, there is possibility that message is commited for
+                * sending and also failed before delivering to LND,
+                * i.e: ENOMEM, in that case we can't fall through either:
+                * - The rule is message must decommit for sending first if
+                *   the it's committed for both sending and receiving
+                * - CPT for sending can be different with CPT for receiving,
+                *   so we should return back to lnet_finalize() to make
+                *   sure we are locking the correct partition.
+                */
+               return rc;
        }
 
        lnet_msg_decommit(msg, cpt, status);
        lnet_msg_free_locked(msg);
+       return 0;
 }
 
 void
@@ -409,6 +440,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        struct lnet_msg_container       *container;
        int                             my_slot;
        int                             cpt;
+       int                             rc;
        int                             i;
 
         LASSERT (!cfs_in_interrupt ());
@@ -432,9 +464,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
                msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
-
-        LASSERT (msg->msg_onactivelist);
-
         msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
@@ -445,6 +474,8 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                lnet_res_unlock(cpt);
        }
 
+ again:
+       rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
                /* not commited to network yet */
                LASSERT(!msg->msg_onactivelist);
@@ -470,20 +501,24 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
                if (container->msc_finalizers[i] == cfs_current())
-                       goto out;
+                       break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
                        my_slot = i;
        }
 
-       if (my_slot < 0)
-               goto out;
+       if (i < container->msc_nfinalizers || my_slot < 0) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        container->msc_finalizers[my_slot] = cfs_current();
 #else
        LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL)
-               goto out;
+       if (container->msc_finalizers[0] != NULL) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        my_slot = i = 0;
        container->msc_finalizers[0] = (struct lnet_msg_container *)1;
@@ -497,13 +532,18 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
-               lnet_complete_msg_locked(msg, cpt);
+               rc = lnet_complete_msg_locked(msg, cpt);
+               if (rc != 0)
+                       break;
        }
 
        container->msc_finalizers[my_slot] = NULL;
- out:
        lnet_net_unlock(cpt);
+
+       if (rc != 0)
+               goto again;
 }
+EXPORT_SYMBOL(lnet_finalize);
 
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)