int lnet_islocalnid(lnet_nid_t nid);
int lnet_islocalnet(__u32 net);
-void lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
- unsigned int offset, unsigned int mlen);
+void lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
+ unsigned int offset, unsigned int mlen);
+void lnet_msg_detach_md(lnet_msg_t *msg, int status);
void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
void lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type);
+void lnet_msg_commit(lnet_msg_t *msg, int sending);
+void lnet_msg_decommit(lnet_msg_t *msg, int status);
+
void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
unsigned int offset, unsigned int len);
int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
-void lnet_return_credits_locked (lnet_msg_t *msg);
+void lnet_return_tx_credits_locked(lnet_msg_t *msg);
+void lnet_return_rx_credits_locked(lnet_msg_t *msg);
/* portals functions */
static inline int
lnet_nid_t msg_from;
__u32 msg_type;
+ unsigned int msg_rx_committed:1;
+ unsigned int msg_tx_committed:1;
+
unsigned int msg_vmflush:1; /* VM trying to free memory */
unsigned int msg_target_is_router:1; /* sending to a router */
unsigned int msg_routing:1; /* being forwarded */
* Six types of events can be logged in an event queue.
*/
typedef enum {
- /** An incoming GET operation has completed on the MD. */
- LNET_EVENT_GET,
+ /** An incoming GET operation has completed on the MD. */
+ LNET_EVENT_GET = 1,
/**
* An incoming PUT operation has completed on the MD. The
* underlying layers will not alter the memory (on behalf of this
}
#ifdef __KERNEL__
-static void
-lnet_commit_routedmsg (lnet_msg_t *msg)
-{
- /* ALWAYS called holding the LNET_LOCK */
- LASSERT (msg->msg_routing);
-
- the_lnet.ln_counters.msgs_alloc++;
- if (the_lnet.ln_counters.msgs_alloc >
- the_lnet.ln_counters.msgs_max)
- the_lnet.ln_counters.msgs_max =
- the_lnet.ln_counters.msgs_alloc;
-
- the_lnet.ln_counters.route_count++;
- the_lnet.ln_counters.route_length += msg->msg_len;
-
- LASSERT (!msg->msg_onactivelist);
- msg->msg_onactivelist = 1;
- cfs_list_add(&msg->msg_activelist,
- &the_lnet.ln_msg_container.msc_active);
-}
lnet_rtrbufpool_t *
lnet_msg2bufpool(lnet_msg_t *msg)
#endif
void
-lnet_return_credits_locked (lnet_msg_t *msg)
+lnet_return_tx_credits_locked(lnet_msg_t *msg)
{
- lnet_peer_t *txpeer = msg->msg_txpeer;
- lnet_peer_t *rxpeer = msg->msg_rxpeer;
- lnet_msg_t *msg2;
- lnet_ni_t *ni;
+ lnet_peer_t *txpeer = msg->msg_txpeer;
+ lnet_msg_t *msg2;
+ lnet_ni_t *ni;
if (msg->msg_txcredit) {
/* give back NI txcredits */
msg->msg_txpeer = NULL;
lnet_peer_decref_locked(txpeer);
}
+}
+void
+lnet_return_rx_credits_locked(lnet_msg_t *msg)
+{
+ lnet_peer_t *rxpeer = msg->msg_rxpeer;
#ifdef __KERNEL__
+ lnet_msg_t *msg2;
+
if (msg->msg_rtrcredit) {
/* give back global router credits */
lnet_rtrbuf_t *rb;
LASSERT (!msg->msg_routing);
}
+ lnet_msg_commit(msg, 1);
/* Is this for someone on a local network? */
local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
return 0;
}
-void
-lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
- unsigned int offset, unsigned int mlen)
-{
- /* ALWAYS called holding the LNET_LOCK */
- /* Here, we commit the MD to a network OP by marking it busy and
- * decrementing its threshold. Come what may, the network "owns"
- * the MD until a call to lnet_finalize() signals completion. */
- LASSERT (!msg->msg_routing);
-
- msg->msg_md = md;
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
- if (msg->msg_receiving) {
- msg->msg_offset = offset;
- msg->msg_wanted = mlen;
- }
-
- md->md_refcount++;
- if (md->md_threshold != LNET_MD_THRESH_INF) {
- LASSERT (md->md_threshold > 0);
- md->md_threshold--;
- }
-
- the_lnet.ln_counters.msgs_alloc++;
- if (the_lnet.ln_counters.msgs_alloc >
- the_lnet.ln_counters.msgs_max)
- the_lnet.ln_counters.msgs_max =
- the_lnet.ln_counters.msgs_alloc;
-
- LASSERT (!msg->msg_onactivelist);
- msg->msg_onactivelist = 1;
- cfs_list_add(&msg->msg_activelist,
- &the_lnet.ln_msg_container.msc_active);
-}
-
static void
lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
{
{
lnet_hdr_t *hdr = &msg->msg_hdr;
- LNET_LOCK();
-
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += msg->msg_wanted;
-
- LNET_UNLOCK();
-
if (msg->msg_wanted != 0)
lnet_setpayloadbuffer(msg);
LASSERT (rc == LNET_MATCHMD_OK);
- the_lnet.ln_counters.send_count++;
- the_lnet.ln_counters.send_length += msg->msg_wanted;
-
LNET_UNLOCK();
lnet_build_msg_event(msg, LNET_EVENT_GET);
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg, 0, mlength);
+ lnet_msg_attach_md(msg, md, 0, mlength);
if (mlength != 0)
lnet_setpayloadbuffer(msg);
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += mlength;
-
LNET_UNLOCK();
lnet_build_msg_event(msg, LNET_EVENT_REPLY);
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
hdr->msg.ack.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg, 0, 0);
-
- the_lnet.ln_counters.recv_count++;
+ lnet_msg_attach_md(msg, md, 0, 0);
LNET_UNLOCK();
"(error %d looking up sender)\n",
libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
lnet_msgtyp2str(type), rc);
- goto free_drop;
- }
+ lnet_msg_free(msg);
+ goto drop;
+ }
+
+ lnet_msg_commit(msg, 0);
LNET_UNLOCK();
#ifndef __KERNEL__
goto free_drop;
}
}
- lnet_commit_routedmsg(msg);
rc = lnet_post_routed_recv_locked(msg, 0);
LNET_UNLOCK();
break;
default:
LASSERT(0);
+ rc = -EPROTO;
goto free_drop; /* prevent an unused label if !kernel */
}
LASSERT (rc == ENOENT);
free_drop:
- LASSERT (msg->msg_md == NULL);
- LNET_LOCK();
- if (msg->msg_rxpeer != NULL) {
- lnet_peer_decref_locked(msg->msg_rxpeer);
- msg->msg_rxpeer = NULL;
- }
- lnet_msg_free_locked(msg); /* expects LNET_LOCK held */
- LNET_UNLOCK();
+ LASSERT(msg->msg_md == NULL);
+ lnet_finalize(ni, msg, rc);
drop:
- lnet_drop_message(ni, private, payload_length);
- return 0;
+ lnet_drop_message(ni, private, payload_length);
+ return 0;
}
void
CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg, 0, 0);
+ lnet_msg_attach_md(msg, md, 0, 0);
lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
LNET_WIRE_HANDLE_COOKIE_NONE;
}
- the_lnet.ln_counters.send_count++;
- the_lnet.ln_counters.send_length += md->md_length;
-
LNET_UNLOCK();
lnet_build_msg_event(msg, LNET_EVENT_SEND);
msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
msg->msg_hdr.src_nid = peer_id.nid;
msg->msg_hdr.payload_length = getmd->md_length;
+ msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
- lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
-
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += getmd->md_length;
+ lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
+ lnet_msg_commit(msg, 0);
LNET_UNLOCK();
CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg, 0, 0);
+ lnet_msg_attach_md(msg, md, 0, 0);
lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
md->md_lh.lh_cookie;
- the_lnet.ln_counters.send_count++;
-
LNET_UNLOCK();
lnet_build_msg_event(msg, LNET_EVENT_SEND);
}
void
+lnet_msg_commit(lnet_msg_t *msg, int sending)
+{
+	/* Commit @msg to the network: put it on the container's active
+	 * list and bump the allocation counters.  @sending selects TX vs
+	 * RX commit.  A message already committed for RX (a routed
+	 * message, or the reply side of a GET) may be committed a second
+	 * time for sending; it is then only flagged, not re-listed.
+	 * NOTE(review): presumably called with LNET_LOCK held -- the
+	 * list and counters are otherwise unprotected; confirm callers. */
+	struct lnet_msg_container *container = &the_lnet.ln_msg_container;
+	lnet_counters_t *counters = &the_lnet.ln_counters;
+
+	/* routed message can be committed for both receiving and sending */
+	LASSERT(!msg->msg_tx_committed);
+
+	if (msg->msg_rx_committed) { /* routed message, or reply for GET */
+		LASSERT(sending);
+		LASSERT(msg->msg_onactivelist);
+		msg->msg_tx_committed = 1;
+		return;
+	}
+
+	/* first commit: track the message and account for it */
+	LASSERT(!msg->msg_onactivelist);
+	msg->msg_onactivelist = 1;
+	cfs_list_add(&msg->msg_activelist, &container->msc_active);
+
+	counters->msgs_alloc++;
+	if (counters->msgs_alloc > counters->msgs_max)
+		counters->msgs_max = counters->msgs_alloc;
+
+	if (sending)
+		msg->msg_tx_committed = 1;
+	else
+		msg->msg_rx_committed = 1;
+}
+
+/* Undo the TX side of a commit: on success (@status == 0) update the
+ * send/route statistics and repair msg_type where it was overwritten by
+ * the wire protocol (ACK for PUT, REPLY for GET); always return the TX
+ * credits and clear msg_tx_committed. */
+static void
+lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
+{
+	lnet_counters_t *counters = &the_lnet.ln_counters;
+	lnet_event_t *ev = &msg->msg_ev;
+
+	LASSERT(msg->msg_tx_committed);
+	if (status != 0)
+		goto out; /* failed: no statistics, credits only */
+
+	switch (ev->type) {
+	default: /* routed message */
+		LASSERT(msg->msg_routing);
+		LASSERT(msg->msg_rx_committed);
+		/* routed messages never carry an event */
+		LASSERT(ev->type == 0);
+
+		counters->route_length += msg->msg_len;
+		counters->route_count++;
+		goto out;
+
+	case LNET_EVENT_PUT:
+		/* should have been decommitted */
+		LASSERT(!msg->msg_rx_committed);
+		/* overwritten while sending ACK */
+		LASSERT(msg->msg_type == LNET_MSG_ACK);
+		msg->msg_type = LNET_MSG_PUT; /* fix type */
+		break;
+
+	case LNET_EVENT_SEND:
+		LASSERT(!msg->msg_rx_committed);
+		if (msg->msg_type == LNET_MSG_PUT)
+			counters->send_length += msg->msg_len;
+		break;
+
+	case LNET_EVENT_GET:
+		LASSERT(msg->msg_rx_committed);
+		/* overwritten while sending reply */
+		LASSERT(msg->msg_type == LNET_MSG_REPLY);
+
+		msg->msg_type = LNET_MSG_GET; /* fix type */
+		counters->send_length += msg->msg_len;
+		break;
+	}
+
+	counters->send_count++;
+ out:
+	lnet_return_tx_credits_locked(msg);
+	msg->msg_tx_committed = 0;
+}
+
+/* Undo the RX side of a commit: on success (@status == 0) update the
+ * receive statistics; always return the RX credits and clear
+ * msg_rx_committed.  The TX side, if any, must already have been
+ * decommitted (lnet_msg_decommit() orders TX before RX). */
+static void
+lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
+{
+	lnet_counters_t *counters = &the_lnet.ln_counters;
+	lnet_event_t *ev = &msg->msg_ev;
+
+	LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
+	LASSERT(msg->msg_rx_committed);
+
+	if (status != 0)
+		goto out; /* failed: no statistics, credits only */
+
+	switch (ev->type) {
+	default:
+		/* routed messages never carry an event */
+		LASSERT(ev->type == 0);
+		LASSERT(msg->msg_routing);
+		goto out;
+
+	case LNET_EVENT_ACK:
+		LASSERT(msg->msg_type == LNET_MSG_ACK);
+		break;
+
+	case LNET_EVENT_GET:
+		LASSERT(msg->msg_type == LNET_MSG_GET);
+		break;
+
+	case LNET_EVENT_PUT:
+		LASSERT(msg->msg_type == LNET_MSG_PUT);
+		break;
+
+	case LNET_EVENT_REPLY:
+		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+			msg->msg_type == LNET_MSG_GET); /* optimized GET */
+		break;
+	}
+
+	counters->recv_count++;
+	/* only PUT and REPLY deliver payload to this node */
+	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
+		counters->recv_length += msg->msg_wanted;
+
+ out:
+	lnet_return_rx_credits_locked(msg);
+	msg->msg_rx_committed = 0;
+}
+
+void
+lnet_msg_decommit(lnet_msg_t *msg, int status)
+{
+	/* Fully decommit @msg: undo the TX and/or RX commit, take the
+	 * message off the active list and release its allocation count.
+	 * A non-zero @status skips statistics updates in the helpers,
+	 * but credits are always returned. */
+	lnet_counters_t *counters = &the_lnet.ln_counters;
+
+	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
+	LASSERT(msg->msg_onactivelist);
+
+	if (msg->msg_tx_committed) /* always decommit for sending first */
+		lnet_msg_tx_decommit(msg, status);
+
+	if (msg->msg_rx_committed)
+		lnet_msg_rx_decommit(msg, status);
+
+	cfs_list_del(&msg->msg_activelist);
+	msg->msg_onactivelist = 0;
+	counters->msgs_alloc--;
+}
+
+void
+lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
+		   unsigned int offset, unsigned int mlen)
+{
+	/* Here, we attach the MD to the lnet_msg, mark it busy and
+	 * decrement its threshold.  Come what may, the lnet_msg "owns"
+	 * the MD until a call to lnet_msg_detach_md() or lnet_finalize()
+	 * signals completion. */
+	LASSERT(!msg->msg_routing);
+
+	msg->msg_md = md;
+	if (msg->msg_receiving) { /* committed for receiving */
+		msg->msg_offset = offset;
+		msg->msg_wanted = mlen;
+	}
+
+	md->md_refcount++;
+	/* LNET_MD_THRESH_INF means "never exhaust this MD" */
+	if (md->md_threshold != LNET_MD_THRESH_INF) {
+		LASSERT(md->md_threshold > 0);
+		md->md_threshold--;
+	}
+
+	/* build umd in event */
+	lnet_md2handle(&msg->msg_ev.md_handle, md);
+	lnet_md_deconstruct(md, &msg->msg_ev.md);
+}
+
+void
+lnet_msg_detach_md(lnet_msg_t *msg, int status)
+{
+	/* Detach the MD attached by lnet_msg_attach_md(): drop the
+	 * network's reference, deliver the completion event (only if the
+	 * MD has an EQ) and unlink the MD if it has become unlinkable.
+	 * A no-op when no MD is attached. */
+	lnet_libmd_t *md = msg->msg_md;
+	int unlink;
+
+	if (md == NULL)
+		return;
+
+	/* Now it's safe to drop my caller's ref */
+	md->md_refcount--;
+	LASSERT(md->md_refcount >= 0);
+
+	/* capture unlinkability before the event is enqueued */
+	unlink = lnet_md_unlinkable(md);
+	if (md->md_eq != NULL) {
+		msg->msg_ev.status = status;
+		msg->msg_ev.unlinked = unlink;
+		lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+	}
+
+	if (unlink)
+		lnet_md_unlink(md);
+
+	msg->msg_md = NULL;
+}
+
+void
lnet_complete_msg_locked(lnet_msg_t *msg)
{
lnet_handle_wire_t ack_wmd;
if (status == 0 && msg->msg_ack) {
/* Only send an ACK if the PUT completed successfully */
- lnet_return_credits_locked(msg);
+ lnet_msg_decommit(msg, 0);
msg->msg_ack = 0;
LNET_UNLOCK();
return;
}
- lnet_return_credits_locked(msg);
-
- LASSERT (msg->msg_onactivelist);
- msg->msg_onactivelist = 0;
- cfs_list_del (&msg->msg_activelist);
- the_lnet.ln_counters.msgs_alloc--;
+ lnet_msg_decommit(msg, status);
lnet_msg_free_locked(msg);
}
lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
struct lnet_msg_container *container;
- lnet_libmd_t *md;
int my_slot;
int i;
msg->msg_ev.status = status;
- md = msg->msg_md;
- if (md != NULL) {
- int unlink;
-
- /* Now it's safe to drop my caller's ref */
- md->md_refcount--;
- LASSERT (md->md_refcount >= 0);
-
- unlink = lnet_md_unlinkable(md);
-
- msg->msg_ev.unlinked = unlink;
-
- if (md->md_eq != NULL)
- lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+ if (msg->msg_md != NULL)
+ lnet_msg_detach_md(msg, status);
- if (unlink)
- lnet_md_unlink(md);
-
- msg->msg_md = NULL;
- }
+ if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
+ LNET_UNLOCK();
+		/* not committed to network yet */
+ LASSERT(!msg->msg_onactivelist);
+ lnet_msg_free(msg);
+ return;
+ }
container = &the_lnet.ln_msg_container;
cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
index, libcfs_id2str(src), mlength, rlength,
md->md_lh.lh_cookie, md->md_niov, offset);
- lnet_commit_md(md, msg, offset, mlength);
+ lnet_msg_attach_md(msg, md, offset, mlength);
md->md_offset = offset + mlength;
/* Auto-unlink NOW, so the ME gets unlinked if required.