X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=a6ffe8e72c1f03a5000de7e926c34352a35f4227;hp=b1995f2da5e3a992afb91a3aa12576600ea55ee5;hb=76adbed805e71995d521d1a26e1e3d93f3dfd7b7;hpb=1cbd19abbed9dbfd3bbc485f0991cfaeb02b7dae diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index b1995f2..a6ffe8e 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -43,18 +39,18 @@ #include void -lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev) +lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev) { - ENTRY; + ENTRY; - memset(ev, 0, sizeof(*ev)); + memset(ev, 0, sizeof(*ev)); - ev->status = 0; - ev->unlinked = 1; - ev->type = LNET_EVENT_UNLINK; - lnet_md_deconstruct(md, &ev->md); - lnet_md2handle(&ev->md_handle, md); - EXIT; + ev->status = 0; + ev->unlinked = 1; + ev->type = LNET_EVENT_UNLINK; + lnet_md_deconstruct(md, &ev->md); + lnet_md2handle(&ev->md_handle, md); + EXIT; } /* @@ -72,18 +68,24 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type) if (ev_type == LNET_EVENT_SEND) { /* event for active message */ - ev->target.nid = le64_to_cpu(hdr->dest_nid); - ev->target.pid = le32_to_cpu(hdr->dest_pid); + ev->target.nid = le64_to_cpu(hdr->dest_nid); + ev->target.pid = le32_to_cpu(hdr->dest_pid); ev->initiator.nid = LNET_NID_ANY; ev->initiator.pid = the_lnet.ln_pid; + ev->source.nid = LNET_NID_ANY; + ev->source.pid = the_lnet.ln_pid; ev->sender = LNET_NID_ANY; } else { /* event for passive message */ - ev->target.pid = hdr->dest_pid; - ev->target.nid = hdr->dest_nid; + ev->target.pid = hdr->dest_pid; + ev->target.nid = hdr->dest_nid; ev->initiator.pid = hdr->src_pid; - ev->initiator.nid = hdr->src_nid; + /* Multi-Rail: resolve src_nid to "primary" peer NID */ + ev->initiator.nid = msg->msg_initiator; + /* Multi-Rail: track source NID. */ + ev->source.pid = hdr->src_pid; + ev->source.nid = hdr->src_nid; ev->rlength = hdr->payload_length; ev->sender = msg->msg_from; ev->mlength = msg->msg_wanted; @@ -214,6 +216,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status) } counters->send_count++; + if (msg->msg_txpeer) + atomic_inc(&msg->msg_txpeer->lpni_stats.send_count); + if (msg->msg_txni) + atomic_inc(&msg->msg_txni->ni_stats.send_count); out: lnet_return_tx_credits_locked(msg); msg->msg_tx_committed = 0; @@ -265,6 +271,10 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status) } counters->recv_count++; + if (msg->msg_rxpeer) + atomic_inc(&msg->msg_rxpeer->lpni_stats.recv_count); + if (msg->msg_rxni) + atomic_inc(&msg->msg_rxni->ni_stats.recv_count); if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY) counters->recv_length += msg->msg_wanted; @@ -361,30 +371,30 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status) static int lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) { - lnet_handle_wire_t ack_wmd; - int rc; - int status = msg->msg_ev.status; + struct lnet_handle_wire ack_wmd; + int rc; + int status = msg->msg_ev.status; - LASSERT (msg->msg_onactivelist); + LASSERT(msg->msg_onactivelist); - if (status == 0 && msg->msg_ack) { - /* Only send an ACK if the PUT completed successfully */ + if (status == 0 && msg->msg_ack) { + /* Only send an ACK if the PUT completed successfully */ lnet_msg_decommit(msg, cpt, 0); msg->msg_ack = 0; lnet_net_unlock(cpt); - LASSERT(msg->msg_ev.type == LNET_EVENT_PUT); - LASSERT(!msg->msg_routing); + LASSERT(msg->msg_ev.type == LNET_EVENT_PUT); + LASSERT(!msg->msg_routing); - ack_wmd = msg->msg_hdr.msg.put.ack_wmd; + ack_wmd = msg->msg_hdr.msg.put.ack_wmd; - lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0); + lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0); - msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; - msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; - msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); + msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; + msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; + msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); /* NB: we probably want to use NID of msg::msg_from as 3rd * parameter (router NID) if it's routed message */ @@ -430,12 +440,12 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) } lnet_msg_decommit(msg, cpt, status); - lnet_msg_free_locked(msg); + lnet_msg_free(msg); return 0; } void -lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) +lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status) { struct lnet_msg_container *container; int my_slot; @@ -443,28 +453,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) int rc; int i; - LASSERT (!in_interrupt ()); + LASSERT(!in_interrupt()); if (msg == NULL) return; -#if 0 - CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n", - lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target), - msg->msg_target_is_router ? "t" : "", - msg->msg_routing ? "X" : "", - msg->msg_ack ? "A" : "", - msg->msg_sending ? "S" : "", - msg->msg_receiving ? "R" : "", - msg->msg_delayed ? "d" : "", - msg->msg_txcredit ? "C" : "", - msg->msg_peertxcredit ? "c" : "", - msg->msg_rtrcredit ? "F" : "", - msg->msg_peerrtrcredit ? "f" : "", - msg->msg_onactivelist ? "!" : "", - msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid), - msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid)); -#endif - msg->msg_ev.status = status; + + msg->msg_ev.status = status; if (msg->msg_md != NULL) { cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie); @@ -497,7 +491,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) /* Recursion breaker. Don't complete the message here if I am (or * enough other threads are) already completing messages */ -#ifdef __KERNEL__ my_slot = -1; for (i = 0; i < container->msc_nfinalizers; i++) { if (container->msc_finalizers[i] == current) @@ -513,16 +506,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) } container->msc_finalizers[my_slot] = current; -#else - LASSERT(container->msc_nfinalizers == 1); - if (container->msc_finalizers[0] != NULL) { - lnet_net_unlock(cpt); - return; - } - - my_slot = i = 0; - container->msc_finalizers[0] = (struct lnet_msg_container *)1; -#endif while (!list_empty(&container->msc_finalizing)) { msg = list_entry(container->msc_finalizing.next, @@ -537,6 +520,12 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) break; } + if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) { + lnet_net_unlock(cpt); + lnet_delay_rule_check(); + lnet_net_lock(cpt); + } + container->msc_finalizers[my_slot] = NULL; lnet_net_unlock(cpt); @@ -548,7 +537,7 @@ EXPORT_SYMBOL(lnet_finalize); void lnet_msg_container_cleanup(struct lnet_msg_container *container) { - int count = 0; + int count = 0; if (container->msc_init == 0) return; @@ -573,9 +562,6 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container) sizeof(*container->msc_finalizers)); container->msc_finalizers = NULL; } -#ifdef LNET_USE_LIB_FREELIST - lnet_freelist_fini(&container->msc_freelist); -#endif container->msc_init = 0; } @@ -589,19 +575,7 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt) INIT_LIST_HEAD(&container->msc_active); INIT_LIST_HEAD(&container->msc_finalizing); -#ifdef LNET_USE_LIB_FREELIST - memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t)); - - rc = lnet_freelist_init(&container->msc_freelist, - LNET_FL_MAX_MSGS, sizeof(lnet_msg_t)); - if (rc != 0) { - CERROR("Failed to init freelist for message container\n"); - lnet_msg_container_cleanup(container); - return rc; - } -#else rc = 0; -#endif /* number of CPUs */ container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt); @@ -622,7 +596,7 @@ void lnet_msg_containers_destroy(void) { struct lnet_msg_container *container; - int i; + int i; if (the_lnet.ln_msg_containers == NULL) return;