Whamcloud - gitweb
LU-7734 lnet: fix routing selection
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index fa12deb..a6ffe8e 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <lnet/lib-lnet.h>
 
 /*
  * Fill the caller-supplied event \a ev as an unlink event for \a md.
  *
  * The whole event is zeroed first, then status is set to 0, the
  * unlinked flag is set to 1 and the type becomes LNET_EVENT_UNLINK.
  * lnet_md_deconstruct() and lnet_md2handle() are then called to fill
  * ev->md and ev->md_handle from \a md.
  *
  * NOTE(review): both helpers are defined outside this view; presumably
  * they copy the MD description and its handle into the event -- confirm
  * against their definitions.
  */
 void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
 {
-        ENTRY;
+       ENTRY;
 
-        memset(ev, 0, sizeof(*ev));
+       memset(ev, 0, sizeof(*ev));
 
-        ev->status   = 0;
-        ev->unlinked = 1;
-        ev->type     = LNET_EVENT_UNLINK;
-        lnet_md_deconstruct(md, &ev->md);
-        lnet_md2handle(&ev->md_handle, md);
-        EXIT;
+       ev->status   = 0;
+       ev->unlinked = 1;
+       ev->type     = LNET_EVENT_UNLINK;
+       lnet_md_deconstruct(md, &ev->md);
+       lnet_md2handle(&ev->md_handle, md);
+       EXIT;
 }
 
 /*
@@ -70,18 +68,24 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 
        if (ev_type == LNET_EVENT_SEND) {
                /* event for active message */
-               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
-               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
+               ev->target.nid    = le64_to_cpu(hdr->dest_nid);
+               ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
+               ev->source.nid    = LNET_NID_ANY;
+               ev->source.pid    = the_lnet.ln_pid;
                ev->sender        = LNET_NID_ANY;
 
        } else {
                /* event for passive message */
-               ev->target.pid    = hdr->dest_pid;
-               ev->target.nid    = hdr->dest_nid;
+               ev->target.pid    = hdr->dest_pid;
+               ev->target.nid    = hdr->dest_nid;
                ev->initiator.pid = hdr->src_pid;
-               ev->initiator.nid = hdr->src_nid;
+               /* Multi-Rail: resolve src_nid to "primary" peer NID */
+               ev->initiator.nid = msg->msg_initiator;
+               /* Multi-Rail: track source NID. */
+               ev->source.pid    = hdr->src_pid;
+               ev->source.nid    = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
                ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
@@ -160,7 +164,7 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
 
        LASSERT(!msg->msg_onactivelist);
        msg->msg_onactivelist = 1;
-       cfs_list_add(&msg->msg_activelist, &container->msc_active);
+       list_add(&msg->msg_activelist, &container->msc_active);
 
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
@@ -212,6 +216,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
        }
 
        counters->send_count++;
+       if (msg->msg_txpeer)
+               atomic_inc(&msg->msg_txpeer->lpni_stats.send_count);
+       if (msg->msg_txni)
+               atomic_inc(&msg->msg_txni->ni_stats.send_count);
  out:
        lnet_return_tx_credits_locked(msg);
        msg->msg_tx_committed = 0;
@@ -263,6 +271,10 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
        }
 
        counters->recv_count++;
+       if (msg->msg_rxpeer)
+               atomic_inc(&msg->msg_rxpeer->lpni_stats.recv_count);
+       if (msg->msg_rxni)
+               atomic_inc(&msg->msg_rxni->ni_stats.recv_count);
        if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
                counters->recv_length += msg->msg_wanted;
 
@@ -294,7 +306,7 @@ lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
                lnet_msg_decommit_rx(msg, status);
        }
 
-       cfs_list_del(&msg->msg_activelist);
+       list_del(&msg->msg_activelist);
        msg->msg_onactivelist = 0;
 
        the_lnet.ln_counters[cpt2]->msgs_alloc--;
@@ -317,7 +329,7 @@ lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
        LASSERT(!msg->msg_routing);
 
        msg->msg_md = md;
-       if (msg->msg_receiving) { /* commited for receiving */
+       if (msg->msg_receiving) { /* committed for receiving */
                msg->msg_offset = offset;
                msg->msg_wanted = mlen;
        }
@@ -359,30 +371,30 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
 static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
-        lnet_handle_wire_t ack_wmd;
-        int                rc;
-        int                status = msg->msg_ev.status;
+       struct lnet_handle_wire ack_wmd;
+       int                rc;
+       int                status = msg->msg_ev.status;
 
-        LASSERT (msg->msg_onactivelist);
+       LASSERT(msg->msg_onactivelist);
 
-        if (status == 0 && msg->msg_ack) {
-                /* Only send an ACK if the PUT completed successfully */
+       if (status == 0 && msg->msg_ack) {
+               /* Only send an ACK if the PUT completed successfully */
 
                lnet_msg_decommit(msg, cpt, 0);
 
                msg->msg_ack = 0;
                lnet_net_unlock(cpt);
 
-                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
-                LASSERT(!msg->msg_routing);
+               LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+               LASSERT(!msg->msg_routing);
 
-                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+               ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
 
-                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+               lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0);
 
-                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
-                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
-                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+               msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+               msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+               msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
                /* NB: we probably want to use NID of msg::msg_from as 3rd
                 * parameter (router NID) if it's routed message */
@@ -393,7 +405,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                 * NB: message is committed for sending, we should return
                 * on success because LND will finalize this message later.
                 *
-                * Also, there is possibility that message is commited for
+                * Also, there is possibility that message is committed for
                 * sending and also failed before delivering to LND,
                 * i.e: ENOMEM, in that case we can't fall through either
                 * because CPT for sending can be different with CPT for
@@ -415,7 +427,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
                 * NB: message is committed for sending, we should return
                 * on success because LND will finalize this message later.
                 *
-                * Also, there is possibility that message is commited for
+                * Also, there is possibility that message is committed for
                 * sending and also failed before delivering to LND,
                 * i.e: ENOMEM, in that case we can't fall through either:
                 * - The rule is message must decommit for sending first if
@@ -428,12 +440,12 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
        }
 
        lnet_msg_decommit(msg, cpt, status);
-       lnet_msg_free_locked(msg);
+       lnet_msg_free(msg);
        return 0;
 }
 
 void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container       *container;
        int                             my_slot;
@@ -441,28 +453,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        int                             rc;
        int                             i;
 
-        LASSERT (!cfs_in_interrupt ());
-
-        if (msg == NULL)
-                return;
-#if 0
-        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
-               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
-               msg->msg_target_is_router ? "t" : "",
-               msg->msg_routing ? "X" : "",
-               msg->msg_ack ? "A" : "",
-               msg->msg_sending ? "S" : "",
-               msg->msg_receiving ? "R" : "",
-               msg->msg_delayed ? "d" : "",
-               msg->msg_txcredit ? "C" : "",
-               msg->msg_peertxcredit ? "c" : "",
-               msg->msg_rtrcredit ? "F" : "",
-               msg->msg_peerrtrcredit ? "f" : "",
-               msg->msg_onactivelist ? "!" : "",
-               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
-               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
-#endif
-        msg->msg_ev.status = status;
+       LASSERT(!in_interrupt());
+
+       if (msg == NULL)
+               return;
+
+       msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
                cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
@@ -475,14 +471,14 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
  again:
        rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-               /* not commited to network yet */
+               /* not committed to network yet */
                LASSERT(!msg->msg_onactivelist);
                lnet_msg_free(msg);
                return;
        }
 
        /*
-        * NB: routed message can be commited for both receiving and sending,
+        * NB: routed message can be committed for both receiving and sending,
         * we should finalize in LIFO order and keep counters correct.
         * (finalize sending first then finalize receiving)
         */
@@ -490,15 +486,14 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
        lnet_net_lock(cpt);
 
        container = the_lnet.ln_msg_containers[cpt];
-       cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+       list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
         * enough other threads are) already completing messages */
 
-#ifdef __KERNEL__
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
-               if (container->msc_finalizers[i] == cfs_current())
+               if (container->msc_finalizers[i] == current)
                        break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
@@ -510,23 +505,13 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                return;
        }
 
-       container->msc_finalizers[my_slot] = cfs_current();
-#else
-       LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL) {
-               lnet_net_unlock(cpt);
-               return;
-       }
-
-       my_slot = i = 0;
-       container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
+       container->msc_finalizers[my_slot] = current;
 
-       while (!cfs_list_empty(&container->msc_finalizing)) {
-               msg = cfs_list_entry(container->msc_finalizing.next,
-                                    lnet_msg_t, msg_list);
+       while (!list_empty(&container->msc_finalizing)) {
+               msg = list_entry(container->msc_finalizing.next,
+                                lnet_msg_t, msg_list);
 
-               cfs_list_del(&msg->msg_list);
+               list_del(&msg->msg_list);
 
                /* NB drops and regains the lnet lock if it actually does
                 * anything, so my finalizing friends can chomp along too */
@@ -535,6 +520,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
                        break;
        }
 
+       if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
+               lnet_net_unlock(cpt);
+               lnet_delay_rule_check();
+               lnet_net_lock(cpt);
+       }
+
        container->msc_finalizers[my_slot] = NULL;
        lnet_net_unlock(cpt);
 
@@ -546,18 +537,18 @@ EXPORT_SYMBOL(lnet_finalize);
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
 {
-       int     count = 0;
+       int     count = 0;
 
        if (container->msc_init == 0)
                return;
 
-       while (!cfs_list_empty(&container->msc_active)) {
-               lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
-                                                lnet_msg_t, msg_activelist);
+       while (!list_empty(&container->msc_active)) {
+               lnet_msg_t *msg = list_entry(container->msc_active.next,
+                                            lnet_msg_t, msg_activelist);
 
                LASSERT(msg->msg_onactivelist);
                msg->msg_onactivelist = 0;
-               cfs_list_del(&msg->msg_activelist);
+               list_del(&msg->msg_activelist);
                lnet_msg_free(msg);
                count++;
        }
@@ -571,9 +562,6 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
                            sizeof(*container->msc_finalizers));
                container->msc_finalizers = NULL;
        }
-#ifdef LNET_USE_LIB_FREELIST
-       lnet_freelist_fini(&container->msc_freelist);
-#endif
        container->msc_init = 0;
 }
 
@@ -584,22 +572,10 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 
        container->msc_init = 1;
 
-       CFS_INIT_LIST_HEAD(&container->msc_active);
-       CFS_INIT_LIST_HEAD(&container->msc_finalizing);
-
-#ifdef LNET_USE_LIB_FREELIST
-       memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
+       INIT_LIST_HEAD(&container->msc_active);
+       INIT_LIST_HEAD(&container->msc_finalizing);
 
-       rc = lnet_freelist_init(&container->msc_freelist,
-                               LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
-       if (rc != 0) {
-               CERROR("Failed to init freelist for message container\n");
-               lnet_msg_container_cleanup(container);
-               return rc;
-       }
-#else
        rc = 0;
-#endif
        /* number of CPUs */
        container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
 
@@ -620,7 +596,7 @@ void
 lnet_msg_containers_destroy(void)
 {
        struct lnet_msg_container *container;
-       int     i;
+       int     i;
 
        if (the_lnet.ln_msg_containers == NULL)
                return;