+ /* ALWAYS called holding the LNET_LOCK */
+ LASSERT (msg->msg_routing);
+
+ the_lnet.ln_counters.msgs_alloc++;
+ if (the_lnet.ln_counters.msgs_alloc >
+ the_lnet.ln_counters.msgs_max)
+ the_lnet.ln_counters.msgs_max =
+ the_lnet.ln_counters.msgs_alloc;
+
+ the_lnet.ln_counters.route_count++;
+ the_lnet.ln_counters.route_length += msg->msg_len;
+
+ LASSERT (!msg->msg_onactivelist);
+ msg->msg_onactivelist = 1;
+ list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+}
+
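+/* Return the smallest router buffer pool whose buffers can hold msg.
+ * The pools in ln_rtrpools are assumed (as in the usual configuration)
+ * to be sorted by increasing rbp_npages - e.g. 0 pages (tiny), 1 page
+ * (small) and LNET_MTU worth of pages (large) - so for any message no
+ * larger than LNET_MTU the scan terminates before running off the end
+ * of the array. */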
+lnet_rtrbufpool_t *
+lnet_msg2bufpool(lnet_msg_t *msg)
+{
+ lnet_rtrbufpool_t *rbp = &the_lnet.ln_rtrpools[0];
+
+ LASSERT (msg->msg_len <= LNET_MTU);
+ while (msg->msg_len > (unsigned int)rbp->rbp_npages * CFS_PAGE_SIZE) {
+ rbp++;
+ LASSERT (rbp < &the_lnet.ln_rtrpools[LNET_NRBPOOLS]);
+ }
+
+ return rbp;
+}
+
+int
+lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
+{
+ /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
+ * sets do_recv FALSE and I don't do the unlock/recv/lock bit. I
+ * return EAGAIN if msg blocked and 0 if received or OK to receive */
+ lnet_peer_t *lp = msg->msg_rxpeer;
+ lnet_rtrbufpool_t *rbp;
+ lnet_rtrbuf_t *rb;
+
+ LASSERT (msg->msg_iov == NULL);
+ LASSERT (msg->msg_kiov == NULL);
+ LASSERT (msg->msg_niov == 0);
+ LASSERT (msg->msg_routing);
+ LASSERT (msg->msg_receiving);
+ LASSERT (!msg->msg_sending);
+
+ /* non-lnet_parse callers only post delayed messages */
+ LASSERT (!do_recv || msg->msg_delayed);
+
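+ /* Take a peer router credit unless one is already held: lp_rtrcredits
+ * bounds how many routed messages from this peer may be buffered here
+ * at once, and goes one below zero for each message queued on lp_rtrq
+ * (the invariant the LASSERT below checks). */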
+ if (!msg->msg_peerrtrcredit) {
+ LASSERT ((lp->lp_rtrcredits < 0) == !list_empty(&lp->lp_rtrq));
+
+ msg->msg_peerrtrcredit = 1;
+ lp->lp_rtrcredits--;
+ if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
+ lp->lp_minrtrcredits = lp->lp_rtrcredits;
+
+ if (lp->lp_rtrcredits < 0) {
+ /* must have checked eager_recv before here */
+ LASSERT (msg->msg_delayed);
+ list_add_tail(&msg->msg_list, &lp->lp_rtrq);
+ return EAGAIN;
+ }
+ }
+
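+ /* Likewise take a buffer credit from the pool sized for this message;
+ * rbp_credits obeys the same "negative == messages queued" convention. */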
+ rbp = lnet_msg2bufpool(msg);
+
+ if (!msg->msg_rtrcredit) {
+ LASSERT ((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
+
+ msg->msg_rtrcredit = 1;
+ rbp->rbp_credits--;
+ if (rbp->rbp_credits < rbp->rbp_mincredits)
+ rbp->rbp_mincredits = rbp->rbp_credits;
+
+ if (rbp->rbp_credits < 0) {
+ /* must have checked eager_recv before here */
+ LASSERT (msg->msg_delayed);
+ list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
+ return EAGAIN;
+ }
+ }
+
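+ /* A buffer credit reserves an actual buffer, so one must be on
+ * rbp_bufs by the time both credits are held. */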
+ LASSERT (!list_empty(&rbp->rbp_bufs));
+ rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
+ list_del(&rb->rb_list);
+
+ msg->msg_niov = rbp->rbp_npages;
+ msg->msg_kiov = &rb->rb_kiov[0];
+
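+ /* lnet_ni_recv() calls down into the LND, which may block, so
+ * LNET_LOCK must be dropped around it. */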
+ if (do_recv) {
+ LNET_UNLOCK();
+ lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
+ 0, msg->msg_len, msg->msg_len);
+ LNET_LOCK();
+ }
+ return 0;
+}
+#endif /* __KERNEL__ */
+
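+/* Release all the credits msg holds: NI tx credit, peer tx credit and
+ * (in the kernel) router buffer and peer router credits. If a count is
+ * still not positive after a credit comes back, another message is
+ * blocked on it, so the first queued message is popped and posted.
+ * Called with LNET_LOCK held, as the _locked suffix indicates. */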
+void
+lnet_return_credits_locked (lnet_msg_t *msg)
+{
+ lnet_peer_t *txpeer = msg->msg_txpeer;
+ lnet_peer_t *rxpeer = msg->msg_rxpeer;
+ lnet_msg_t *msg2;
+ lnet_ni_t *ni;
+
+ if (msg->msg_txcredit) {
+ /* give back NI txcredits */
+ msg->msg_txcredit = 0;
+ ni = txpeer->lp_ni;
+
+ LASSERT((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
+
+ ni->ni_txcredits++;
+ if (ni->ni_txcredits <= 0) {
+ msg2 = list_entry(ni->ni_txq.next, lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ LASSERT(msg2->msg_txpeer->lp_ni == ni);
+ LASSERT(msg2->msg_delayed);
+
+ (void) lnet_post_send_locked(msg2, 1);
+ }
+ }
+
+ if (msg->msg_peertxcredit) {
+ /* give back peer txcredits */
+ msg->msg_peertxcredit = 0;
+
+ LASSERT((txpeer->lp_txcredits < 0) == !list_empty(&txpeer->lp_txq));
+
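+ /* lp_txqnob counts the bytes (payload plus wire header) queued to
+ * this peer; presumably charged when the send was posted, refunded
+ * here. */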
+ txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
+ LASSERT (txpeer->lp_txqnob >= 0);
+
+ txpeer->lp_txcredits++;
+ if (txpeer->lp_txcredits <= 0) {
+ msg2 = list_entry(txpeer->lp_txq.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ LASSERT (msg2->msg_txpeer == txpeer);
+ LASSERT (msg2->msg_delayed);
+
+ (void) lnet_post_send_locked(msg2, 1);
+ }
+ }
+
+ if (txpeer != NULL) {
+ msg->msg_txpeer = NULL;
+ lnet_peer_decref_locked(txpeer);
+ }
+
+#ifdef __KERNEL__
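+ /* Userspace LNET never routes, so a message can only hold router
+ * buffer or peer router credits in the kernel (the #else branch
+ * below asserts exactly that). */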
+ if (msg->msg_rtrcredit) {
+ /* give back global router credits */
+ lnet_rtrbuf_t *rb;
+ lnet_rtrbufpool_t *rbp;
+
+ /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
+ * there until it gets one allocated, or aborts the wait
+ * itself */
+ LASSERT (msg->msg_kiov != NULL);
+
+ rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
+ rbp = rb->rb_pool;
+ LASSERT (rbp == lnet_msg2bufpool(msg));
+
+ msg->msg_kiov = NULL;
+ msg->msg_rtrcredit = 0;
+
+ LASSERT((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
+ LASSERT((rbp->rbp_credits > 0) == !list_empty(&rbp->rbp_bufs));
+
+ list_add(&rb->rb_list, &rbp->rbp_bufs);
+ rbp->rbp_credits++;
+ if (rbp->rbp_credits <= 0) {
+ msg2 = list_entry(rbp->rbp_msgs.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ (void) lnet_post_routed_recv_locked(msg2, 1);
+ }
+ }
+
+ if (msg->msg_peerrtrcredit) {
+ /* give back peer router credits */
+ msg->msg_peerrtrcredit = 0;
+
+ LASSERT((rxpeer->lp_rtrcredits < 0) == !list_empty(&rxpeer->lp_rtrq));
+
+ rxpeer->lp_rtrcredits++;
+ if (rxpeer->lp_rtrcredits <= 0) {
+ msg2 = list_entry(rxpeer->lp_rtrq.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ (void) lnet_post_routed_recv_locked(msg2, 1);
+ }
+ }
+#else
+ LASSERT (!msg->msg_rtrcredit);
+ LASSERT (!msg->msg_peerrtrcredit);
+#endif /* __KERNEL__ */
+ if (rxpeer != NULL) {
+ msg->msg_rxpeer = NULL;
+ lnet_peer_decref_locked(rxpeer);
+ }
+}
+
+int
+lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
+{
+ lnet_nid_t dst_nid = msg->msg_target.nid;
+ lnet_ni_t *src_ni;
+ lnet_ni_t *local_ni;
+ lnet_remotenet_t *rnet;
+ lnet_route_t *route;
+ lnet_route_t *best_route;
+ struct list_head *tmp;
+ lnet_peer_t *lp;
+ lnet_peer_t *lp2;
+ int rc;