- if (hdr->type == PTL_MSG_HELLO) {
- /* dest_nid is really ptl_magicversion_t */
- ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
-
- CERROR (LPU64": Dropping unexpected HELLO message: "
- "magic %d, version %d.%d from "LPD64"\n",
- nal->ni.nid, mv->magic,
- mv->version_major, mv->version_minor,
- hdr->src_nid);
- lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
- return (-1);
- }
-
- if (hdr->dest_nid != nal->ni.nid) {
- CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
- " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
- hdr->src_nid, hdr->dest_nid);
-
- state_lock (nal, &flags);
- nal->ni.counters.drop_count++;
- nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
- state_unlock (nal, &flags);
-
- lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
- return (-1);
- }
-
- if (!list_empty (&nal->ni.ni_test_peers) && /* normally we don't */
- fail_peer (nal, hdr->src_nid, 0)) /* shall we now? */
- {
- CERROR(LPU64": Dropping incoming %s from "LPU64
- ": simulated failure\n",
- nal->ni.nid, hdr_type_string (hdr),
- hdr->src_nid);
- return (-1);
+ /* sending to a remote network */
+ lp = lnet_find_route_locked(src_ni, dst_nid);
+ if (lp == NULL) {
+ if (src_ni != NULL)
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+
+ LCONSOLE_WARN("No route to %s via %s "
+ "(all routers down)\n",
+ libcfs_id2str(msg->msg_target),
+ libcfs_nid2str(src_nid));
+ return -EHOSTUNREACH;
+ }
+
+ CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+ libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
+ lnet_msgtyp2str(msg->msg_type), msg->msg_len);
+
+ if (src_ni == NULL) {
+ src_ni = lp->lp_ni;
+ src_nid = src_ni->ni_nid;
+ } else {
+ LASSERT (src_ni == lp->lp_ni);
+ lnet_ni_decref_locked(src_ni);
+ }
+
+ lnet_peer_addref_locked(lp);
+
+ LASSERT (src_nid != LNET_NID_ANY);
+
+ if (!msg->msg_routing) {
+ /* I'm the source and now I know which NI to send on */
+ msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+ }
+
+ msg->msg_target_is_router = 1;
+ msg->msg_target.nid = lp->lp_nid;
+ msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
+ }
+
+ /* 'lp' is our best choice of peer */
+
+ LASSERT (!msg->msg_peertxcredit);
+ LASSERT (!msg->msg_txcredit);
+ LASSERT (msg->msg_txpeer == NULL);
+
+ msg->msg_txpeer = lp; /* msg takes my ref on lp */
+
+ rc = lnet_post_send_locked(msg, 0);
+ LNET_UNLOCK();
+
+ if (rc == EHOSTUNREACH)
+ return -EHOSTUNREACH;
+
+ if (rc == 0)
+ lnet_ni_send(src_ni, msg);
+
+ return 0;
+}
+
+void
+lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+{
+ /* ALWAYS called holding the LNET_LOCK */
+ /* Here, we commit the MD to a network OP by marking it busy and
+ * decrementing its threshold. Come what may, the network "owns"
+ * the MD until a call to lnet_finalize() signals completion. */
+ LASSERT (!msg->msg_routing);
+
+ msg->msg_md = md;
+
+ md->md_refcount++;
+ if (md->md_threshold != LNET_MD_THRESH_INF) {
+ LASSERT (md->md_threshold > 0);
+ md->md_threshold--;
+ }
+
+ the_lnet.ln_counters.msgs_alloc++;
+ if (the_lnet.ln_counters.msgs_alloc >
+ the_lnet.ln_counters.msgs_max)
+ the_lnet.ln_counters.msgs_max =
+ the_lnet.ln_counters.msgs_alloc;
+
+ LASSERT (!msg->msg_onactivelist);
+ msg->msg_onactivelist = 1;
+ cfs_list_add(&msg->msg_activelist,
+ &the_lnet.ln_msg_container.msc_active);
+}
+
+static void
+lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
+{
+ LNET_LOCK();
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += nob;
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
+}
+
+static void
+lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
+ unsigned int offset, unsigned int mlength)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+
+ LNET_LOCK();
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
+ msg->msg_ev.type = LNET_EVENT_PUT;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+
+ /* Must I ACK? If so I'll grab the ack_wmd out of the header and put
+ * it back into the ACK during lnet_finalize() */
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+
+ lnet_ni_recv(msg->msg_rxpeer->lp_ni,
+ msg->msg_private,
+ msg, delayed, offset, mlength,
+ hdr->payload_length);
+}
+
+static int
+lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ int rc;
+ int index;
+ __u64 version;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int rlength = hdr->payload_length;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src= {0};
+ lnet_libmd_t *md;
+ lnet_portal_t *ptl;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* Convert put fields to host byte order */
+ hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
+ hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
+ hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
+
+ index = hdr->msg.put.ptl_index;
+
+ LNET_LOCK();
+
+ again:
+ rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
+ rlength, hdr->msg.put.offset,
+ hdr->msg.put.match_bits, msg,
+ &mlength, &offset, &md);
+ switch (rc) {
+ default:
+ LBUG();
+
+ case LNET_MATCHMD_OK:
+ LNET_UNLOCK();
+ lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+ return 0;
+
+ case LNET_MATCHMD_NONE:
+ ptl = the_lnet.ln_portals[index];
+ version = ptl->ptl_ml_version;
+
+ rc = 0;
+ if (!msg->msg_delayed)
+ rc = lnet_eager_recv_locked(msg);
+
+ if (rc == 0 &&
+ !the_lnet.ln_shutdown &&
+ lnet_ptl_is_lazy(ptl)) {
+ if (version != ptl->ptl_ml_version)
+ goto again;
+
+ cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
+ ptl->ptl_msgq_version++;
+ LNET_UNLOCK();
+
+ CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
+ LPU64" offset %d length %d: no match \n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength);
+ return 0;
+ }
+ /* fall through */
+
+ case LNET_MATCHMD_DROP:
+ CNETERR("Dropping PUT from %s portal %d match "LPU64
+ " offset %d length %d: %d\n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength, rc);
+ LNET_UNLOCK();
+
+ return ENOENT; /* +ve: OK but no match */
+ }
+}
+
+static int
+lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src = {0};
+ lnet_handle_wire_t reply_wmd;
+ lnet_libmd_t *md;
+ int rc;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* Convert get fields to host byte order */
+ hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
+ hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
+ hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
+ hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
+
+ LNET_LOCK();
+
+ rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
+ hdr->msg.get.sink_length, hdr->msg.get.src_offset,
+ hdr->msg.get.match_bits, msg,
+ &mlength, &offset, &md);
+ if (rc == LNET_MATCHMD_DROP) {
+ CNETERR("Dropping GET from %s portal %d match "LPU64
+ " offset %d length %d\n",
+ libcfs_id2str(src),
+ hdr->msg.get.ptl_index,
+ hdr->msg.get.match_bits,
+ hdr->msg.get.src_offset,
+ hdr->msg.get.sink_length);
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ LASSERT (rc == LNET_MATCHMD_OK);
+
+ the_lnet.ln_counters.send_count++;
+ the_lnet.ln_counters.send_length += mlength;
+
+ LNET_UNLOCK();
+
+ msg->msg_ev.type = LNET_EVENT_GET;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = 0;
+
+ reply_wmd = hdr->msg.get.return_wmd;
+
+ lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+
+ msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
+
+ if (rdma_get) {
+ /* The LND completes the REPLY from her recv procedure */
+ lnet_ni_recv(ni, msg->msg_private, msg, 0,
+ msg->msg_offset, msg->msg_len, msg->msg_len);
+ return 0;