/*
* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2012, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <lnet/lib-lnet.h>
void
-lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
{
ENTRY;
LASSERT(!msg->msg_onactivelist);
msg->msg_onactivelist = 1;
- cfs_list_add(&msg->msg_activelist, &container->msc_active);
+ list_add(&msg->msg_activelist, &container->msc_active);
counters->msgs_alloc++;
if (counters->msgs_alloc > counters->msgs_max)
lnet_msg_decommit_rx(msg, status);
}
- cfs_list_del(&msg->msg_activelist);
+ list_del(&msg->msg_activelist);
msg->msg_onactivelist = 0;
the_lnet.ln_counters[cpt2]->msgs_alloc--;
LASSERT(!msg->msg_routing);
msg->msg_md = md;
- if (msg->msg_receiving) { /* commited for receiving */
+ if (msg->msg_receiving) { /* committed for receiving */
msg->msg_offset = offset;
msg->msg_wanted = mlen;
}
msg->msg_md = NULL;
}
-void
+static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
lnet_handle_wire_t ack_wmd;
int rc;
int status = msg->msg_ev.status;
- LASSERT (msg->msg_onactivelist);
+ LASSERT(msg->msg_onactivelist);
if (status == 0 && msg->msg_ack) {
/* Only send an ACK if the PUT completed successfully */
rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
lnet_net_lock(cpt);
+ /*
+ * NB: message is committed for sending, we should return
+ * on success because LND will finalize this message later.
+ *
+ * Also, there is possibility that message is committed for
+ * sending and also failed before delivering to LND,
+ * i.e: ENOMEM, in that case we can't fall through either
+ * because CPT for sending can be different with CPT for
+ * receiving, so we should return back to lnet_finalize()
+ * to make sure we are locking the correct partition.
+ */
+ return rc;
- if (rc == 0)
- return;
} else if (status == 0 && /* OK so far */
(msg->msg_routing && !msg->msg_sending)) {
/* not forwarded */
rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
lnet_net_lock(cpt);
-
- if (rc == 0)
- return;
+ /*
+ * NB: message is committed for sending, we should return
+ * on success because LND will finalize this message later.
+ *
+ * Also, there is possibility that message is committed for
+ * sending and also failed before delivering to LND,
+ * i.e: ENOMEM, in that case we can't fall through either:
+ * - The rule is message must decommit for sending first if
+ * the it's committed for both sending and receiving
+ * - CPT for sending can be different with CPT for receiving,
+ * so we should return back to lnet_finalize() to make
+ * sure we are locking the correct partition.
+ */
+ return rc;
}
lnet_msg_decommit(msg, cpt, status);
- lnet_msg_free_locked(msg);
+ lnet_msg_free(msg);
+ return 0;
}
void
-lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
struct lnet_msg_container *container;
int my_slot;
int cpt;
+ int rc;
int i;
- LASSERT (!cfs_in_interrupt ());
+ LASSERT(!in_interrupt());
- if (msg == NULL)
- return;
+ if (msg == NULL)
+ return;
#if 0
CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
lnet_res_unlock(cpt);
}
+ again:
+ rc = 0;
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
- /* not commited to network yet */
+ /* not committed to network yet */
LASSERT(!msg->msg_onactivelist);
lnet_msg_free(msg);
return;
}
/*
- * NB: routed message can be commited for both receiving and sending,
+ * NB: routed message can be committed for both receiving and sending,
* we should finalize in LIFO order and keep counters correct.
* (finalize sending first then finalize receiving)
*/
lnet_net_lock(cpt);
container = the_lnet.ln_msg_containers[cpt];
- cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+ list_add_tail(&msg->msg_list, &container->msc_finalizing);
/* Recursion breaker. Don't complete the message here if I am (or
* enough other threads are) already completing messages */
-#ifdef __KERNEL__
my_slot = -1;
for (i = 0; i < container->msc_nfinalizers; i++) {
- if (container->msc_finalizers[i] == cfs_current())
- goto out;
+ if (container->msc_finalizers[i] == current)
+ break;
if (my_slot < 0 && container->msc_finalizers[i] == NULL)
my_slot = i;
}
- if (my_slot < 0)
- goto out;
-
- container->msc_finalizers[my_slot] = cfs_current();
-#else
- LASSERT(container->msc_nfinalizers == 1);
- if (container->msc_finalizers[0] != NULL)
- goto out;
+ if (i < container->msc_nfinalizers || my_slot < 0) {
+ lnet_net_unlock(cpt);
+ return;
+ }
- my_slot = i = 0;
- container->msc_finalizers[0] = (struct lnet_msg_container *)1;
-#endif
+ container->msc_finalizers[my_slot] = current;
- while (!cfs_list_empty(&container->msc_finalizing)) {
- msg = cfs_list_entry(container->msc_finalizing.next,
- lnet_msg_t, msg_list);
+ while (!list_empty(&container->msc_finalizing)) {
+ msg = list_entry(container->msc_finalizing.next,
+ lnet_msg_t, msg_list);
- cfs_list_del(&msg->msg_list);
+ list_del(&msg->msg_list);
/* NB drops and regains the lnet lock if it actually does
* anything, so my finalizing friends can chomp along too */
- lnet_complete_msg_locked(msg, cpt);
+ rc = lnet_complete_msg_locked(msg, cpt);
+ if (rc != 0)
+ break;
+ }
+
+ if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
+ lnet_net_unlock(cpt);
+ lnet_delay_rule_check();
+ lnet_net_lock(cpt);
}
container->msc_finalizers[my_slot] = NULL;
- out:
lnet_net_unlock(cpt);
+
+ if (rc != 0)
+ goto again;
}
+EXPORT_SYMBOL(lnet_finalize);
void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
if (container->msc_init == 0)
return;
- while (!cfs_list_empty(&container->msc_active)) {
- lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
- lnet_msg_t, msg_activelist);
+ while (!list_empty(&container->msc_active)) {
+ lnet_msg_t *msg = list_entry(container->msc_active.next,
+ lnet_msg_t, msg_activelist);
LASSERT(msg->msg_onactivelist);
msg->msg_onactivelist = 0;
- cfs_list_del(&msg->msg_activelist);
+ list_del(&msg->msg_activelist);
lnet_msg_free(msg);
count++;
}
sizeof(*container->msc_finalizers));
container->msc_finalizers = NULL;
}
-#ifdef LNET_USE_LIB_FREELIST
- lnet_freelist_fini(&container->msc_freelist);
-#endif
container->msc_init = 0;
}
container->msc_init = 1;
- CFS_INIT_LIST_HEAD(&container->msc_active);
- CFS_INIT_LIST_HEAD(&container->msc_finalizing);
+ INIT_LIST_HEAD(&container->msc_active);
+ INIT_LIST_HEAD(&container->msc_finalizing);
-#ifdef LNET_USE_LIB_FREELIST
- memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
-
- rc = lnet_freelist_init(&container->msc_freelist,
- LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
- if (rc != 0) {
- CERROR("Failed to init freelist for message container\n");
- lnet_msg_container_cleanup(container);
- return rc;
- }
-#else
rc = 0;
-#endif
/* number of CPUs */
container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);