#include <portals/lib-p30.h>
#include <portals/arg-blocks.h>
-/*
- * Right now it does not check access control lists.
- *
- * We only support one MD per ME, which is how the Portals 3.1 spec is written.
- * All previous complication is removed.
- */
-
-static lib_me_t *
-lib_find_me(nal_cb_t *nal, int index, int op_mask, ptl_nid_t src_nid,
- ptl_pid_t src_pid, ptl_size_t rlength, ptl_size_t roffset,
- ptl_match_bits_t match_bits, ptl_size_t *mlength_out,
- ptl_size_t *offset_out, int *unlink_out)
+/* forward ref */
+static void lib_commit_md (nal_cb_t *nal, lib_md_t *md, lib_msg_t *msg);
+
+static lib_md_t *
+lib_match_md(nal_cb_t *nal, int index, int op_mask,
+ ptl_nid_t src_nid, ptl_pid_t src_pid,
+ ptl_size_t rlength, ptl_size_t roffset,
+ ptl_match_bits_t match_bits, lib_msg_t *msg,
+ ptl_size_t *mlength_out, ptl_size_t *offset_out)
{
lib_ni_t *ni = &nal->ni;
struct list_head *match_list = &ni->tbl.tbl[index];
lib_md_t *md;
ptl_size_t mlength;
ptl_size_t offset;
-
ENTRY;
CDEBUG (D_NET, "Request from "LPU64".%d of length %d into portal %d "
LASSERT (me == md->me);
- /* MD deactivated */
- if (md->threshold == 0)
- continue;
-
/* mismatched MD op */
if ((md->options & op_mask) == 0)
continue;
+ /* MD exhausted */
+ if (lib_md_exhausted(md))
+ continue;
+
/* mismatched ME nid/pid? */
if (me->match_id.nid != PTL_NID_ANY &&
me->match_id.nid != src_nid)
else
offset = roffset;
- mlength = md->length - offset;
- if ((md->options & PTL_MD_MAX_SIZE) != 0 &&
- mlength > md->max_size)
+ if ((md->options & PTL_MD_MAX_SIZE) != 0) {
mlength = md->max_size;
+ LASSERT (md->offset + mlength <= md->length);
+ } else {
+ mlength = md->length - offset;
+ }
if (rlength <= mlength) { /* fits in allowed space */
mlength = rlength;
goto failed;
}
+ /* Commit to this ME/MD */
+ CDEBUG(D_NET, "Incoming %s index %x from "LPU64"/%u of "
+ "length %d/%d into md "LPX64" [%d] + %d\n",
+ (op_mask == PTL_MD_OP_PUT) ? "put" : "get",
+ index, src_nid, src_pid, mlength, rlength,
+ md->md_lh.lh_cookie, md->md_niov, offset);
+
+ lib_commit_md(nal, md, msg);
md->offset = offset + mlength;
+ /* NB Caller sets ev.type and ev.hdr_data */
+ msg->ev.initiator.nid = src_nid;
+ msg->ev.initiator.pid = src_pid;
+ msg->ev.portal = index;
+ msg->ev.match_bits = match_bits;
+ msg->ev.rlength = rlength;
+ msg->ev.mlength = mlength;
+ msg->ev.offset = offset;
+
+ lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
+
*offset_out = offset;
*mlength_out = mlength;
- *unlink_out = ((md->options & PTL_MD_AUTO_UNLINK) != 0 &&
- md->offset >= (md->length - md->max_size));
- RETURN (me);
+
+ /* Auto-unlink NOW, so the ME gets unlinked if required.
+ * We bumped md->pending above so the MD just gets flagged
+ * for unlink when it is finalized. */
+ if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) != 0 &&
+ lib_md_exhausted(md))
+ lib_md_unlink(nal, md);
+
+ RETURN (md);
}
failed:
lib_ni_t *ni = &nal->ni;
ptl_size_t mlength = 0;
ptl_size_t offset = 0;
- int unlink = 0;
ptl_err_t rc;
- lib_me_t *me;
lib_md_t *md;
unsigned long flags;
state_lock(nal, &flags);
- me = lib_find_me(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
- hdr->src_nid, hdr->src_pid,
- hdr->payload_length, hdr->msg.put.offset,
- hdr->msg.put.match_bits,
- &mlength, &offset, &unlink);
- if (me == NULL) {
+ md = lib_match_md(nal, hdr->msg.put.ptl_index, PTL_MD_OP_PUT,
+ hdr->src_nid, hdr->src_pid,
+ hdr->payload_length, hdr->msg.put.offset,
+ hdr->msg.put.match_bits, msg,
+ &mlength, &offset);
+ if (md == NULL) {
state_unlock(nal, &flags);
return (PTL_FAIL);
}
- md = me->md;
- CDEBUG(D_NET, "Incoming put index %x from "LPU64"/%u of length %d/%d "
- "into md "LPX64" [%d] + %d\n", hdr->msg.put.ptl_index,
- hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
- md->md_lh.lh_cookie, md->md_niov, offset);
-
- lib_commit_md(nal, md, msg);
-
- msg->ev.type = PTL_EVENT_PUT;
- msg->ev.initiator.nid = hdr->src_nid;
- msg->ev.initiator.pid = hdr->src_pid;
- msg->ev.portal = hdr->msg.put.ptl_index;
- msg->ev.match_bits = hdr->msg.put.match_bits;
- msg->ev.rlength = hdr->payload_length;
- msg->ev.mlength = mlength;
- msg->ev.offset = offset;
+ msg->ev.type = PTL_EVENT_PUT_END;
msg->ev.hdr_data = hdr->msg.put.hdr_data;
- lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-
if (!ptl_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
!(md->options & PTL_MD_ACK_DISABLE)) {
msg->ack_wmd = hdr->msg.put.ack_wmd;
ni->counters.recv_count++;
ni->counters.recv_length += mlength;
- /* only unlink after MD's pending count has been bumped in
- * lib_commit_md() otherwise lib_me_unlink() will nuke it */
- if (unlink)
- lib_me_unlink (nal, me);
-
state_unlock(nal, &flags);
rc = lib_recv(nal, private, msg, md, offset, mlength,
lib_ni_t *ni = &nal->ni;
ptl_size_t mlength = 0;
ptl_size_t offset = 0;
- int unlink = 0;
- lib_me_t *me;
lib_md_t *md;
ptl_hdr_t reply;
unsigned long flags;
state_lock(nal, &flags);
- me = lib_find_me(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
- hdr->src_nid, hdr->src_pid,
- hdr->msg.get.sink_length, hdr->msg.get.src_offset,
- hdr->msg.get.match_bits,
- &mlength, &offset, &unlink);
- if (me == NULL) {
+ md = lib_match_md(nal, hdr->msg.get.ptl_index, PTL_MD_OP_GET,
+ hdr->src_nid, hdr->src_pid,
+ hdr->msg.get.sink_length, hdr->msg.get.src_offset,
+ hdr->msg.get.match_bits, msg,
+ &mlength, &offset);
+ if (md == NULL) {
state_unlock(nal, &flags);
return (PTL_FAIL);
}
- md = me->md;
- CDEBUG(D_NET, "Incoming get index %d from "LPU64".%u of length %d/%d "
- "from md "LPX64" [%d] + %d\n", hdr->msg.get.ptl_index,
- hdr->src_nid, hdr->src_pid, mlength, hdr->payload_length,
- md->md_lh.lh_cookie, md->md_niov, offset);
-
- lib_commit_md(nal, md, msg);
-
- msg->ev.type = PTL_EVENT_GET;
- msg->ev.initiator.nid = hdr->src_nid;
- msg->ev.initiator.pid = hdr->src_pid;
- msg->ev.portal = hdr->msg.get.ptl_index;
- msg->ev.match_bits = hdr->msg.get.match_bits;
- msg->ev.rlength = hdr->payload_length;
- msg->ev.mlength = mlength;
- msg->ev.offset = offset;
+ msg->ev.type = PTL_EVENT_GET_END;
msg->ev.hdr_data = 0;
- lib_md_deconstruct(nal, md, &msg->ev.mem_desc);
-
ni->counters.send_count++;
ni->counters.send_length += mlength;
- /* only unlink after MD's refcount has been bumped in
- * lib_commit_md() otherwise lib_me_unlink() will nuke it */
- if (unlink)
- lib_me_unlink (nal, me);
-
state_unlock(nal, &flags);
memset (&reply, 0, sizeof (reply));
lib_commit_md(nal, md, msg);
- msg->ev.type = PTL_EVENT_REPLY;
+ msg->ev.type = PTL_EVENT_REPLY_END;
msg->ev.initiator.nid = hdr->src_nid;
msg->ev.initiator.pid = hdr->src_pid;
msg->ev.rlength = rlength;
return;
}
- do_gettimeofday(&msg->ev.arrival_time);
-
switch (hdr->type) {
case PTL_MSG_ACK:
rc = parse_ack(nal, hdr, private, msg);
{
CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
nal->ni.nid, id->nid);
- return (ret->rc = PTL_INV_PROC);
+ return (ret->rc = PTL_PROCESS_INVALID);
}
msg = lib_msg_alloc(nal);
if (msg == NULL) {
CERROR(LPU64": Dropping PUT to "LPU64": ENOMEM on lib_msg_t\n",
ni->nid, id->nid);
- return (ret->rc = PTL_NOSPACE);
+ return (ret->rc = PTL_NO_SPACE);
}
state_lock(nal, &flags);
lib_msg_free(nal, msg);
state_unlock(nal, &flags);
- return (ret->rc = PTL_INV_MD);
+ return (ret->rc = PTL_MD_INVALID);
}
CDEBUG(D_NET, "PtlPut -> %Lu: %lu\n", (unsigned long long)id->nid,
lib_commit_md(nal, md, msg);
- msg->ev.type = PTL_EVENT_SENT;
+ msg->ev.type = PTL_EVENT_SEND_END;
msg->ev.initiator.nid = ni->nid;
msg->ev.initiator.pid = ni->pid;
msg->ev.portal = args->portal_in;
}
lib_msg_t *
-lib_fake_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_md_t *getmd)
+lib_create_reply_msg (nal_cb_t *nal, ptl_nid_t peer_nid, lib_msg_t *getmsg)
{
/* The NAL can DMA direct to the GET md (i.e. no REPLY msg). This
- * returns a msg the NAL can pass to lib_finalize() so that a REPLY
- * event still occurs.
+ * returns a msg for the NAL to pass to lib_finalize() when the sink
+ * data has been received.
*
- * CAVEAT EMPTOR: 'getmd' is passed by pointer so it MUST be valid.
- * This can only be guaranteed while a lib_msg_t holds a reference
- * on it (ie. pending > 0), so best call this before the
- * lib_finalize() of the original GET. */
+ * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
+ * lib_finalize() is called on it, so the NAL must call this first */
lib_ni_t *ni = &nal->ni;
lib_msg_t *msg = lib_msg_alloc(nal);
+ lib_md_t *getmd = getmsg->md;
unsigned long flags;
state_lock(nal, &flags);
lib_commit_md (nal, getmd, msg);
- msg->ev.type = PTL_EVENT_REPLY;
+ msg->ev.type = PTL_EVENT_REPLY_END;
msg->ev.initiator.nid = peer_nid;
msg->ev.initiator.pid = 0; /* XXX FIXME!!! */
msg->ev.rlength = msg->ev.mlength = getmd->length;
{
CERROR(LPU64": Dropping PUT to "LPU64": simulated failure\n",
nal->ni.nid, id->nid);
- return (ret->rc = PTL_INV_PROC);
+ return (ret->rc = PTL_PROCESS_INVALID);
}
msg = lib_msg_alloc(nal);
if (msg == NULL) {
CERROR(LPU64": Dropping GET to "LPU64": ENOMEM on lib_msg_t\n",
ni->nid, id->nid);
- return (ret->rc = PTL_NOSPACE);
+ return (ret->rc = PTL_NO_SPACE);
}
state_lock(nal, &flags);
lib_msg_free(nal, msg);
state_unlock(nal, &flags);
- return ret->rc = PTL_INV_MD;
+ return ret->rc = PTL_MD_INVALID;
}
CDEBUG(D_NET, "PtlGet -> %Lu: %lu\n", (unsigned long long)id->nid,
lib_commit_md(nal, md, msg);
- msg->ev.type = PTL_EVENT_SENT;
+ msg->ev.type = PTL_EVENT_SEND_END;
msg->ev.initiator.nid = ni->nid;
msg->ev.initiator.pid = ni->pid;
msg->ev.portal = args->portal_in;