Whamcloud - gitweb
LU-7734 lnet: fix routing selection
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index b1995f2..a6ffe8e 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <lnet/lib-lnet.h>
 
 void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
 {
-        ENTRY;
+       ENTRY;
 
-        memset(ev, 0, sizeof(*ev));
+       memset(ev, 0, sizeof(*ev));
 
-        ev->status   = 0;
-        ev->unlinked = 1;
-        ev->type     = LNET_EVENT_UNLINK;
-        lnet_md_deconstruct(md, &ev->md);
-        lnet_md2handle(&ev->md_handle, md);
-        EXIT;
+       ev->status   = 0;
+       ev->unlinked = 1;
+       ev->type     = LNET_EVENT_UNLINK;
+       lnet_md_deconstruct(md, &ev->md);
+       lnet_md2handle(&ev->md_handle, md);
+       EXIT;
 }
 
 /*
@@ -72,18 +68,24 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 
        if (ev_type == LNET_EVENT_SEND) {
                /* event for active message */
-               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
-               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
+               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
+               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
+               ev->source.nid    = LNET_NID_ANY;
+               ev->source.pid    = the_lnet.ln_pid;
                ev->sender        = LNET_NID_ANY;
 
        } else {
                /* event for passive message */
-               ev->target.pid    = hdr->dest_pid;
-               ev->target.nid    = hdr->dest_nid;
+               ev->target.pid    = hdr->dest_pid;
+               ev->target.nid    = hdr->dest_nid;
                ev->initiator.pid = hdr->src_pid;
-               ev->initiator.nid = hdr->src_nid;
+               /* Multi-Rail: resolve src_nid to "primary" peer NID */
+               ev->initiator.nid = msg->msg_initiator;
+               /* Multi-Rail: track source NID. */
+               ev->source.pid    = hdr->src_pid;
+               ev->source.nid    = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
                ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
@@ -214,6 +216,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
        }
 
        counters->send_count++;
+       if (msg->msg_txpeer)
+               atomic_inc(&msg->msg_txpeer->lpni_stats.send_count);
+       if (msg->msg_txni)
+               atomic_inc(&msg->msg_txni->ni_stats.send_count);
  out:
        lnet_return_tx_credits_locked(msg);
        msg->msg_tx_committed = 0;
@@ -265,6 +271,10 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        }
 
        counters->recv_count++;
+       if (msg->msg_rxpeer)
+               atomic_inc(&msg->msg_rxpeer->lpni_stats.recv_count);
+       if (msg->msg_rxni)
+               atomic_inc(&msg->msg_rxni->ni_stats.recv_count);
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
 
@@ -361,30 +371,30 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
 static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
-        lnet_handle_wire_t ack_wmd;
-        int                rc;
-        int                status = msg->msg_ev.status;
+       struct lnet_handle_wire ack_wmd;
+       int                rc;
+       int                status = msg->msg_ev.status;
 
-        LASSERT (msg->msg_onactivelist);
+       LASSERT(msg->msg_onactivelist);
 
-        if (status == 0 && msg->msg_ack) {
-                /* Only send an ACK if the PUT completed successfully */
+       if (status == 0 && msg->msg_ack) {
+               /* Only send an ACK if the PUT completed successfully */
 
                lnet_msg_decommit(msg, cpt, 0);
 
                msg->msg_ack = 0;
                lnet_net_unlock(cpt);
 
-                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
-                LASSERT(!msg->msg_routing);
+               LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+               LASSERT(!msg->msg_routing);
 
-                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+               ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
 
-                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+               lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0);
 
-                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
-                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
-                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+               msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+               msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+               msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
                /* NB: we probably want to use NID of msg::msg_from as 3rd
                 * parameter (router NID) if it's routed message */
@@ -430,12 +440,12 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
        }
 
        lnet_msg_decommit(msg, cpt, status);
-       lnet_msg_free_locked(msg);
+       lnet_msg_free(msg);
        return 0;
 }
 
 void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container       *container;
        int                             my_slot;
@@ -443,28 +453,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        int                             rc;
        int                             i;
 
-       LASSERT (!in_interrupt ());
+       LASSERT(!in_interrupt());
 
        if (msg == NULL)
                return;
-#if 0
-        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
-               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
-               msg->msg_target_is_router ? "t" : "",
-               msg->msg_routing ? "X" : "",
-               msg->msg_ack ? "A" : "",
-               msg->msg_sending ? "S" : "",
-               msg->msg_receiving ? "R" : "",
-               msg->msg_delayed ? "d" : "",
-               msg->msg_txcredit ? "C" : "",
-               msg->msg_peertxcredit ? "c" : "",
-               msg->msg_rtrcredit ? "F" : "",
-               msg->msg_peerrtrcredit ? "f" : "",
-               msg->msg_onactivelist ? "!" : "",
-               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
-               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
-#endif
-        msg->msg_ev.status = status;
+
+       msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
                cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
@@ -497,7 +491,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
 
-#ifdef __KERNEL__
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
                if (container->msc_finalizers[i] == current)
@@ -513,16 +506,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        }
 
        container->msc_finalizers[my_slot] = current;
-#else
-       LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL) {
-               lnet_net_unlock(cpt);
-               return;
-       }
-
-       my_slot = i = 0;
-       container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
 
        while (!list_empty(&container->msc_finalizing)) {
                msg = list_entry(container->msc_finalizing.next,
@@ -537,6 +520,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                        break;
        }
 
+       if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
+               lnet_net_unlock(cpt);
+               lnet_delay_rule_check();
+               lnet_net_lock(cpt);
+       }
+
        container->msc_finalizers[my_slot] = NULL;
        lnet_net_unlock(cpt);
 
@@ -548,7 +537,7 @@ EXPORT_SYMBOL(lnet_finalize);
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
 {
-       int     count = 0;
+       int     count = 0;
 
        if (container->msc_init == 0)
                return;
@@ -573,9 +562,6 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                            sizeof(*container->msc_finalizers));
                container->msc_finalizers = NULL;
        }
-#ifdef LNET_USE_LIB_FREELIST
-       lnet_freelist_fini(&container->msc_freelist);
-#endif
        container->msc_init = 0;
 }
 
@@ -589,19 +575,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
        INIT_LIST_HEAD(&container->msc_active);
        INIT_LIST_HEAD(&container->msc_finalizing);
 
-#ifdef LNET_USE_LIB_FREELIST
-       memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
-
-       rc = lnet_freelist_init(&container->msc_freelist,
-                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
-       if (rc != 0) {
-               CERROR("Failed to init freelist for message container\n");
-               lnet_msg_container_cleanup(container);
-               return rc;
-       }
-#else
        rc = 0;
-#endif
        /* number of CPUs */
        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
 
@@ -622,7 +596,7 @@ void
 lnet_msg_containers_destroy(void)
 {
        struct lnet_msg_container *container;
-       int     i;
+       int     i;
 
        if (the_lnet.ln_msg_containers == NULL)
                return;