+ if (rbp->rbp_credits < 0) {
+ /* must have checked eager_recv before here */
+ LASSERT (msg->msg_delayed);
+ list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
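+ /* NB a +ve return means the msg has been queued, not failed */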
+ return EAGAIN;
+ }
+ }
+
+ LASSERT (!list_empty(&rbp->rbp_bufs));
+ rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
+ list_del(&rb->rb_list);
+
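+ /* receive the payload directly into the router buffer's pages */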
+ msg->msg_niov = rbp->rbp_npages;
+ msg->msg_kiov = &rb->rb_kiov[0];
+
+ if (do_recv) {
+ LNET_UNLOCK();
+ lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
+ 0, msg->msg_len, msg->msg_len);
+ LNET_LOCK();
+ }
+ return 0;
+}
+#endif
+
+void
+lnet_return_credits_locked (lnet_msg_t *msg)
+{
+ lnet_peer_t *txpeer = msg->msg_txpeer;
+ lnet_peer_t *rxpeer = msg->msg_rxpeer;
+ lnet_msg_t *msg2;
+ lnet_ni_t *ni;
+
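+ /* NB credit counts go negative while messages block on the
+ * corresponding queue; if a count is still non-positive after
+ * this credit is returned, the head of the queue can run now */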
+ if (msg->msg_txcredit) {
+ /* give back NI txcredits */
+ msg->msg_txcredit = 0;
+ ni = txpeer->lp_ni;
+
+ LASSERT((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
+
+ ni->ni_txcredits++;
+ if (ni->ni_txcredits <= 0) {
+ msg2 = list_entry(ni->ni_txq.next, lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ LASSERT(msg2->msg_txpeer->lp_ni == ni);
+ LASSERT(msg2->msg_delayed);
+
+ (void) lnet_post_send_locked(msg2, 1);
+ }
+ }
+
+ if (msg->msg_peertxcredit) {
+ /* give back peer txcredits */
+ msg->msg_peertxcredit = 0;
+
+ LASSERT((txpeer->lp_txcredits < 0) == !list_empty(&txpeer->lp_txq));
+
+ txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
+ LASSERT (txpeer->lp_txqnob >= 0);
+
+ txpeer->lp_txcredits++;
+ if (txpeer->lp_txcredits <= 0) {
+ msg2 = list_entry(txpeer->lp_txq.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ LASSERT (msg2->msg_txpeer == txpeer);
+ LASSERT (msg2->msg_delayed);
+
+ (void) lnet_post_send_locked(msg2, 1);
+ }
+ }
+
+ if (txpeer != NULL) {
+ msg->msg_txpeer = NULL;
+ lnet_peer_decref_locked(txpeer);
+ }
+
+#ifdef __KERNEL__
+ if (msg->msg_rtrcredit) {
+ /* give back global router credits */
+ lnet_rtrbuf_t *rb;
+ lnet_rtrbufpool_t *rbp;
+
+ /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
+ * there until it gets one allocated, or aborts the wait
+ * itself */
+ LASSERT (msg->msg_kiov != NULL);
+
+ rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
+ rbp = rb->rb_pool;
+ LASSERT (rbp == lnet_msg2bufpool(msg));
+
+ msg->msg_kiov = NULL;
+ msg->msg_rtrcredit = 0;
+
+ LASSERT((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
+ LASSERT((rbp->rbp_credits > 0) == !list_empty(&rbp->rbp_bufs));
+
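+ /* return the buffer to the pool before checking for a blocked
+ * msg, so lnet_post_routed_recv_locked() can find it */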
+ list_add(&rb->rb_list, &rbp->rbp_bufs);
+ rbp->rbp_credits++;
+ if (rbp->rbp_credits <= 0) {
+ msg2 = list_entry(rbp->rbp_msgs.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ (void) lnet_post_routed_recv_locked(msg2, 1);
+ }
+ }
+
+ if (msg->msg_peerrtrcredit) {
+ /* give back peer router credits */
+ msg->msg_peerrtrcredit = 0;
+
+ LASSERT((rxpeer->lp_rtrcredits < 0) == !list_empty(&rxpeer->lp_rtrq));
+
+ rxpeer->lp_rtrcredits++;
+ if (rxpeer->lp_rtrcredits <= 0) {
+ msg2 = list_entry(rxpeer->lp_rtrq.next,
+ lnet_msg_t, msg_list);
+ list_del(&msg2->msg_list);
+
+ (void) lnet_post_routed_recv_locked(msg2, 1);
+ }
+ }
+#else
+ LASSERT (!msg->msg_rtrcredit);
+ LASSERT (!msg->msg_peerrtrcredit);
+#endif
+ if (rxpeer != NULL) {
+ msg->msg_rxpeer = NULL;
+ lnet_peer_decref_locked(rxpeer);
+ }
+}
+
+int
+lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
+{
+ lnet_nid_t dst_nid = msg->msg_target.nid;
+ lnet_ni_t *src_ni;
+ lnet_ni_t *local_ni;
+ lnet_remotenet_t *rnet;
+ lnet_route_t *route;
+ lnet_route_t *best_route;
+ struct list_head *tmp;
+ lnet_peer_t *lp;
+ lnet_peer_t *lp2;
+ int rc;
+
+ LASSERT (msg->msg_txpeer == NULL);
+ LASSERT (!msg->msg_sending);
+ LASSERT (!msg->msg_target_is_router);
+ LASSERT (!msg->msg_receiving);
+
+ msg->msg_sending = 1;
+
+ /* NB! src_nid != LNET_NID_ANY == interface pre-determined (ACK/REPLY) */
+
+ LNET_LOCK();
+
+ if (the_lnet.ln_shutdown) {
+ LNET_UNLOCK();
+ return -ESHUTDOWN;
+ }
+
+ if (src_nid == LNET_NID_ANY) {
+ src_ni = NULL;
+ } else {
+ src_ni = lnet_nid2ni_locked(src_nid);
+ if (src_ni == NULL) {
+ LNET_UNLOCK();
+ CERROR("Can't send to %s: src %s is not a local nid\n",
+ libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
+ return -EINVAL;
+ }
+ LASSERT (!msg->msg_routing);
+ }
+
+ /* Is this for someone on a local network? */
+ local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
+
+ if (local_ni != NULL) {
+ if (src_ni == NULL) {
+ src_ni = local_ni;
+ src_nid = src_ni->ni_nid;
+ } else if (src_ni == local_ni) {
+ lnet_ni_decref_locked(local_ni);
+ } else {
+ lnet_ni_decref_locked(local_ni);
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+ CERROR("no route to %s via from %s\n",
+ libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
+ return -EINVAL;
+ }
+
+ LASSERT (src_nid != LNET_NID_ANY);
+
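+ /* I'm the source; fix up the source NID the peer will see
+ * (ptlcompat may translate it) */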
+ if (!msg->msg_routing) {
+ src_nid = lnet_ptlcompat_srcnid(src_nid, dst_nid);
+ msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+ }
+
+ if (src_ni == the_lnet.ln_loni) {
+ /* No send credit hassles with LOLND */
+ LNET_UNLOCK();
+ lnet_ni_send(src_ni, msg);
+ lnet_ni_decref(src_ni);
+ return 0;
+ }
+
+ rc = lnet_nid2peer_locked(&lp, dst_nid);
+ lnet_ni_decref_locked(src_ni); /* lp has ref on src_ni; lose mine */
+ if (rc != 0) {
+ LNET_UNLOCK();
+ CERROR("Error %d finding peer %s\n", rc,
+ libcfs_nid2str(dst_nid));
+ /* ENOMEM or shutting down */
+ return rc;
+ }
+ LASSERT (lp->lp_ni == src_ni);
+ } else {
+ /* sending to a remote network */
+ rnet = lnet_find_net_locked(LNET_NIDNET(dst_nid));
+ if (rnet == NULL) {
+ if (src_ni != NULL)
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+ CERROR("No route to %s\n", libcfs_id2str(msg->msg_target));
+ return -EHOSTUNREACH;
+ }
+
+ /* Find the best gateway I can use */
+ lp = NULL;
+ best_route = NULL;
+ list_for_each(tmp, &rnet->lrn_routes) {
+ route = list_entry(tmp, lnet_route_t, lr_list);
+ lp2 = route->lr_gateway;
+
+ if (lp2->lp_alive &&
+ (src_ni == NULL || lp2->lp_ni == src_ni) &&
+ (lp == NULL || lnet_compare_routers(lp2, lp) > 0)) {
+ best_route = route;
+ lp = lp2;
+ }
+ }
+
+ if (lp == NULL) {
+ if (src_ni != NULL)
+ lnet_ni_decref_locked(src_ni);
+ LNET_UNLOCK();
+ CERROR("No route to %s (all routers down)\n",
+ libcfs_id2str(msg->msg_target));
+ return -EHOSTUNREACH;
+ }
+
+ /* Place the selected route at the end of the list so that, all
+ * else being equal, gateways are used round-robin */
+ list_del(&best_route->lr_list);
+ list_add_tail(&best_route->lr_list, &rnet->lrn_routes);
+
+ if (src_ni == NULL) {
+ src_ni = lp->lp_ni;
+ src_nid = src_ni->ni_nid;
+ } else {
+ LASSERT (src_ni == lp->lp_ni);
+ lnet_ni_decref_locked(src_ni);
+ }
+
+ lnet_peer_addref_locked(lp);
+
+ LASSERT (src_nid != LNET_NID_ANY);
+
+ if (!msg->msg_routing) {
+ /* I'm the source and now I know which NI to send on */
+ src_nid = lnet_ptlcompat_srcnid(src_nid, dst_nid);
+ msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
+ }
+
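+ /* send to the gateway; the wire header still carries the final
+ * destination for the router to forward on */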
+ msg->msg_target_is_router = 1;
+ msg->msg_target.nid = lp->lp_nid;
+ msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
+ }
+
+ /* 'lp' is our best choice of peer */
+
+ LASSERT (!msg->msg_peertxcredit);
+ LASSERT (!msg->msg_txcredit);
+ LASSERT (msg->msg_txpeer == NULL);
+
+ msg->msg_txpeer = lp; /* msg takes my ref on lp */
+
+ rc = lnet_post_send_locked(msg, 0);
+ LNET_UNLOCK();
+
+ if (rc == 0)
+ lnet_ni_send(src_ni, msg);
+
+ return 0;
+}
+
+static void
+lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+{
+ /* ALWAYS called holding the LNET_LOCK */
+ /* Here, we commit the MD to a network OP by marking it busy and
+ * decrementing its threshold. Come what may, the network "owns"
+ * the MD until a call to lnet_finalize() signals completion. */
+ LASSERT (!msg->msg_routing);
+
+ msg->msg_md = md;
+
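+ /* md_refcount counts in-flight operations; md_threshold counts
+ * those the MD may still accept (LNET_MD_THRESH_INF == never
+ * auto-unlink) */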
+ md->md_refcount++;
+ if (md->md_threshold != LNET_MD_THRESH_INF) {
+ LASSERT (md->md_threshold > 0);
+ md->md_threshold--;
+ }
+
+ the_lnet.ln_counters.msgs_alloc++;
+ if (the_lnet.ln_counters.msgs_alloc >
+ the_lnet.ln_counters.msgs_max)
+ the_lnet.ln_counters.msgs_max =
+ the_lnet.ln_counters.msgs_alloc;
+
+ LASSERT (!msg->msg_onactivelist);
+ msg->msg_onactivelist = 1;
+ list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+}
+
+static void
+lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
+{
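+ /* account the drop, then get the LND to receive and discard the
+ * payload (NULL msg tells lnet_ni_recv() to sink the data) */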
+ LNET_LOCK();
+ the_lnet.ln_counters.drop_count++;
+ the_lnet.ln_counters.drop_length += nob;
+ LNET_UNLOCK();
+
+ lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
+}
+
+static void
+lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
+{
+ LASSERT (msg->msg_md == NULL);
+ LASSERT (msg->msg_delayed);
+ LASSERT (msg->msg_rxpeer != NULL);
+ LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
+
+ CWARN("Dropping delayed PUT from %s portal %d match "LPU64
+ " offset %d length %d: %s\n",
+ libcfs_id2str((lnet_process_id_t){
+ .nid = msg->msg_hdr.src_nid,
+ .pid = msg->msg_hdr.src_pid}),
+ msg->msg_hdr.msg.put.ptl_index,
+ msg->msg_hdr.msg.put.match_bits,
+ msg->msg_hdr.msg.put.offset,
+ msg->msg_hdr.payload_length,
+ reason);
+
+ /* NB I can't drop msg's ref on msg_rxpeer until after I've
+ * called lnet_drop_message(), so I just hang onto msg as well
+ * until that's done */
+
+ lnet_drop_message(msg->msg_rxpeer->lp_ni,
+ msg->msg_private, msg->msg_len);
+
+ LNET_LOCK();
+
+ lnet_peer_decref_locked(msg->msg_rxpeer);
+ msg->msg_rxpeer = NULL;
+
+ lnet_msg_free(msg);
+
+ LNET_UNLOCK();
+}
+
+int
+LNetSetLazyPortal(int portal)
+{
+ lnet_portal_t *ptl;
+
+ if (portal < 0 || portal >= the_lnet.ln_nportals)
+ return -EINVAL;
+
+ ptl = &the_lnet.ln_portals[portal];
+
+ CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
+
+ LNET_LOCK();
+
+ ptl->ptl_options |= LNET_PTL_LAZY;
+
+ LNET_UNLOCK();
+
+ return 0;
+}
+
+int
+LNetClearLazyPortal(int portal)
+{
+ struct list_head zombies;
+ lnet_portal_t *ptl;
+ lnet_msg_t *msg;
+
+ if (portal < 0 || portal >= the_lnet.ln_nportals)
+ return -EINVAL;
+
+ ptl = &the_lnet.ln_portals[portal];
+
+ LNET_LOCK();
+
+ if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
+ LNET_UNLOCK();
+ return 0;
+ }
+
+ if (the_lnet.ln_shutdown)
+ CWARN ("Active lazy portal %d on exit\n", portal);
+ else
+ CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
+
+ /* grab all the blocked messages atomically */
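+ /* NB list_add() makes 'zombies' the new head of the ptl_msgq
+ * chain and list_del_init() detaches the old head, moving the
+ * whole queue in O(1) under the lock */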
+ list_add(&zombies, &ptl->ptl_msgq);
+ list_del_init(&ptl->ptl_msgq);
+
+ ptl->ptl_msgq_version++;
+ ptl->ptl_options &= ~LNET_PTL_LAZY;
+
+ LNET_UNLOCK();
+
+ while (!list_empty(&zombies)) {
+ msg = list_entry(zombies.next, lnet_msg_t, msg_list);
+ list_del(&msg->msg_list);
+
+ lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
+ }
+
+ return 0;
+}
+
+static void
+lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
+ unsigned int offset, unsigned int mlength)
+{
+ lnet_hdr_t *hdr = &msg->msg_hdr;
+
+ LNET_LOCK();
+
+ the_lnet.ln_counters.recv_count++;
+ the_lnet.ln_counters.recv_length += mlength;
+
+ LNET_UNLOCK();
+
+ if (mlength != 0)
+ lnet_setpayloadbuffer(msg);
+
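+ /* fill in the PUT event now; it is delivered via the MD's event
+ * queue when lnet_finalize() completes the receive */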
+ msg->msg_ev.type = LNET_EVENT_PUT;
+ msg->msg_ev.target.pid = hdr->dest_pid;
+ msg->msg_ev.target.nid = hdr->dest_nid;
+ msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
+
+ /* Must I ACK? If so I'll grab the ack_wmd out of the header and put
+ * it back into the ACK during lnet_finalize() */
+ msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+ (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+
+ lnet_ni_recv(msg->msg_rxpeer->lp_ni,
+ msg->msg_private,
+ msg, delayed, offset, mlength,
+ hdr->payload_length);
+}
+
+/* called with LNET_LOCK held */
+void
+lnet_match_blocked_msg(lnet_libmd_t *md)
+{
+ CFS_LIST_HEAD (drops);
+ CFS_LIST_HEAD (matches);
+ struct list_head *tmp;
+ struct list_head *entry;
+ lnet_msg_t *msg;
+ lnet_me_t *me = md->md_me;
+ lnet_portal_t *ptl;
+
+ LASSERT (me->me_portal < the_lnet.ln_nportals);
+ ptl = &the_lnet.ln_portals[me->me_portal];
+
+ if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
+ LASSERT (list_empty(&ptl->ptl_msgq));
+ return;
+ }
+
+ LASSERT (md->md_refcount == 0); /* a brand new MD */
+
+ list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
+ int rc;
+ int index;
+ unsigned int mlength;
+ unsigned int offset;
+ lnet_hdr_t *hdr;
+ lnet_process_id_t src;
+
+ msg = list_entry(entry, lnet_msg_t, msg_list);
+
+ LASSERT (msg->msg_delayed);
+
+ hdr = &msg->msg_hdr;
+ index = hdr->msg.put.ptl_index;
+
+ src.nid = hdr->src_nid;
+ src.pid = hdr->src_pid;
+
+ rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
+ hdr->payload_length,
+ hdr->msg.put.offset,
+ hdr->msg.put.match_bits,
+ md, msg, &mlength, &offset);
+
+ if (rc == LNET_MATCHMD_NONE)
+ continue;
+
+ /* Hurrah! This _is_ a match */
+ list_del(&msg->msg_list);
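+ /* bump the version so anyone walking ptl_msgq with the lock
+ * dropped can detect that the queue changed */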
+ ptl->ptl_msgq_version++;
+
+ if (rc == LNET_MATCHMD_OK) {
+ list_add_tail(&msg->msg_list, &matches);
+
+ CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+ "match "LPU64" offset %d length %d.\n",
+ libcfs_id2str(src),
+ hdr->msg.put.ptl_index,
+ hdr->msg.put.match_bits,
+ hdr->msg.put.offset,
+ hdr->payload_length);
+ } else {
+ LASSERT (rc == LNET_MATCHMD_DROP);
+
+ list_add_tail(&msg->msg_list, &drops);
+ }
+
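+ /* stop early once this MD has no room or threshold left */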
+ if (lnet_md_exhausted(md))
+ break;
+ }
+
+ LNET_UNLOCK();
+
+ list_for_each_safe (entry, tmp, &drops) {
+ msg = list_entry(entry, lnet_msg_t, msg_list);
+
+ list_del(&msg->msg_list);
+
+ lnet_drop_delayed_put(msg, "Bad match");
+ }