+ lnet_hdr_t *hdr = &msg->msg_hdr;
+
+ LNET_LOCK();
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
+ msg->msg_ev.type = LNET_EVENT_PUT;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+
+ /* Must I ACK? If so I'll grab the ack_wmd out of the header and put
+ * it back into the ACK during lnet_finalize() */
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+
+ lnet_ni_recv(msg->msg_rxpeer->lp_ni,
+ msg->msg_private,
+ msg, delayed, offset, mlength,
+ hdr->payload_length);
+}
+
+/* called with LNET_LOCK held */
+void
+lnet_match_blocked_msg(lnet_libmd_t *md)
+{
+ CFS_LIST_HEAD (drops);
+ CFS_LIST_HEAD (matches);
+ struct list_head *tmp;
+ struct list_head *entry;
+ lnet_msg_t *msg;
+ lnet_me_t *me = md->md_me;
+ lnet_portal_t *ptl = &the_lnet.ln_portals[me->me_portal];
+
+ LASSERT (me->me_portal < the_lnet.ln_nportals);
+
+ if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
+ LASSERT (list_empty(&ptl->ptl_msgq));
+ return;
+ }
+
+ LASSERT (md->md_refcount == 0); /* a brand new MD */
+
+ list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
+ int rc;
+ int index;
+ unsigned int mlength;
+ unsigned int offset;
+ lnet_hdr_t *hdr;
+ lnet_process_id_t src;
+
+ msg = list_entry(entry, lnet_msg_t, msg_list);
+
+ LASSERT (msg->msg_delayed);
+
+ hdr = &msg->msg_hdr;
+ index = hdr->msg.put.ptl_index;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
+ hdr->payload_length,
+ hdr->msg.put.offset,
+ hdr->msg.put.match_bits,
+ md, msg, &mlength, &offset);
+
+ if (rc == LNET_MATCHMD_NONE)
+ continue;
+
+ /* Hurrah! This _is_ a match */
+ list_del(&msg->msg_list);
+ ptl->ptl_msgq_version++;
+
+ if (rc == LNET_MATCHMD_OK) {
+ list_add_tail(&msg->msg_list, &matches);
+
+ CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+ "match "LPU64" offset %d length %d.\n",
+ libcfs_id2str(src),
+ hdr->msg.put.ptl_index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset,
+ hdr->payload_length);
+ } else {
+ LASSERT (rc == LNET_MATCHMD_DROP);
+
+ list_add_tail(&msg->msg_list, &drops);
+ }
+
+ if (lnet_md_exhausted(md))
+ break;
+ }
+
+ LNET_UNLOCK();
+
+ list_for_each_safe (entry, tmp, &drops) {
+ msg = list_entry(entry, lnet_msg_t, msg_list);
+
+ list_del(&msg->msg_list);
+
+ lnet_drop_delayed_put(msg, "Bad match");
+ }
+
+ list_for_each_safe (entry, tmp, &matches) {
+ msg = list_entry(entry, lnet_msg_t, msg_list);
+
+ list_del(&msg->msg_list);
+
+ /* md won't disappear under me, since each msg
+ * holds a ref on it */
+ lnet_recv_put(md, msg, 1,
+ msg->msg_ev.offset,
+ msg->msg_ev.mlength);
+ }
+
+ LNET_LOCK();
+}
+
+static int
+lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ int rc;
+ int index;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int rlength = hdr->payload_length;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
+ /* .pid = */ hdr->src_pid};
+ lnet_libmd_t *md;
+
+ /* Convert put fields to host byte order */
+ hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
+ hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
+ hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
+
+ index = hdr->msg.put.ptl_index;
+
+ LNET_LOCK();
+
+ rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
+ rlength, hdr->msg.put.offset,
+ hdr->msg.put.match_bits, msg,
+ &mlength, &offset, &md);
+ switch (rc) {
+ default:
+ LBUG();
+
+ case LNET_MATCHMD_OK:
+ LNET_UNLOCK();
+ lnet_recv_put(md, msg, 0, offset, mlength);
+ return 0;
+
+ case LNET_MATCHMD_NONE:
+ rc = lnet_eager_recv_locked(msg);
+ if (rc == 0 && !the_lnet.ln_shutdown) {
+ list_add_tail(&msg->msg_list,
+ &the_lnet.ln_portals[index].ptl_msgq);
+
+ the_lnet.ln_portals[index].ptl_msgq_version++;
+
+ CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
+ LPU64" offset %d length %d: no match \n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength);
+
+ LNET_UNLOCK();
+ return 0;
+ }
+ /* fall through */
+
+ case LNET_MATCHMD_DROP:
+ CWARN("Dropping PUT from %s portal %d match "LPU64
+ " offset %d length %d: %d\n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength, rc);
+ LNET_UNLOCK();
+
+ return ENOENT; /* +ve: OK but no match */
+ }
+}
+
+static int
+lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
+ /* .pid = */ hdr->src_pid};
+ lnet_handle_wire_t reply_wmd;
+ lnet_libmd_t *md;
+ int rc;
+
+ /* Convert get fields to host byte order */
+ hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
+ hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
+ hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
+ hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
+
+ LNET_LOCK();
+
+ rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
+ hdr->msg.get.sink_length, hdr->msg.get.src_offset,
+ hdr->msg.get.match_bits, msg,
+ &mlength, &offset, &md);
+ if (rc == LNET_MATCHMD_DROP) {
+ CWARN("Dropping GET from %s portal %d match "LPU64
+ " offset %d length %d\n",
+ libcfs_id2str(src),
+ hdr->msg.get.ptl_index,
+ hdr->msg.get.match_bits,
+ hdr->msg.get.src_offset,
+ hdr->msg.get.sink_length);
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ LASSERT (rc == LNET_MATCHMD_OK);
+
+ the_lnet.ln_counters.send_count++;
+ the_lnet.ln_counters.send_length += mlength;
+
+ LNET_UNLOCK();
+
+ reply_wmd = hdr->msg.get.return_wmd;
+
+ lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+
+ msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
+
+ msg->msg_ev.type = LNET_EVENT_GET;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = 0;
+
+ if (rdma_get) {
+ /* The LND completes the REPLY from her recv procedure */
+ lnet_ni_recv(ni, msg->msg_private, msg, 0,
+ msg->msg_offset, msg->msg_len, msg->msg_len);
+ return 0;
+ }
+
+ lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
+ msg->msg_receiving = 0;
+
+ rc = lnet_send(ni->ni_nid, msg);
+ if (rc < 0) {
+ /* didn't get as far as lnet_ni_send() */
+ CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
+
+ lnet_finalize(ni, msg, rc);
+ }
+
+ return 0;
+}
+
+static int
+lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ void *private = msg->msg_private;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
+ /* .pid = */ hdr->src_pid};
+ lnet_libmd_t *md;
+ int rlength;
+ int mlength;
+
+ LNET_LOCK();
+
+ /* NB handles only looked up by creator (no flips) */
+ md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
+ if (md == NULL || md->md_threshold == 0) {
+ CWARN("%s: Dropping REPLY from %s for %s "
+ "MD "LPX64"."LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ (md == NULL) ? "invalid" : "inactive",
+ hdr->msg.reply.dst_wmd.wh_interface_cookie,
+ hdr->msg.reply.dst_wmd.wh_object_cookie);
+
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ LASSERT (md->md_offset == 0);
+
+ rlength = hdr->payload_length;
+ mlength = MIN(rlength, md->md_length);
+
+ if (mlength < rlength &&
+ (md->md_options & LNET_MD_TRUNCATE) == 0) {
+ CERROR ("%s: Dropping REPLY from %s length %d "
+ "for MD "LPX64" would overflow (%d)\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
+ mlength);
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
+
+ lnet_commit_md(md, msg);
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
+ msg->msg_ev.type = LNET_EVENT_REPLY;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.initiator = src;
+ msg->msg_ev.rlength = rlength;
+ msg->msg_ev.mlength = mlength;
+ msg->msg_ev.offset = 0;
+
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
+ return 0;
+}
+
+static int
+lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
+ /* .pid = */ hdr->src_pid};
+ lnet_libmd_t *md;
+
+ /* Convert ack fields to host byte order */
+ hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
+ hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
+
+ LNET_LOCK();
+
+ /* NB handles only looked up by creator (no flips) */
+ md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
+ if (md == NULL || md->md_threshold == 0) {
+#if 0
+ /* Don't moan; this is expected */
+ CERROR ("%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ (md == NULL) ? "invalid" : "inactive",
+ hdr->msg.ack.dst_wmd.wh_interface_cookie,
+ hdr->msg.ack.dst_wmd.wh_object_cookie);
+#endif
+ LNET_UNLOCK();
+ return ENOENT; /* +ve! */
+ }
+
+ CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ hdr->msg.ack.dst_wmd.wh_object_cookie);
+
+ lnet_commit_md(md, msg);
+
+ msg->msg_ev.type = LNET_EVENT_ACK;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.initiator = src;
+ msg->msg_ev.mlength = hdr->msg.ack.mlength;
+ msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ the_lnet.ln_counters.recv_count++;
+
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
+ return 0;
+}
+
+char *
+lnet_msgtyp2str (int type)
+{
+ switch (type) {
+ case LNET_MSG_ACK: