LASSERT(!msg->msg_sending);
LASSERT(rlen == msg->msg_len);
LASSERT(mlen <= msg->msg_len);
+ LASSERT(msg->msg_offset == offset);
+ LASSERT(msg->msg_wanted == mlen);
- msg->msg_wanted = mlen;
- msg->msg_offset = offset;
msg->msg_receiving = 0;
if (mlen != 0) {
}
void
-lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
+ unsigned int offset, unsigned int mlen)
{
/* ALWAYS called holding the LNET_LOCK */
/* Here, we commit the MD to a network OP by marking it busy and
LASSERT (!msg->msg_routing);
msg->msg_md = md;
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ if (msg->msg_receiving) {
+ msg->msg_offset = offset;
+ msg->msg_wanted = mlen;
+ }
md->md_refcount++;
if (md->md_threshold != LNET_MD_THRESH_INF) {
}
static void
-lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
- unsigned int offset, unsigned int mlength)
+lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
{
- lnet_hdr_t *hdr = &msg->msg_hdr;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
- LNET_LOCK();
+ LNET_LOCK();
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += mlength;
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += msg->msg_wanted;
- LNET_UNLOCK();
+ LNET_UNLOCK();
- if (mlength != 0)
- lnet_setpayloadbuffer(msg);
+ if (msg->msg_wanted != 0)
+ lnet_setpayloadbuffer(msg);
- msg->msg_ev.type = LNET_EVENT_PUT;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+ lnet_build_msg_event(msg, LNET_EVENT_PUT);
/* Must I ACK? If so I'll grab the ack_wmd out of the header and put
* it back into the ACK during lnet_finalize() */
- msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
- (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
- lnet_ni_recv(msg->msg_rxpeer->lp_ni,
- msg->msg_private,
- msg, delayed, offset, mlength,
- hdr->payload_length);
+ lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_delayed,
+ msg->msg_offset, msg->msg_wanted, hdr->payload_length);
}
static int
__u64 version;
lnet_hdr_t *hdr = &msg->msg_hdr;
unsigned int rlength = hdr->payload_length;
- unsigned int mlength = 0;
- unsigned int offset = 0;
lnet_process_id_t src= {0};
- lnet_libmd_t *md;
lnet_portal_t *ptl;
src.nid = hdr->src_nid;
again:
rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
rlength, hdr->msg.put.offset,
- hdr->msg.put.match_bits, msg,
- &mlength, &offset, &md);
+ hdr->msg.put.match_bits, msg);
switch (rc) {
default:
LBUG();
case LNET_MATCHMD_OK:
LNET_UNLOCK();
- lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+ lnet_recv_put(ni, msg);
return 0;
case LNET_MATCHMD_NONE:
lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
{
lnet_hdr_t *hdr = &msg->msg_hdr;
- unsigned int mlength = 0;
- unsigned int offset = 0;
lnet_process_id_t src = {0};
lnet_handle_wire_t reply_wmd;
- lnet_libmd_t *md;
int rc;
src.nid = hdr->src_nid;
rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
hdr->msg.get.sink_length, hdr->msg.get.src_offset,
- hdr->msg.get.match_bits, msg,
- &mlength, &offset, &md);
+ hdr->msg.get.match_bits, msg);
if (rc == LNET_MATCHMD_DROP) {
CNETERR("Dropping GET from %s portal %d match "LPU64
" offset %d length %d\n",
LASSERT (rc == LNET_MATCHMD_OK);
the_lnet.ln_counters.send_count++;
- the_lnet.ln_counters.send_length += mlength;
+ the_lnet.ln_counters.send_length += msg->msg_wanted;
- LNET_UNLOCK();
+ LNET_UNLOCK();
- msg->msg_ev.type = LNET_EVENT_GET;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.hdr_data = 0;
+ lnet_build_msg_event(msg, LNET_EVENT_GET);
- reply_wmd = hdr->msg.get.return_wmd;
+ reply_wmd = hdr->msg.get.return_wmd;
- lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+ lnet_prep_send(msg, LNET_MSG_REPLY, src,
+ msg->msg_offset, msg->msg_wanted);
msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, mlength);
if (mlength != 0)
lnet_setpayloadbuffer(msg);
- msg->msg_ev.type = LNET_EVENT_REPLY;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.initiator = src;
- msg->msg_ev.rlength = rlength;
- msg->msg_ev.mlength = mlength;
- msg->msg_ev.offset = 0;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.recv_count++;
the_lnet.ln_counters.recv_length += mlength;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_REPLY);
+
lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
return 0;
}
libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
hdr->msg.ack.dst_wmd.wh_object_cookie);
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, 0);
- msg->msg_ev.type = LNET_EVENT_ACK;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.initiator = src;
- msg->msg_ev.mlength = hdr->msg.ack.mlength;
- msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+ the_lnet.ln_counters.recv_count++;
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
+ LNET_UNLOCK();
- the_lnet.ln_counters.recv_count++;
-
- LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_ACK);
lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
return 0;
msg->msg_len = msg->msg_wanted = payload_length;
msg->msg_offset = 0;
msg->msg_hdr = *hdr;
+ /* for building message event */
+ msg->msg_from = from_nid;
LNET_LOCK();
rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
msg->msg_hdr.dest_pid = dest_pid;
msg->msg_hdr.payload_length = payload_length;
- msg->msg_ev.sender = from_nid;
-
switch (type) {
case LNET_MSG_ACK:
rc = lnet_parse_ack(ni, msg);
msg->msg_hdr.msg.put.offset,
msg->msg_hdr.payload_length);
- lnet_recv_put(msg->msg_md, msg, 1,
- msg->msg_ev.offset, msg->msg_ev.mlength);
+ lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
}
}
CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, 0);
lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
LNET_WIRE_HANDLE_COOKIE_NONE;
}
- msg->msg_ev.type = LNET_EVENT_SEND;
- msg->msg_ev.initiator.nid = LNET_NID_ANY;
- msg->msg_ev.initiator.pid = the_lnet.ln_pid;
- msg->msg_ev.target = target;
- msg->msg_ev.sender = LNET_NID_ANY;
- msg->msg_ev.pt_index = portal;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = md->md_length;
- msg->msg_ev.mlength = md->md_length;
- msg->msg_ev.offset = offset;
- msg->msg_ev.hdr_data = hdr_data;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.send_count++;
the_lnet.ln_counters.send_length += md->md_length;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
rc = lnet_send(self, msg);
if (rc != 0) {
CNETERR( "Error sending PUT to %s: %d\n",
LASSERT (getmd->md_offset == 0);
- CDEBUG(D_NET, "%s: Reply from %s md %p\n",
- libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
-
- lnet_commit_md (getmd, msg);
+ CDEBUG(D_NET, "%s: Reply from %s md %p\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
+ /* setup information for lnet_build_msg_event */
+ msg->msg_from = peer_id.nid;
msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
+ msg->msg_hdr.src_nid = peer_id.nid;
+ msg->msg_hdr.payload_length = getmd->md_length;
- msg->msg_ev.type = LNET_EVENT_REPLY;
- msg->msg_ev.initiator = peer_id;
- msg->msg_ev.sender = peer_id.nid; /* optimized GETs can't be routed */
- msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
- msg->msg_ev.offset = 0;
+ lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
- lnet_md_deconstruct(getmd, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, getmd);
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += getmd->md_length;
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += getmd->md_length;
+ LNET_UNLOCK();
- LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_REPLY);
- return msg;
+ return msg;
drop_msg:
lnet_msg_free_locked(msg);
drop:
- the_lnet.ln_counters.drop_count++;
- the_lnet.ln_counters.drop_length += getmd->md_length;
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += getmd->md_length;
- LNET_UNLOCK ();
+ LNET_UNLOCK ();
- return NULL;
+ return NULL;
}
void
CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, 0, 0);
lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
md->md_lh.lh_cookie;
- msg->msg_ev.type = LNET_EVENT_SEND;
- msg->msg_ev.initiator.nid = LNET_NID_ANY;
- msg->msg_ev.initiator.pid = the_lnet.ln_pid;
- msg->msg_ev.target = target;
- msg->msg_ev.sender = LNET_NID_ANY;
- msg->msg_ev.pt_index = portal;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = md->md_length;
- msg->msg_ev.mlength = md->md_length;
- msg->msg_ev.offset = offset;
- msg->msg_ev.hdr_data = 0;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
the_lnet.ln_counters.send_count++;
LNET_UNLOCK();
+ lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
rc = lnet_send(self, msg);
if (rc < 0) {
CNETERR( "Error sending GET to %s: %d\n",
static int
lnet_try_match_md(int index, int op_mask, lnet_process_id_t src,
unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
- unsigned int *mlength_out, unsigned int *offset_out)
+ __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg)
{
/* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
* lnet_match_blocked_msg() relies on this to avoid races */
index, libcfs_id2str(src), mlength, rlength,
md->md_lh.lh_cookie, md->md_niov, offset);
- lnet_commit_md(md, msg);
+ lnet_commit_md(md, msg, offset, mlength);
md->md_offset = offset + mlength;
- /* NB Caller will set ev.type and ev.hdr_data */
- msg->msg_ev.initiator = src;
- msg->msg_ev.pt_index = index;
- msg->msg_ev.match_bits = match_bits;
- msg->msg_ev.rlength = rlength;
- msg->msg_ev.mlength = mlength;
- msg->msg_ev.offset = offset;
-
- lnet_md_deconstruct(md, &msg->msg_ev.md);
- lnet_md2handle(&msg->msg_ev.md_handle, md);
-
- *offset_out = offset;
- *mlength_out = mlength;
-
/* Auto-unlink NOW, so the ME gets unlinked if required.
* We bumped md->md_refcount above so the MD just gets flagged
* for unlink when it is finalized. */
int
lnet_match_md(int index, int op_mask, lnet_process_id_t src,
unsigned int rlength, unsigned int roffset,
- __u64 match_bits, lnet_msg_t *msg,
- unsigned int *mlength_out, unsigned int *offset_out,
- lnet_libmd_t **md_out)
+ __u64 match_bits, lnet_msg_t *msg)
{
struct lnet_portal *ptl = the_lnet.ln_portals[index];
cfs_list_t *head;
LASSERT(me == md->md_me);
rc = lnet_try_match_md(index, op_mask, src, rlength,
- roffset, match_bits, md, msg,
- mlength_out, offset_out);
+ roffset, match_bits, md, msg);
switch (rc) {
default:
LBUG();
continue;
case LNET_MATCHMD_OK:
- *md_out = md;
return LNET_MATCHMD_OK;
case LNET_MATCHMD_DROP:
cfs_list_for_each_safe(entry, tmp, &ptl->ptl_msgq) {
int rc;
int index;
- unsigned int mlength;
- unsigned int offset;
lnet_hdr_t *hdr;
lnet_process_id_t src;
rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
hdr->payload_length,
hdr->msg.put.offset,
- hdr->msg.put.match_bits,
- md, msg, &mlength, &offset);
+ hdr->msg.put.match_bits, md, msg);
if (rc == LNET_MATCHMD_NONE)
continue;