/*
* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
case LNET_EVENT_GET:
LASSERT(msg->msg_rx_committed);
- /* overwritten while sending reply */
+ /* overwritten while sending reply, we should never be
+ * here for optimized GET */
LASSERT(msg->msg_type == LNET_MSG_REPLY);
-
msg->msg_type = LNET_MSG_GET; /* fix type */
- counters->send_length += msg->msg_len;
break;
}
if (status != 0)
goto out;
+ counters = the_lnet.ln_counters[msg->msg_rx_cpt];
switch (ev->type) {
default:
LASSERT(ev->type == 0);
break;
case LNET_EVENT_GET:
- LASSERT(msg->msg_type == LNET_MSG_GET);
+ /* type is "REPLY" if it's an optimized GET on passive side,
+ * because optimized GET will never be committed for sending,
+ * so message type wouldn't be changed back to "GET" by
+ * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
+ LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+ msg->msg_type == LNET_MSG_GET);
+ counters->send_length += msg->msg_wanted;
break;
case LNET_EVENT_PUT:
break;
case LNET_EVENT_REPLY:
- LASSERT(msg->msg_type == LNET_MSG_REPLY ||
- msg->msg_type == LNET_MSG_GET); /* optimized GET */
+ /* type is "GET" if it's an optimized GET on active side,
+ * see details in lnet_create_reply_msg() */
+ LASSERT(msg->msg_type == LNET_MSG_GET ||
+ msg->msg_type == LNET_MSG_REPLY);
break;
}
- counters = the_lnet.ln_counters[msg->msg_rx_cpt];
counters->recv_count++;
if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
counters->recv_length += msg->msg_wanted;
LASSERT(!msg->msg_routing);
msg->msg_md = md;
- if (msg->msg_receiving) { /* commited for receiving */
+ if (msg->msg_receiving) { /* committed for receiving */
msg->msg_offset = offset;
msg->msg_wanted = mlen;
}
msg->msg_md = NULL;
}
-void
+static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
lnet_handle_wire_t ack_wmd;
rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
lnet_net_lock(cpt);
+ /*
+ * NB: message is committed for sending, we should return
+ * on success because LND will finalize this message later.
+ *
+ * Also, there is possibility that message is committed for
+ * sending and also failed before delivering to LND,
+ * i.e. ENOMEM, in that case we can't fall through either
+ * because CPT for sending can be different from CPT for
+ * receiving, so we should return to lnet_finalize()
+ * to make sure we are locking the correct partition.
+ */
+ return rc;
- if (rc == 0)
- return;
} else if (status == 0 && /* OK so far */
(msg->msg_routing && !msg->msg_sending)) {
/* not forwarded */
rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
lnet_net_lock(cpt);
-
- if (rc == 0)
- return;
+ /*
+ * NB: message is committed for sending, we should return
+ * on success because LND will finalize this message later.
+ *
+ * Also, there is possibility that message is committed for
+ * sending and also failed before delivering to LND,
+ * i.e. ENOMEM, in that case we can't fall through either:
+ * - The rule is message must decommit for sending first if
+ *   it's committed for both sending and receiving
+ * - CPT for sending can be different from CPT for receiving,
+ *   so we should return to lnet_finalize() to make
+ * sure we are locking the correct partition.
+ */
+ return rc;
}
lnet_msg_decommit(msg, cpt, status);
lnet_msg_free_locked(msg);
+ return 0;
}
void
struct lnet_msg_container *container;
int my_slot;
int cpt;
+ int rc;
int i;
- LASSERT (!cfs_in_interrupt ());
+ LASSERT (!in_interrupt ());
- if (msg == NULL)
- return;
+ if (msg == NULL)
+ return;
#if 0
CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
#endif
-
- LASSERT (msg->msg_onactivelist);
-
msg->msg_ev.status = status;
if (msg->msg_md != NULL) {
lnet_res_unlock(cpt);
}
+ again:
+ rc = 0;
if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
- /* not commited to network yet */
+ /* not committed to network yet */
LASSERT(!msg->msg_onactivelist);
lnet_msg_free(msg);
return;
}
/*
- * NB: routed message can be commited for both receiving and sending,
+ * NB: routed message can be committed for both receiving and sending,
* we should finalize in LIFO order and keep counters correct.
* (finalize sending first then finalize receiving)
*/
#ifdef __KERNEL__
my_slot = -1;
for (i = 0; i < container->msc_nfinalizers; i++) {
- if (container->msc_finalizers[i] == cfs_current())
- goto out;
+ if (container->msc_finalizers[i] == current)
+ break;
if (my_slot < 0 && container->msc_finalizers[i] == NULL)
my_slot = i;
}
- if (my_slot < 0)
- goto out;
+ if (i < container->msc_nfinalizers || my_slot < 0) {
+ lnet_net_unlock(cpt);
+ return;
+ }
- container->msc_finalizers[my_slot] = cfs_current();
+ container->msc_finalizers[my_slot] = current;
#else
LASSERT(container->msc_nfinalizers == 1);
- if (container->msc_finalizers[0] != NULL)
- goto out;
+ if (container->msc_finalizers[0] != NULL) {
+ lnet_net_unlock(cpt);
+ return;
+ }
my_slot = i = 0;
container->msc_finalizers[0] = (struct lnet_msg_container *)1;
/* NB drops and regains the lnet lock if it actually does
* anything, so my finalizing friends can chomp along too */
- lnet_complete_msg_locked(msg, cpt);
+ rc = lnet_complete_msg_locked(msg, cpt);
+ if (rc != 0)
+ break;
}
container->msc_finalizers[my_slot] = NULL;
- out:
lnet_net_unlock(cpt);
+
+ if (rc != 0)
+ goto again;
}
+EXPORT_SYMBOL(lnet_finalize);
void
lnet_msg_container_cleanup(struct lnet_msg_container *container)