*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
/*
* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <lnet/lib-lnet.h>
void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
{
- ENTRY;
+ ENTRY;
- memset(ev, 0, sizeof(*ev));
+ memset(ev, 0, sizeof(*ev));
- ev->status = 0;
- ev->unlinked = 1;
- ev->type = LNET_EVENT_UNLINK;
- lnet_md_deconstruct(md, &ev->md);
- lnet_md2handle(&ev->md_handle, md);
- EXIT;
+ ev->status = 0;
+ ev->unlinked = 1;
+ ev->type = LNET_EVENT_UNLINK;
+ lnet_md_deconstruct(md, &ev->md);
+ lnet_md2handle(&ev->md_handle, md);
+ EXIT;
}
/*
if (ev_type == LNET_EVENT_SEND) {
/* event for active message */
- ev->target.nid = le64_to_cpu(hdr->dest_nid);
- ev->target.pid = le32_to_cpu(hdr->dest_pid);
+ ev->target.nid = le64_to_cpu(hdr->dest_nid);
+ ev->target.pid = le32_to_cpu(hdr->dest_pid);
ev->initiator.nid = LNET_NID_ANY;
ev->initiator.pid = the_lnet.ln_pid;
+ ev->source.nid = LNET_NID_ANY;
+ ev->source.pid = the_lnet.ln_pid;
ev->sender = LNET_NID_ANY;
} else {
/* event for passive message */
- ev->target.pid = hdr->dest_pid;
- ev->target.nid = hdr->dest_nid;
+ ev->target.pid = hdr->dest_pid;
+ ev->target.nid = hdr->dest_nid;
ev->initiator.pid = hdr->src_pid;
- ev->initiator.nid = hdr->src_nid;
+ /* Multi-Rail: resolve src_nid to "primary" peer NID */
+ ev->initiator.nid = msg->msg_initiator;
+ /* Multi-Rail: track source NID. */
+ ev->source.pid = hdr->src_pid;
+ ev->source.nid = hdr->src_nid;
ev->rlength = hdr->payload_length;
ev->sender = msg->msg_from;
ev->mlength = msg->msg_wanted;
LASSERT(!msg->msg_onactivelist);
msg->msg_onactivelist = 1;
- cfs_list_add(&msg->msg_activelist, &container->msc_active);
+ list_add(&msg->msg_activelist, &container->msc_active);
counters->msgs_alloc++;
if (counters->msgs_alloc > counters->msgs_max)
}
counters->send_count++;
+ if (msg->msg_txpeer)
+ atomic_inc(&msg->msg_txpeer->lpni_stats.send_count);
+ if (msg->msg_txni)
+ atomic_inc(&msg->msg_txni->ni_stats.send_count);
out:
lnet_return_tx_credits_locked(msg);
msg->msg_tx_committed = 0;
}
counters->recv_count++;
+ if (msg->msg_rxpeer)
+ atomic_inc(&msg->msg_rxpeer->lpni_stats.recv_count);
+ if (msg->msg_rxni)
+ atomic_inc(&msg->msg_rxni->ni_stats.recv_count);
if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
counters->recv_length += msg->msg_wanted;
lnet_msg_decommit_rx(msg, status);
}
- cfs_list_del(&msg->msg_activelist);
+ list_del(&msg->msg_activelist);
msg->msg_onactivelist = 0;
the_lnet.ln_counters[cpt2]->msgs_alloc--;
LASSERT(!msg->msg_routing);
msg->msg_md = md;
- if (msg->msg_receiving) { /* commited for receiving */
+ if (msg->msg_receiving) { /* committed for receiving */
msg->msg_offset = offset;
msg->msg_wanted = mlen;
}
static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
- lnet_handle_wire_t ack_wmd;
- int rc;
- int status = msg->msg_ev.status;
+ struct lnet_handle_wire ack_wmd;
+ int rc;
+ int status = msg->msg_ev.status;
- LASSERT (msg->msg_onactivelist);
+ LASSERT(msg->msg_onactivelist);
- if (status == 0 && msg->msg_ack) {
- /* Only send an ACK if the PUT completed successfully */
+ if (status == 0 && msg->msg_ack) {
+ /* Only send an ACK if the PUT completed successfully */
lnet_msg_decommit(msg, cpt, 0);
msg->msg_ack = 0;
lnet_net_unlock(cpt);
- LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
- LASSERT(!msg->msg_routing);
+ LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+ LASSERT(!msg->msg_routing);
- ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
+ ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
- lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
+ lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.source, 0, 0);
- msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
- msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
- msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
+ msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+ msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+ msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
/* NB: we probably want to use NID of msg::msg_from as 3rd
* parameter (router NID) if it's routed message */
* NB: message is committed for sending, we should return
* on success because LND will finalize this message later.
*
- * Also, there is possibility that message is commited for
+ * Also, there is possibility that message is committed for
* sending and also failed before delivering to LND,
* i.e: ENOMEM, in that case we can't fall through either
* because CPT for sending can be different with CPT for
* NB: message is committed for sending, we should return
* on success because LND will finalize this message later.
*
- * Also, there is possibility that message is commited for
+ * Also, there is possibility that message is committed for
* sending and also failed before delivering to LND,
* i.e: ENOMEM, in that case we can't fall through either:
* - The rule is message must decommit for sending first if
}
lnet_msg_decommit(msg, cpt, status);
- lnet_msg_free_locked(msg);
+ lnet_msg_free(msg);
return 0;
}
void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
struct lnet_msg_container *container;
int my_slot;
int rc;
int i;
- LASSERT (!cfs_in_interrupt ());
-
- if (msg == NULL)
- return;
-#if 0
- CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
- lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
- msg->msg_target_is_router ? "t" : "",
- msg->msg_routing ? "X" : "",
- msg->msg_ack ? "A" : "",
- msg->msg_sending ? "S" : "",
- msg->msg_receiving ? "R" : "",
- msg->msg_delayed ? "d" : "",
- msg->msg_txcredit ? "C" : "",
- msg->msg_peertxcredit ? "c" : "",
- msg->msg_rtrcredit ? "F" : "",
- msg->msg_peerrtrcredit ? "f" : "",
- msg->msg_onactivelist ? "!" : "",
- msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
- msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
-#endif
- msg->msg_ev.status = status;
+ LASSERT(!in_interrupt());
+
+ if (msg == NULL)
+ return;
+
+ msg->msg_ev.status = status;
if (msg->msg_md != NULL) {
cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
again:
rc = 0;
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
- /* not commited to network yet */
+ /* not committed to network yet */
LASSERT(!msg->msg_onactivelist);
lnet_msg_free(msg);
return;
}
/*
- * NB: routed message can be commited for both receiving and sending,
+ * NB: routed message can be committed for both receiving and sending,
* we should finalize in LIFO order and keep counters correct.
* (finalize sending first then finalize receiving)
*/
lnet_net_lock(cpt);
container = the_lnet.ln_msg_containers[cpt];
- cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+ list_add_tail(&msg->msg_list, &container->msc_finalizing);
/* Recursion breaker. Don't complete the message here if I am (or
* enough other threads are) already completing messages */
-#ifdef __KERNEL__
my_slot = -1;
for (i = 0; i < container->msc_nfinalizers; i++) {
- if (container->msc_finalizers[i] == cfs_current())
+ if (container->msc_finalizers[i] == current)
break;
if (my_slot < 0 && container->msc_finalizers[i] == NULL)
return;
}
- container->msc_finalizers[my_slot] = cfs_current();
-#else
- LASSERT(container->msc_nfinalizers == 1);
- if (container->msc_finalizers[0] != NULL) {
- lnet_net_unlock(cpt);
- return;
- }
-
- my_slot = i = 0;
- container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
+ container->msc_finalizers[my_slot] = current;
- while (!cfs_list_empty(&container->msc_finalizing)) {
- msg = cfs_list_entry(container->msc_finalizing.next,
- lnet_msg_t, msg_list);
+ while (!list_empty(&container->msc_finalizing)) {
+ msg = list_entry(container->msc_finalizing.next,
+ lnet_msg_t, msg_list);
- cfs_list_del(&msg->msg_list);
+ list_del(&msg->msg_list);
/* NB drops and regains the lnet lock if it actually does
* anything, so my finalizing friends can chomp along too */
break;
}
+ if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
+ lnet_net_unlock(cpt);
+ lnet_delay_rule_check();
+ lnet_net_lock(cpt);
+ }
+
container->msc_finalizers[my_slot] = NULL;
lnet_net_unlock(cpt);
void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
{
- int count = 0;
+ int count = 0;
if (container->msc_init == 0)
return;
- while (!cfs_list_empty(&container->msc_active)) {
- lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
- lnet_msg_t, msg_activelist);
+ while (!list_empty(&container->msc_active)) {
+ lnet_msg_t *msg = list_entry(container->msc_active.next,
+ lnet_msg_t, msg_activelist);
LASSERT(msg->msg_onactivelist);
msg->msg_onactivelist = 0;
- cfs_list_del(&msg->msg_activelist);
+ list_del(&msg->msg_activelist);
lnet_msg_free(msg);
count++;
}
sizeof(*container->msc_finalizers));
container->msc_finalizers = NULL;
}
-#ifdef LNET_USE_LIB_FREELIST
- lnet_freelist_fini(&container->msc_freelist);
-#endif
container->msc_init = 0;
}
container->msc_init = 1;
- CFS_INIT_LIST_HEAD(&container->msc_active);
- CFS_INIT_LIST_HEAD(&container->msc_finalizing);
-
-#ifdef LNET_USE_LIB_FREELIST
- memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
+ INIT_LIST_HEAD(&container->msc_active);
+ INIT_LIST_HEAD(&container->msc_finalizing);
- rc = lnet_freelist_init(&container->msc_freelist,
- LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
- if (rc != 0) {
- CERROR("Failed to init freelist for message container\n");
- lnet_msg_container_cleanup(container);
- return rc;
- }
-#else
rc = 0;
-#endif
/* number of CPUs */
container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
lnet_msg_containers_destroy(void)
{
struct lnet_msg_container *container;
- int i;
+ int i;
if (the_lnet.ln_msg_containers == NULL)
return;