- unsigned long flags;
-
- /* NB static check; optimizer will elide this if it's right */
- LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
- offsetof (ptl_hdr_t, msg.put.length));
- LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
- offsetof (ptl_hdr_t, msg.get.length));
- LASSERT (offsetof (ptl_hdr_t, msg.ack.length) ==
- offsetof (ptl_hdr_t, msg.reply.length));
-
- /* convert common fields to host byte order */
- hdr->dest_nid = NTOH__u64 (hdr->dest_nid);
- hdr->src_nid = NTOH__u64 (hdr->src_nid);
- hdr->dest_pid = NTOH__u32 (hdr->dest_pid);
- hdr->src_pid = NTOH__u32 (hdr->src_pid);
- hdr->type = NTOH__u32 (hdr->type);
- PTL_HDR_LENGTH(hdr) = NTOH__u32 (PTL_HDR_LENGTH(hdr));
-#if 0
- nal->cb_printf(nal, "%d: lib_parse: nal=%p hdr=%p type=%d\n",
- nal->ni.nid, nal, hdr, hdr->type);
- print_hdr(nal, hdr);
-#endif
- if (hdr->type == PTL_MSG_HELLO) {
- /* dest_nid is really ptl_magicversion_t */
- ptl_magicversion_t *mv = (ptl_magicversion_t *)&hdr->dest_nid;
-
- CERROR (LPU64": Dropping unexpected HELLO message: "
- "magic %d, version %d.%d from "LPD64"\n",
- nal->ni.nid, mv->magic,
- mv->version_major, mv->version_minor,
- hdr->src_nid);
- lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
- return (-1);
- }
-
- if (hdr->dest_nid != nal->ni.nid) {
- CERROR(LPU64": Dropping %s message from "LPU64" to "LPU64
- " (not me)\n", nal->ni.nid, hdr_type_string (hdr),
- hdr->src_nid, hdr->dest_nid);
-
- state_lock (nal, &flags);
- nal->ni.counters.drop_count++;
- nal->ni.counters.drop_length += PTL_HDR_LENGTH(hdr);
- state_unlock (nal, &flags);
-
- lib_recv (nal, private, NULL, NULL, 0, 0, PTL_HDR_LENGTH (hdr));
- return (-1);
- }
+ struct lnet_msg *msg;
+ lnet_peer_t *rxpeer;
+ lnet_pid_t dest_pid;
+ lnet_nid_t dest_nid;
+ lnet_nid_t src_nid;
+ __u32 payload_length;
+ __u32 type;
+ int for_me;
+ int cpt;
+ int rc = 0;
+
+ LASSERT(!in_interrupt());
+
+ type = le32_to_cpu(hdr->type);
+ src_nid = le64_to_cpu(hdr->src_nid);
+ dest_nid = le64_to_cpu(hdr->dest_nid);
+ dest_pid = le32_to_cpu(hdr->dest_pid);
+ payload_length = le32_to_cpu(hdr->payload_length);
+
+ for_me = (ni->ni_nid == dest_nid);
+ cpt = lnet_cpt_of_nid(from_nid);
+
+ switch (type) {
+ case LNET_MSG_ACK:
+ case LNET_MSG_GET:
+ if (payload_length > 0) {
+ CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ lnet_msgtyp2str(type), payload_length);
+ return -EPROTO;
+ }
+ break;
+
+ case LNET_MSG_PUT:
+ case LNET_MSG_REPLY:
+ if (payload_length >
+ (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
+ CERROR("%s, src %s: bad %s payload %d "
+ "(%d max expected)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ lnet_msgtyp2str(type),
+ payload_length,
+ for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
+ return -EPROTO;
+ }
+ break;
+
+ default:
+ CERROR("%s, src %s: Bad message type 0x%x\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid), type);
+ return -EPROTO;
+ }
+
+ if (the_lnet.ln_routing &&
+ ni->ni_last_alive != cfs_time_current_sec()) {
+ /* NB: so far here is the only place to set NI status to "up */
+ lnet_ni_lock(ni);
+ ni->ni_last_alive = cfs_time_current_sec();
+ if (ni->ni_status != NULL &&
+ ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
+ ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+ lnet_ni_unlock(ni);
+ }
+
+ /* Regard a bad destination NID as a protocol error. Senders should
+ * know what they're doing; if they don't they're misconfigured, buggy
+ * or malicious so we chop them off at the knees :) */
+
+ if (!for_me) {
+ if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
+ /* should have gone direct */
+ CERROR("%s, src %s: Bad dest nid %s "
+ "(should have been sent direct)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid));
+ return -EPROTO;
+ }
+
+ if (lnet_islocalnid(dest_nid)) {
+ /* dest is another local NI; sender should have used
+ * this node's NID on its own network */
+ CERROR("%s, src %s: Bad dest nid %s "
+ "(it's my nid but on a different network)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid));
+ return -EPROTO;
+ }
+
+ if (rdma_req && type == LNET_MSG_GET) {
+ CERROR("%s, src %s: Bad optimized GET for %s "
+ "(final destination must be me)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid));
+ return -EPROTO;
+ }
+
+ if (!the_lnet.ln_routing) {
+ CERROR("%s, src %s: Dropping message for %s "
+ "(routing not enabled)\n",
+ libcfs_nid2str(from_nid),
+ libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid));
+ goto drop;
+ }
+ }
+
+ /* Message looks OK; we're not going to return an error, so we MUST
+ * call back lnd_recv() come what may... */
+
+ if (!list_empty(&the_lnet.ln_test_peers) && /* normally we don't */
+ fail_peer(src_nid, 0)) { /* shall we now? */
+ CERROR("%s, src %s: Dropping %s to simulate failure\n",
+ libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+ lnet_msgtyp2str(type));
+ goto drop;
+ }
+
+ if (!list_empty(&the_lnet.ln_drop_rules) &&
+ lnet_drop_rule_match(hdr)) {
+ CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
+ "silent message loss\n",
+ libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+ libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
+ goto drop;
+ }
+
+
+ msg = lnet_msg_alloc();
+ if (msg == NULL) {
+ CERROR("%s, src %s: Dropping %s (out of memory)\n",
+ libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+ lnet_msgtyp2str(type));
+ goto drop;
+ }
+
+ /* msg zeroed in lnet_msg_alloc; i.e. flags all clear,
+ * pointers NULL etc */
+
+ msg->msg_type = type;
+ msg->msg_private = private;
+ msg->msg_receiving = 1;
+ msg->msg_rdma_get = rdma_req;
+ msg->msg_len = msg->msg_wanted = payload_length;
+ msg->msg_offset = 0;
+ msg->msg_hdr = *hdr;
+ /* for building message event */
+ msg->msg_from = from_nid;
+ if (!for_me) {
+ msg->msg_target.pid = dest_pid;
+ msg->msg_target.nid = dest_nid;
+ msg->msg_routing = 1;
+
+ } else {
+ /* convert common msg->hdr fields to host byteorder */
+ msg->msg_hdr.type = type;
+ msg->msg_hdr.src_nid = src_nid;
+ msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
+ msg->msg_hdr.dest_nid = dest_nid;
+ msg->msg_hdr.dest_pid = dest_pid;
+ msg->msg_hdr.payload_length = payload_length;
+ }
+
+ lnet_net_lock(cpt);
+ rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
+ if (rc != 0) {
+ lnet_net_unlock(cpt);
+ CERROR("%s, src %s: Dropping %s "
+ "(error %d looking up sender)\n",
+ libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
+ lnet_msgtyp2str(type), rc);
+ lnet_msg_free(msg);
+ goto drop;
+ }
+
+ if (lnet_isrouter(msg->msg_rxpeer)) {
+ lnet_peer_set_alive(msg->msg_rxpeer);
+ if (avoid_asym_router_failure &&
+ LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
+ /* received a remote message from router, update
+ * remote NI status on this router.
+ * NB: multi-hop routed message will be ignored.
+ */
+ lnet_router_ni_update_locked(msg->msg_rxpeer,
+ LNET_NIDNET(src_nid));
+ }
+ }
+
+ lnet_msg_commit(msg, cpt);
+ /* LND just notified me for incoming message from rxpeer, so assume
+ * it is alive */
+ rxpeer = msg->msg_rxpeer;
+ rxpeer->lp_last_alive = rxpeer->lp_last_query = cfs_time_current();
+ if (!rxpeer->lp_alive)
+ lnet_notify_locked(rxpeer, 0, 1, rxpeer->lp_last_alive);
+
+ if (lnet_isrouter(msg->msg_rxpeer) &&
+ LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
+ lnet_router_ni_update_locked(msg->msg_rxpeer,
+ LNET_NIDNET(src_nid));
+ }
+
+ /* message delay simulation */
+ if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
+ lnet_delay_rule_match_locked(hdr, msg))) {
+ lnet_net_unlock(cpt);
+ return 0;
+ }
+
+ if (!for_me) {
+ rc = lnet_parse_forward_locked(ni, msg);
+ lnet_net_unlock(cpt);
+
+ if (rc < 0)
+ goto free_drop;
+
+ if (rc == LNET_CREDIT_OK) {
+ lnet_ni_recv(ni, msg->msg_private, msg, 0,
+ 0, payload_length, payload_length);
+ }
+ return 0;
+ }
+
+ lnet_net_unlock(cpt);
+
+ rc = lnet_parse_local(ni, msg);
+ if (rc != 0)
+ goto free_drop;
+ return 0;
+
+ free_drop:
+ LASSERT(msg->msg_md == NULL);
+ lnet_finalize(ni, msg, rc);