- /* NB I can't drop msg's ref on msg_rxpeer until after I've
- * called lnet_drop_message(), so I just hang onto msg as well
- * until that's done */
-
- lnet_drop_message(msg->msg_rxpeer->lp_ni,
- msg->msg_private, msg->msg_len);
-
- LNET_LOCK();
-
- lnet_peer_decref_locked(msg->msg_rxpeer);
- msg->msg_rxpeer = NULL;
-
- lnet_msg_free(msg);
-
- LNET_UNLOCK();
-}
-
-/**
- * Turn on the lazy portal attribute. Use with caution!
- *
- * This portal attribute only affects incoming PUT requests to the portal,
- * and is off by default. By default, if there's no matching MD for an
- * incoming PUT request, it is simply dropped. With the lazy attribute on,
- * such requests are queued indefinitely until either a matching MD is
- * posted to the portal or the lazy attribute is turned off.
- *
- * It would prevent dropped requests, however it should be regarded as the
- * last line of defense - i.e. users must keep a close watch on active
- * buffers on a lazy portal and once it becomes too low post more buffers as
- * soon as possible. This is because delayed requests usually have detrimental
- * effects on underlying network connections. A few delayed requests often
- * suffice to bring an underlying connection to a complete halt, due to flow
- * control mechanisms.
- *
- * There's also a DOS attack risk. If users don't post match-all MDs on a
- * lazy portal, a malicious peer can easily stop a service by sending some
- * PUT requests with match bits that won't match any MD. A routed server is
- * especially vulnerable since the connections to its neighbor routers are
- * shared among all clients.
- *
- * \param portal Index of the portal to enable the lazy attribute on.
- *
- * \retval 0 On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetSetLazyPortal(int portal)
-{
- lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
-
- if (portal < 0 || portal >= the_lnet.ln_nportals)
- return -EINVAL;
-
- CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
-
- LNET_LOCK();
- lnet_portal_setopt(ptl, LNET_PTL_LAZY);
- LNET_UNLOCK();
-
- return 0;
-}
-
-/**
- * Turn off the lazy portal attribute. Delayed requests on the portal,
- * if any, will be all dropped when this function returns.
- *
- * \param portal Index of the portal to disable the lazy attribute on.
- *
- * \retval 0 On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetClearLazyPortal(int portal)
-{
- cfs_list_t zombies;
- lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
- lnet_msg_t *msg;
-
- if (portal < 0 || portal >= the_lnet.ln_nportals)
- return -EINVAL;
-
- LNET_LOCK();
-
- if (!lnet_portal_is_lazy(ptl)) {
- LNET_UNLOCK();
- return 0;
- }
-
- if (the_lnet.ln_shutdown)
- CWARN ("Active lazy portal %d on exit\n", portal);
- else
- CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
-
- /* grab all the blocked messages atomically */
- cfs_list_add(&zombies, &ptl->ptl_msgq);
- cfs_list_del_init(&ptl->ptl_msgq);
-
- ptl->ptl_msgq_version++;
- lnet_portal_unsetopt(ptl, LNET_PTL_LAZY);
-
- LNET_UNLOCK();
-
- while (!cfs_list_empty(&zombies)) {
- msg = cfs_list_entry(zombies.next, lnet_msg_t, msg_list);
- cfs_list_del(&msg->msg_list);
-
- lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
- }
-
- return 0;
-}
-
-static void
-lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
- unsigned int offset, unsigned int mlength)
-{
- lnet_hdr_t *hdr = &msg->msg_hdr;
-
- LNET_LOCK();
-
- the_lnet.ln_counters.recv_count++;
- the_lnet.ln_counters.recv_length += mlength;
-
- LNET_UNLOCK();
-
- if (mlength != 0)
- lnet_setpayloadbuffer(msg);
-
- msg->msg_ev.type = LNET_EVENT_PUT;
- msg->msg_ev.target.pid = hdr->dest_pid;
- msg->msg_ev.target.nid = hdr->dest_nid;
- msg->msg_ev.hdr_data = hdr->msg.put.hdr_data;
-
- /* Must I ACK? If so I'll grab the ack_wmd out of the header and put
- * it back into the ACK during lnet_finalize() */
- msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
- (md->md_options & LNET_MD_ACK_DISABLE) == 0);
-
- lnet_ni_recv(msg->msg_rxpeer->lp_ni,
- msg->msg_private,
- msg, delayed, offset, mlength,
- hdr->payload_length);
-}
-
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
-{
- CFS_LIST_HEAD (drops);
- CFS_LIST_HEAD (matches);
- cfs_list_t *tmp;
- cfs_list_t *entry;
- lnet_msg_t *msg;
- lnet_portal_t *ptl;
- lnet_me_t *me = md->md_me;
-
- LASSERT (me->me_portal < (unsigned int)the_lnet.ln_nportals);
-
- ptl = &the_lnet.ln_portals[me->me_portal];
- if (!lnet_portal_is_lazy(ptl)) {
- LASSERT (cfs_list_empty(&ptl->ptl_msgq));
- return;
- }
-
- LASSERT (md->md_refcount == 0); /* a brand new MD */
-
- cfs_list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
- int rc;
- int index;
- unsigned int mlength;
- unsigned int offset;
- lnet_hdr_t *hdr;
- lnet_process_id_t src;
-
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- LASSERT (msg->msg_delayed);
-
- hdr = &msg->msg_hdr;
- index = hdr->msg.put.ptl_index;
-
- src.nid = hdr->src_nid;
- src.pid = hdr->src_pid;
-
- rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
- hdr->payload_length,
- hdr->msg.put.offset,
- hdr->msg.put.match_bits,
- md, msg, &mlength, &offset);
-
- if (rc == LNET_MATCHMD_NONE)
- continue;
-
- /* Hurrah! This _is_ a match */
- cfs_list_del(&msg->msg_list);
- ptl->ptl_msgq_version++;
-
- if (rc == LNET_MATCHMD_OK) {
- cfs_list_add_tail(&msg->msg_list, &matches);
-
- CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
- "match "LPU64" offset %d length %d.\n",
- libcfs_id2str(src),
- hdr->msg.put.ptl_index,
- hdr->msg.put.match_bits,
- hdr->msg.put.offset,
- hdr->payload_length);
- } else {
- LASSERT (rc == LNET_MATCHMD_DROP);
-
- cfs_list_add_tail(&msg->msg_list, &drops);
- }
-
- if (lnet_md_exhausted(md))
- break;
- }
-
- LNET_UNLOCK();
-
- cfs_list_for_each_safe (entry, tmp, &drops) {
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- cfs_list_del(&msg->msg_list);
-
- lnet_drop_delayed_put(msg, "Bad match");
- }
-
- cfs_list_for_each_safe (entry, tmp, &matches) {
- msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
- cfs_list_del(&msg->msg_list);
-
- /* md won't disappear under me, since each msg
- * holds a ref on it */
- lnet_recv_put(md, msg, 1,
- msg->msg_ev.offset,
- msg->msg_ev.mlength);
- }
-
- LNET_LOCK();