+ if (msg->msg_peerrtrcredit) {
+ /* give back peer router credits */
+ msg->msg_peerrtrcredit = 0;
+
+ LASSERT((rxpeer->lp_rtrcredits < 0) ==
+ !cfs_list_empty(&rxpeer->lp_rtrq));
+
+ rxpeer->lp_rtrcredits++;
+ if (rxpeer->lp_rtrcredits <= 0) {
+ msg2 = cfs_list_entry(rxpeer->lp_rtrq.next,
+ lnet_msg_t, msg_list);
+ cfs_list_del(&msg2->msg_list);
+
+ (void) lnet_post_routed_recv_locked(msg2, 1);
+ }
+ }
+#else
+ LASSERT (!msg->msg_rtrcredit);
+ LASSERT (!msg->msg_peerrtrcredit);
+#endif
+ if (rxpeer != NULL) {
+ msg->msg_rxpeer = NULL;
+ lnet_peer_decref_locked(rxpeer);
+ }
+}
+
+static lnet_peer_t *
+lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
+{
+ lnet_remotenet_t *rnet;
+ lnet_route_t *rtr;
+ lnet_route_t *rtr_best;
+ lnet_route_t *rtr_last;
+ struct lnet_peer *lp_best;
+ struct lnet_peer *lp;
+ int rc;
+
+ rnet = lnet_find_net_locked(LNET_NIDNET(target));
+ if (rnet == NULL)
+ return NULL;
+
+ lp_best = NULL;
+ rtr_best = rtr_last = NULL;
+ cfs_list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
+ lp = rtr->lr_gateway;
+
+ if (!lp->lp_alive || /* gateway is down */
+ (lp->lp_ping_version == LNET_PROTO_PING_VERSION &&
+ rtr->lr_downis != 0)) /* NI to target is down */
+ continue;
+
+ if (ni != NULL && lp->lp_ni != ni)
+ continue;
+
+ if (lp_best == NULL) {
+ rtr_best = rtr_last = rtr;
+ lp_best = lp;
+ continue;
+ }
+
+ rc = lnet_compare_routes(rtr, rtr_best);
+ if (rc < 0)
+ continue;
+
+ rtr_best = rtr;
+ lp_best = lp;
+ }
+
+ if (rtr_best != NULL) {
+ /* Place selected route at the end of the route list to ensure
+ * fairness; everything else being equal... */
+ cfs_list_del(&rtr_best->lr_list);
+ cfs_list_add_tail(&rtr_best->lr_list, &rnet->lrn_routes);
+ }
+
+ return lp_best;
+}
+
+int
+lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
+{
+ lnet_nid_t dst_nid = msg->msg_target.nid;
+ lnet_ni_t *src_ni;
+ lnet_ni_t *local_ni;
+ lnet_peer_t *lp;
+ int rc;
+
+ LASSERT (msg->msg_txpeer == NULL);
+ LASSERT (!msg->msg_sending);
+ LASSERT (!msg->msg_target_is_router);
+ LASSERT (!msg->msg_receiving);
+
+ msg->msg_sending = 1;
+
+ /* NB! ni != NULL == interface pre-determined (ACK/REPLY) */
+
+ LNET_LOCK();
+
+ if (the_lnet.ln_shutdown) {
+ LNET_UNLOCK();
+ return -ESHUTDOWN;
+ }
+
+ if (src_nid == LNET_NID_ANY) {
+ src_ni = NULL;
+ } else {
+ src_ni = lnet_nid2ni_locked(src_nid);
+ if (src_ni == NULL) {
+ LNET_UNLOCK();
+ LCONSOLE_WARN("Can't send to %s: src %s is not a "
+ "local nid\n", libcfs_nid2str(dst_nid),
+ libcfs_nid2str(src_nid));
+ return -EINVAL;
+ }
+ LASSERT (!msg->msg_routing);
+ }
+
+ /* Is this for someone on a local network? */
+ local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
+
+ if (local_ni != NULL) {
+ if (src_ni == NULL) {
+ src_ni = local_ni;
+ src_nid = src_ni->ni_nid;
+ } else if (src_ni == local_ni) {
+ lnet_ni_decref_locked(local_ni);
+ } else {
+ lnet_ni_decref_locked(local_ni);
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+ LCONSOLE_WARN("No route to %s via from %s\n",
+ libcfs_nid2str(dst_nid),
+ libcfs_nid2str(src_nid));
+ return -EINVAL;
+ }
+
+ LASSERT (src_nid != LNET_NID_ANY);
+
+ if (!msg->msg_routing)
+ msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+
+ if (src_ni == the_lnet.ln_loni) {
+ /* No send credit hassles with LOLND */
+ LNET_UNLOCK();
+ lnet_ni_send(src_ni, msg);
+ lnet_ni_decref(src_ni);
+ return 0;
+ }
+
+ rc = lnet_nid2peer_locked(&lp, dst_nid);
+ lnet_ni_decref_locked(src_ni); /* lp has ref on src_ni; lose mine */
+ if (rc != 0) {
+ LNET_UNLOCK();
+ LCONSOLE_WARN("Error %d finding peer %s\n", rc,
+ libcfs_nid2str(dst_nid));
+ /* ENOMEM or shutting down */
+ return rc;
+ }
+ LASSERT (lp->lp_ni == src_ni);
+ } else {
+#ifndef __KERNEL__
+ LNET_UNLOCK();
+
+ /* NB
+ * - once application finishes computation, check here to update
+ * router states before it waits for pending IO in LNetEQPoll
+ * - recursion breaker: router checker sends no message
+ * to remote networks */
+ if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING)
+ lnet_router_checker();
+
+ LNET_LOCK();
+#endif
+ /* sending to a remote network */
+ lp = lnet_find_route_locked(src_ni, dst_nid);
+ if (lp == NULL) {
+ if (src_ni != NULL)
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+
+ LCONSOLE_WARN("No route to %s via %s "
+ "(all routers down)\n",
+ libcfs_id2str(msg->msg_target),
+ libcfs_nid2str(src_nid));
+ return -EHOSTUNREACH;
+ }
+
+ CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+ libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
+ lnet_msgtyp2str(msg->msg_type), msg->msg_len);
+
+ if (src_ni == NULL) {
+ src_ni = lp->lp_ni;
+ src_nid = src_ni->ni_nid;
+ } else {
+ LASSERT (src_ni == lp->lp_ni);
+ lnet_ni_decref_locked(src_ni);
+ }
+
+ lnet_peer_addref_locked(lp);
+
+ LASSERT (src_nid != LNET_NID_ANY);
+
+ if (!msg->msg_routing) {
+ /* I'm the source and now I know which NI to send on */
+ msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+ }
+
+ msg->msg_target_is_router = 1;
+ msg->msg_target.nid = lp->lp_nid;
+ msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
+ }
+
+ /* 'lp' is our best choice of peer */
+
+ LASSERT (!msg->msg_peertxcredit);
+ LASSERT (!msg->msg_txcredit);
+ LASSERT (msg->msg_txpeer == NULL);
+
+ msg->msg_txpeer = lp; /* msg takes my ref on lp */
+
+ rc = lnet_post_send_locked(msg, 0);
+ LNET_UNLOCK();
+
+ if (rc == EHOSTUNREACH)
+ return -EHOSTUNREACH;
+
+ if (rc == 0)
+ lnet_ni_send(src_ni, msg);
+
+ return 0;
+}
+
+void
+lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+{
+ /* ALWAYS called holding the LNET_LOCK */
+ /* Here, we commit the MD to a network OP by marking it busy and
+ * decrementing its threshold. Come what may, the network "owns"
+ * the MD until a call to lnet_finalize() signals completion. */
+ LASSERT (!msg->msg_routing);
+
+ msg->msg_md = md;
+
+ md->md_refcount++;
+ if (md->md_threshold != LNET_MD_THRESH_INF) {
+ LASSERT (md->md_threshold > 0);
+ md->md_threshold--;
+ }
+
+ the_lnet.ln_counters.msgs_alloc++;
+ if (the_lnet.ln_counters.msgs_alloc >
+ the_lnet.ln_counters.msgs_max)
+ the_lnet.ln_counters.msgs_max =
+ the_lnet.ln_counters.msgs_alloc;
+
+ LASSERT (!msg->msg_onactivelist);
+ msg->msg_onactivelist = 1;
+ cfs_list_add(&msg->msg_activelist,
+ &the_lnet.ln_msg_container.msc_active);
+}
+
+static void
+lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
+{
+ LNET_LOCK();
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += nob;
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
+}
+
+static void
+lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
+ unsigned int offset, unsigned int mlength)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+
+ LNET_LOCK();
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
+ msg->msg_ev.type = LNET_EVENT_PUT;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+
+ /* Must I ACK? If so I'll grab the ack_wmd out of the header and put
+ * it back into the ACK during lnet_finalize() */
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+
+ lnet_ni_recv(msg->msg_rxpeer->lp_ni,
+ msg->msg_private,
+ msg, delayed, offset, mlength,
+ hdr->payload_length);
+}
+
+static int
+lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ int rc;
+ int index;
+ __u64 version;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int rlength = hdr->payload_length;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src= {0};
+ lnet_libmd_t *md;
+ lnet_portal_t *ptl;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* Convert put fields to host byte order */
+ hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
+ hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
+ hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
+
+ index = hdr->msg.put.ptl_index;
+
+ LNET_LOCK();
+
+ again:
+ rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
+ rlength, hdr->msg.put.offset,
+ hdr->msg.put.match_bits, msg,
+ &mlength, &offset, &md);
+ switch (rc) {
+ default:
+ LBUG();
+
+ case LNET_MATCHMD_OK:
+ LNET_UNLOCK();
+ lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+ return 0;
+
+ case LNET_MATCHMD_NONE:
+ ptl = the_lnet.ln_portals[index];
+ version = ptl->ptl_ml_version;
+
+ rc = 0;
+ if (!msg->msg_delayed)
+ rc = lnet_eager_recv_locked(msg);
+
+ if (rc == 0 &&
+ !the_lnet.ln_shutdown &&
+ lnet_ptl_is_lazy(ptl)) {
+ if (version != ptl->ptl_ml_version)
+ goto again;
+
+ cfs_list_add_tail(&msg->msg_list, &ptl->ptl_msgq);
+ ptl->ptl_msgq_version++;
+ LNET_UNLOCK();
+
+ CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
+ LPU64" offset %d length %d: no match \n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength);
+ return 0;
+ }
+ /* fall through */
+
+ case LNET_MATCHMD_DROP:
+ CNETERR("Dropping PUT from %s portal %d match "LPU64
+ " offset %d length %d: %d\n",
+ libcfs_id2str(src), index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset, rlength, rc);
+ LNET_UNLOCK();
+
+ return ENOENT; /* +ve: OK but no match */
+ }
+}
+
+static int
+lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ unsigned int mlength = 0;
+ unsigned int offset = 0;
+ lnet_process_id_t src = {0};
+ lnet_handle_wire_t reply_wmd;
+ lnet_libmd_t *md;
+ int rc;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* Convert get fields to host byte order */
+ hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
+ hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
+ hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
+ hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
+
+ LNET_LOCK();
+
+ rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
+ hdr->msg.get.sink_length, hdr->msg.get.src_offset,
+ hdr->msg.get.match_bits, msg,
+ &mlength, &offset, &md);
+ if (rc == LNET_MATCHMD_DROP) {
+ CNETERR("Dropping GET from %s portal %d match "LPU64
+ " offset %d length %d\n",
+ libcfs_id2str(src),
+ hdr->msg.get.ptl_index,
+ hdr->msg.get.match_bits,
+ hdr->msg.get.src_offset,
+ hdr->msg.get.sink_length);
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ LASSERT (rc == LNET_MATCHMD_OK);
+
+ the_lnet.ln_counters.send_count++;
+ the_lnet.ln_counters.send_length += mlength;
+
+ LNET_UNLOCK();
+
+ msg->msg_ev.type = LNET_EVENT_GET;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = 0;
+
+ reply_wmd = hdr->msg.get.return_wmd;
+
+ lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+
+ msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
+
+ if (rdma_get) {
+ /* The LND completes the REPLY from her recv procedure */
+ lnet_ni_recv(ni, msg->msg_private, msg, 0,
+ msg->msg_offset, msg->msg_len, msg->msg_len);
+ return 0;
+ }
+
+ lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
+ msg->msg_receiving = 0;
+
+ rc = lnet_send(ni->ni_nid, msg);
+ if (rc < 0) {
+ /* didn't get as far as lnet_ni_send() */
+ CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
+
+ lnet_finalize(ni, msg, rc);
+ }
+
+ return 0;
+}
+
+static int
+lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ void *private = msg->msg_private;
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ lnet_process_id_t src = {0};
+ lnet_libmd_t *md;
+ int rlength;
+ int mlength;
+
+ LNET_LOCK();
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* NB handles only looked up by creator (no flips) */
+ md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
+ if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
+ CNETERR("%s: Dropping REPLY from %s for %s "
+ "MD "LPX64"."LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ (md == NULL) ? "invalid" : "inactive",
+ hdr->msg.reply.dst_wmd.wh_interface_cookie,
+ hdr->msg.reply.dst_wmd.wh_object_cookie);
+ if (md != NULL && md->md_me != NULL)
+ CERROR("REPLY MD also attached to portal %d\n",
+ md->md_me->me_portal);
+
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ LASSERT (md->md_offset == 0);
+
+ rlength = hdr->payload_length;
+ mlength = MIN(rlength, (int)md->md_length);
+
+ if (mlength < rlength &&
+ (md->md_options & LNET_MD_TRUNCATE) == 0) {
+ CNETERR("%s: Dropping REPLY from %s length %d "
+ "for MD "LPX64" would overflow (%d)\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
+ mlength);
+ LNET_UNLOCK();
+ return ENOENT; /* +ve: OK but no match */
+ }
+
+ CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
+
+ lnet_commit_md(md, msg);
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
+ msg->msg_ev.type = LNET_EVENT_REPLY;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.initiator = src;
+ msg->msg_ev.rlength = rlength;
+ msg->msg_ev.mlength = mlength;
+ msg->msg_ev.offset = 0;
+
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
+ return 0;
+}
+
+static int
+lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+ lnet_process_id_t src = {0};
+ lnet_libmd_t *md;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ /* Convert ack fields to host byte order */
+ hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
+ hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
+
+ LNET_LOCK();
+
+ /* NB handles only looked up by creator (no flips) */
+ md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
+ if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
+ /* Don't moan; this is expected */
+ CDEBUG(D_NET,
+ "%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ (md == NULL) ? "invalid" : "inactive",
+ hdr->msg.ack.dst_wmd.wh_interface_cookie,
+ hdr->msg.ack.dst_wmd.wh_object_cookie);
+ if (md != NULL && md->md_me != NULL)
+ CERROR("Source MD also attached to portal %d\n",
+ md->md_me->me_portal);
+
+ LNET_UNLOCK();
+ return ENOENT; /* +ve! */
+ }
+
+ CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
+ libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
+ hdr->msg.ack.dst_wmd.wh_object_cookie);
+
+ lnet_commit_md(md, msg);
+
+ msg->msg_ev.type = LNET_EVENT_ACK;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.initiator = src;
+ msg->msg_ev.mlength = hdr->msg.ack.mlength;
+ msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+
+ lnet_md_deconstruct(md, &msg->msg_ev.md);
+ lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+ the_lnet.ln_counters.recv_count++;
+
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
+ return 0;
+}
+
+char *
+lnet_msgtyp2str (int type)
+{
+ switch (type) {
+ case LNET_MSG_ACK:
+ return ("ACK");
+ case LNET_MSG_PUT:
+ return ("PUT");
+ case LNET_MSG_GET:
+ return ("GET");
+ case LNET_MSG_REPLY:
+ return ("REPLY");
+ case LNET_MSG_HELLO:
+ return ("HELLO");
+ default:
+ return ("<UNKNOWN>");