Whamcloud - gitweb
LU-56 lnet: LNet message event cleanup
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 95b4985..2b118a7 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -46,172 +44,6 @@ static int local_nid_dist_zero = 1;
 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
                 "Reserved");
 
-/* forward ref */
-static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
-
-#define LNET_MATCHMD_NONE     0   /* Didn't match */
-#define LNET_MATCHMD_OK       1   /* Matched OK */
-#define LNET_MATCHMD_DROP     2   /* Must be discarded */
-
-static int
-lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
-                   unsigned int rlength, unsigned int roffset,
-                   __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
-                   unsigned int *mlength_out, unsigned int *offset_out)
-{
-        /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
-         * lnet_match_blocked_msg() relies on this to avoid races */
-        unsigned int  offset;
-        unsigned int  mlength;
-        lnet_me_t    *me = md->md_me;
-
-        /* mismatched MD op */
-        if ((md->md_options & op_mask) == 0)
-                return LNET_MATCHMD_NONE;
-
-        /* MD exhausted */
-        if (lnet_md_exhausted(md))
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME nid/pid? */
-        if (me->me_match_id.nid != LNET_NID_ANY &&
-            me->me_match_id.nid != src.nid)
-                return LNET_MATCHMD_NONE;
-
-        if (me->me_match_id.pid != LNET_PID_ANY &&
-            me->me_match_id.pid != src.pid)
-                return LNET_MATCHMD_NONE;
-
-        /* mismatched ME matchbits? */
-        if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
-                return LNET_MATCHMD_NONE;
-
-        /* Hurrah! This _is_ a match; check it out... */
-
-        if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
-                offset = md->md_offset;
-        else
-                offset = roffset;
-
-        if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
-                mlength = md->md_max_size;
-                LASSERT (md->md_offset + mlength <= md->md_length);
-        } else {
-                mlength = md->md_length - offset;
-        }
-
-        if (rlength <= mlength) {        /* fits in allowed space */
-                mlength = rlength;
-        } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
-                /* this packet _really_ is too big */
-                CERROR("Matching packet from %s, match "LPU64
-                       " length %d too big: %d left, %d allowed\n",
-                       libcfs_id2str(src), match_bits, rlength,
-                       md->md_length - offset, mlength);
-
-                return LNET_MATCHMD_DROP;
-        }
-
-        /* Commit to this ME/MD */
-        CDEBUG(D_NET, "Incoming %s index %x from %s of "
-               "length %d/%d into md "LPX64" [%d] + %d\n",
-               (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
-               index, libcfs_id2str(src), mlength, rlength,
-               md->md_lh.lh_cookie, md->md_niov, offset);
-
-        lnet_commit_md(md, msg);
-        md->md_offset = offset + mlength;
-
-        /* NB Caller will set ev.type and ev.hdr_data */
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.pt_index = index;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = rlength;
-        msg->msg_ev.mlength = mlength;
-        msg->msg_ev.offset = offset;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
-        *offset_out = offset;
-        *mlength_out = mlength;
-
-        /* Auto-unlink NOW, so the ME gets unlinked if required.
-         * We bumped md->md_refcount above so the MD just gets flagged
-         * for unlink when it is finalized. */
-        if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
-            lnet_md_exhausted(md)) {
-                lnet_md_unlink(md);
-        }
-
-        return LNET_MATCHMD_OK;
-}
-
-static int
-lnet_match_md(int index, int op_mask, lnet_process_id_t src,
-              unsigned int rlength, unsigned int roffset,
-              __u64 match_bits, lnet_msg_t *msg,
-              unsigned int *mlength_out, unsigned int *offset_out,
-              lnet_libmd_t **md_out)
-{
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[index];
-        cfs_list_t       *head;
-        lnet_me_t        *me;
-        lnet_me_t        *tmp;
-        lnet_libmd_t     *md;
-        int               rc;
-
-        CDEBUG (D_NET, "Request from %s of length %d into portal %d "
-                "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
-
-        if (index < 0 || index >= the_lnet.ln_nportals) {
-                CERROR("Invalid portal %d not in [0-%d]\n",
-                       index, the_lnet.ln_nportals);
-                return LNET_MATCHMD_DROP;
-        }
-
-        head = lnet_portal_me_head(index, src, match_bits);
-        if (head == NULL) /* nobody posted anything on this portal */
-                goto out;
-
-        cfs_list_for_each_entry_safe_typed (me, tmp, head,
-                                            lnet_me_t, me_list) {
-                md = me->me_md;
-
-                /* ME attached but MD not attached yet */
-                if (md == NULL)
-                        continue;
-
-                LASSERT (me == md->md_me);
-
-                rc = lnet_try_match_md(index, op_mask, src, rlength,
-                                       roffset, match_bits, md, msg,
-                                       mlength_out, offset_out);
-                switch (rc) {
-                default:
-                        LBUG();
-
-                case LNET_MATCHMD_NONE:
-                        continue;
-
-                case LNET_MATCHMD_OK:
-                        *md_out = md;
-                        return LNET_MATCHMD_OK;
-
-                case LNET_MATCHMD_DROP:
-                        return LNET_MATCHMD_DROP;
-                }
-                /* not reached */
-        }
-
- out:
-        if (op_mask == LNET_MD_OP_GET ||
-            !lnet_portal_is_lazy(ptl))
-                return LNET_MATCHMD_DROP;
-
-        return LNET_MATCHMD_NONE;
-}
-
 int
 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
 {
@@ -775,9 +607,9 @@ lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
                 LASSERT(!msg->msg_sending);
                 LASSERT(rlen == msg->msg_len);
                 LASSERT(mlen <= msg->msg_len);
+               LASSERT(msg->msg_offset == offset);
+               LASSERT(msg->msg_wanted == mlen);
 
-                msg->msg_wanted = mlen;
-                msg->msg_offset = offset;
                 msg->msg_receiving = 0;
 
                 if (mlen != 0) {
@@ -923,6 +755,7 @@ lnet_ni_peer_alive(lnet_peer_t *lp)
 
         LASSERT (lnet_peer_aliveness_enabled(lp));
         LASSERT (ni->ni_lnd->lnd_query != NULL);
+        LASSERT (the_lnet.ln_routing == 1);
 
         LNET_UNLOCK();
         (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
@@ -932,7 +765,6 @@ lnet_ni_peer_alive(lnet_peer_t *lp)
 
         if (last_alive != 0) /* NI has updated timestamp */
                 lp->lp_last_alive = last_alive;
-        return;
 }
 
 /* NB: always called with LNET_LOCK held */
@@ -943,6 +775,7 @@ lnet_peer_is_alive (lnet_peer_t *lp, cfs_time_t now)
         cfs_time_t deadline;
 
         LASSERT (lnet_peer_aliveness_enabled(lp));
+        LASSERT (the_lnet.ln_routing == 1);
 
         /* Trust lnet_notify() if it has more recent aliveness news, but
          * ignore the initial assumed death (see lnet_peers_start_down()).
@@ -974,6 +807,10 @@ lnet_peer_alive_locked (lnet_peer_t *lp)
 {
         cfs_time_t now = cfs_time_current();
 
+        /* LU-630: only routers check peer health. */
+        if (the_lnet.ln_routing == 0)
+                return 1;
+
         if (!lnet_peer_aliveness_enabled(lp))
                 return -ENODEV;
 
@@ -1028,6 +865,8 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
         /* NB 'lp' is always the next hop */
         if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
             lnet_peer_alive_locked(lp) == 0) {
+                the_lnet.ln_counters.drop_count++;
+                the_lnet.ln_counters.drop_length += msg->msg_len;
                 LNET_UNLOCK();
 
                 CNETERR("Dropping message for %s: peer not alive\n",
@@ -1100,7 +939,8 @@ lnet_commit_routedmsg (lnet_msg_t *msg)
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
-        cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+       cfs_list_add(&msg->msg_activelist,
+                    &the_lnet.ln_msg_container.msc_active);
 }
 
 lnet_rtrbufpool_t *
@@ -1306,18 +1146,65 @@ lnet_return_credits_locked (lnet_msg_t *msg)
         }
 }
 
+static lnet_peer_t *
+lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target)
+{
+       lnet_remotenet_t        *rnet;
+       lnet_route_t            *rtr;
+       lnet_route_t            *rtr_best;
+       lnet_route_t            *rtr_last;
+       struct lnet_peer        *lp_best;
+       struct lnet_peer        *lp;
+       int                     rc;
+
+       rnet = lnet_find_net_locked(LNET_NIDNET(target));
+       if (rnet == NULL)
+               return NULL;
+
+       lp_best = NULL;
+       rtr_best = rtr_last = NULL;
+       cfs_list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
+               lp = rtr->lr_gateway;
+
+               if (!lp->lp_alive || /* gateway is down */
+                   (lp->lp_ping_version == LNET_PROTO_PING_VERSION &&
+                    rtr->lr_downis != 0)) /* NI to target is down */
+                       continue;
+
+               if (ni != NULL && lp->lp_ni != ni)
+                       continue;
+
+               if (lp_best == NULL) {
+                       rtr_best = rtr_last = rtr;
+                       lp_best = lp;
+                       continue;
+               }
+
+               rc = lnet_compare_routes(rtr, rtr_best);
+               if (rc < 0)
+                       continue;
+
+               rtr_best = rtr;
+               lp_best = lp;
+       }
+
+       if (rtr_best != NULL) {
+               /* Place selected route at the end of the route list to ensure
+                * fairness; everything else being equal... */
+               cfs_list_del(&rtr_best->lr_list);
+               cfs_list_add_tail(&rtr_best->lr_list, &rnet->lrn_routes);
+       }
+
+       return lp_best;
+}
+
 int
 lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
 {
         lnet_nid_t        dst_nid = msg->msg_target.nid;
         lnet_ni_t        *src_ni;
         lnet_ni_t        *local_ni;
-        lnet_remotenet_t *rnet;
-        lnet_route_t     *route;
-        lnet_route_t     *best_route;
-        cfs_list_t       *tmp;
         lnet_peer_t      *lp;
-        lnet_peer_t      *lp2;
         int               rc;
 
         LASSERT (msg->msg_txpeer == NULL);
@@ -1407,33 +1294,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                 LNET_LOCK();
 #endif
                 /* sending to a remote network */
-                rnet = lnet_find_net_locked(LNET_NIDNET(dst_nid));
-                if (rnet == NULL) {
-                        if (src_ni != NULL)
-                                lnet_ni_decref_locked(src_ni);
-                        LNET_UNLOCK();
-                        LCONSOLE_WARN("No route to %s\n",
-                                      libcfs_id2str(msg->msg_target));
-                        return -EHOSTUNREACH;
-                }
-
-                /* Find the best gateway I can use */
-                lp = NULL;
-                best_route = NULL;
-                cfs_list_for_each(tmp, &rnet->lrn_routes) {
-                        route = cfs_list_entry(tmp, lnet_route_t, lr_list);
-                        lp2 = route->lr_gateway;
-
-                        if (lp2->lp_alive &&
-                            lnet_router_down_ni(lp2, rnet->lrn_net) <= 0 &&
-                            (src_ni == NULL || lp2->lp_ni == src_ni) &&
-                            (lp == NULL ||
-                             lnet_compare_routes(route, best_route) > 0)) {
-                                best_route = route;
-                                lp = lp2;
-                        }
-                }
-
+               lp = lnet_find_route_locked(src_ni, dst_nid);
                 if (lp == NULL) {
                         if (src_ni != NULL)
                                 lnet_ni_decref_locked(src_ni);
@@ -1446,10 +1307,9 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                         return -EHOSTUNREACH;
                 }
 
-                /* Place selected route at the end of the route list to ensure
-                 * fairness; everything else being equal... */
-                cfs_list_del(&best_route->lr_list);
-                cfs_list_add_tail(&best_route->lr_list, &rnet->lrn_routes);
+                CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
+                       libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
+                       lnet_msgtyp2str(msg->msg_type), msg->msg_len);
 
                 if (src_ni == NULL) {
                         src_ni = lp->lp_ni;
@@ -1493,8 +1353,9 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         return 0;
 }
 
-static void
-lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+void
+lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
+              unsigned int offset, unsigned int mlen)
 {
         /* ALWAYS called holding the LNET_LOCK */
         /* Here, we commit the MD to a network OP by marking it busy and
@@ -1503,6 +1364,13 @@ lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
         LASSERT (!msg->msg_routing);
 
         msg->msg_md = md;
+       lnet_md_deconstruct(md, &msg->msg_ev.md);
+       lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+       if (msg->msg_receiving) {
+               msg->msg_offset = offset;
+               msg->msg_wanted = mlen;
+       }
 
         md->md_refcount++;
         if (md->md_threshold != LNET_MD_THRESH_INF) {
@@ -1518,7 +1386,8 @@ lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
 
         LASSERT (!msg->msg_onactivelist);
         msg->msg_onactivelist = 1;
-        cfs_list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
+       cfs_list_add(&msg->msg_activelist,
+                    &the_lnet.ln_msg_container.msc_active);
 }
 
 static void
@@ -1533,267 +1402,29 @@ lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
 }
 
 static void
-lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
+lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
-        lnet_process_id_t id = {0};
-
-        id.nid = msg->msg_hdr.src_nid;
-        id.pid = msg->msg_hdr.src_pid;
+       lnet_hdr_t      *hdr = &msg->msg_hdr;
 
-        LASSERT (msg->msg_md == NULL);
-        LASSERT (msg->msg_delayed);
-        LASSERT (msg->msg_rxpeer != NULL);
-        LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
-
-        CWARN("Dropping delayed PUT from %s portal %d match "LPU64
-              " offset %d length %d: %s\n", 
-              libcfs_id2str(id),
-              msg->msg_hdr.msg.put.ptl_index,
-              msg->msg_hdr.msg.put.match_bits,
-              msg->msg_hdr.msg.put.offset,
-              msg->msg_hdr.payload_length,
-              reason);
-
-        /* NB I can't drop msg's ref on msg_rxpeer until after I've
-         * called lnet_drop_message(), so I just hang onto msg as well
-         * until that's done */
-
-        lnet_drop_message(msg->msg_rxpeer->lp_ni,
-                          msg->msg_private, msg->msg_len);
+       LNET_LOCK();
 
-        LNET_LOCK();
+       the_lnet.ln_counters.recv_count++;
+       the_lnet.ln_counters.recv_length += msg->msg_wanted;
 
-        lnet_peer_decref_locked(msg->msg_rxpeer);
-        msg->msg_rxpeer = NULL;
+       LNET_UNLOCK();
 
-        lnet_msg_free(msg);
+       if (msg->msg_wanted != 0)
+               lnet_setpayloadbuffer(msg);
 
-        LNET_UNLOCK();
-}
-
-/**
- * Turn on the lazy portal attribute. Use with caution!
- *
- * This portal attribute only affects incoming PUT requests to the portal,
- * and is off by default. By default, if there's no matching MD for an
- * incoming PUT request, it is simply dropped. With the lazy attribute on,
- * such requests are queued indefinitely until either a matching MD is
- * posted to the portal or the lazy attribute is turned off.
- *
- * It would prevent dropped requests, however it should be regarded as the
- * last line of defense - i.e. users must keep a close watch on active
- * buffers on a lazy portal and once it becomes too low post more buffers as
- * soon as possible. This is because delayed requests usually have detrimental
- * effects on underlying network connections. A few delayed requests often
- * suffice to bring an underlying connection to a complete halt, due to flow
- * control mechanisms.
- *
- * There's also a DOS attack risk. If users don't post match-all MDs on a
- * lazy portal, a malicious peer can easily stop a service by sending some
- * PUT requests with match bits that won't match any MD. A routed server is
- * especially vulnerable since the connections to its neighbor routers are
- * shared among all clients.
- *
- * \param portal Index of the portal to enable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetSetLazyPortal(int portal)
-{
-        lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
-
-        LNET_LOCK();
-        lnet_portal_setopt(ptl, LNET_PTL_LAZY);
-        LNET_UNLOCK();
-
-        return 0;
-}
-
-/**
- * Turn off the lazy portal attribute. Delayed requests on the portal,
- * if any, will be all dropped when this function returns.
- *
- * \param portal Index of the portal to disable the lazy attribute on.
- *
- * \retval 0       On success.
- * \retval -EINVAL If \a portal is not a valid index.
- */
-int
-LNetClearLazyPortal(int portal)
-{
-        cfs_list_t        zombies;
-        lnet_portal_t    *ptl = &the_lnet.ln_portals[portal];
-        lnet_msg_t       *msg;
-
-        if (portal < 0 || portal >= the_lnet.ln_nportals)
-                return -EINVAL;
-
-        LNET_LOCK();
-
-        if (!lnet_portal_is_lazy(ptl)) {
-                LNET_UNLOCK();
-                return 0;
-        }
-
-        if (the_lnet.ln_shutdown)
-                CWARN ("Active lazy portal %d on exit\n", portal);
-        else
-                CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
-
-        /* grab all the blocked messages atomically */
-        cfs_list_add(&zombies, &ptl->ptl_msgq);
-        cfs_list_del_init(&ptl->ptl_msgq);
-
-        ptl->ptl_msgq_version++;
-        lnet_portal_unsetopt(ptl, LNET_PTL_LAZY);
-
-        LNET_UNLOCK();
-
-        while (!cfs_list_empty(&zombies)) {
-                msg = cfs_list_entry(zombies.next, lnet_msg_t, msg_list);
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
-        }
-
-        return 0;
-}
-
-static void
-lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
-              unsigned int offset, unsigned int mlength)
-{
-        lnet_hdr_t       *hdr = &msg->msg_hdr;
-
-        LNET_LOCK();
-
-        the_lnet.ln_counters.recv_count++;
-        the_lnet.ln_counters.recv_length += mlength;
-
-        LNET_UNLOCK();
-
-        if (mlength != 0)
-                lnet_setpayloadbuffer(msg);
-
-        msg->msg_ev.type       = LNET_EVENT_PUT;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.hdr_data   = hdr->msg.put.hdr_data;
+       lnet_build_msg_event(msg, LNET_EVENT_PUT);
 
         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
          * it back into the ACK during lnet_finalize() */
-        msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
-                        (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+       msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+                       (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
 
-        lnet_ni_recv(msg->msg_rxpeer->lp_ni,
-                     msg->msg_private,
-                     msg, delayed, offset, mlength,
-                     hdr->payload_length);
-}
-
-/* called with LNET_LOCK held */
-void
-lnet_match_blocked_msg(lnet_libmd_t *md)
-{
-        CFS_LIST_HEAD    (drops);
-        CFS_LIST_HEAD    (matches);
-        cfs_list_t       *tmp;
-        cfs_list_t       *entry;
-        lnet_msg_t       *msg;
-        lnet_portal_t    *ptl;
-        lnet_me_t        *me  = md->md_me;
-
-        LASSERT (me->me_portal < (unsigned int)the_lnet.ln_nportals);
-
-        ptl = &the_lnet.ln_portals[me->me_portal];
-        if (!lnet_portal_is_lazy(ptl)) {
-                LASSERT (cfs_list_empty(&ptl->ptl_msgq));
-                return;
-        }
-
-        LASSERT (md->md_refcount == 0); /* a brand new MD */
-
-        cfs_list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
-                int               rc;
-                int               index;
-                unsigned int      mlength;
-                unsigned int      offset;
-                lnet_hdr_t       *hdr;
-                lnet_process_id_t src;
-
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                LASSERT (msg->msg_delayed);
-
-                hdr   = &msg->msg_hdr;
-                index = hdr->msg.put.ptl_index;
-
-                src.nid = hdr->src_nid;
-                src.pid = hdr->src_pid;
-
-                rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
-                                       hdr->payload_length,
-                                       hdr->msg.put.offset,
-                                       hdr->msg.put.match_bits,
-                                       md, msg, &mlength, &offset);
-
-                if (rc == LNET_MATCHMD_NONE)
-                        continue;
-
-                /* Hurrah! This _is_ a match */
-                cfs_list_del(&msg->msg_list);
-                ptl->ptl_msgq_version++;
-
-                if (rc == LNET_MATCHMD_OK) {
-                        cfs_list_add_tail(&msg->msg_list, &matches);
-
-                        CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
-                               "match "LPU64" offset %d length %d.\n",
-                               libcfs_id2str(src),
-                               hdr->msg.put.ptl_index,
-                               hdr->msg.put.match_bits,
-                               hdr->msg.put.offset,
-                               hdr->payload_length);
-                } else {
-                        LASSERT (rc == LNET_MATCHMD_DROP);
-
-                        cfs_list_add_tail(&msg->msg_list, &drops);
-                }
-
-                if (lnet_md_exhausted(md))
-                        break;
-        }
-
-        LNET_UNLOCK();
-
-        cfs_list_for_each_safe (entry, tmp, &drops) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                lnet_drop_delayed_put(msg, "Bad match");
-        }
-
-        cfs_list_for_each_safe (entry, tmp, &matches) {
-                msg = cfs_list_entry(entry, lnet_msg_t, msg_list);
-
-                cfs_list_del(&msg->msg_list);
-
-                /* md won't disappear under me, since each msg
-                 * holds a ref on it */
-                lnet_recv_put(md, msg, 1,
-                              msg->msg_ev.offset,
-                              msg->msg_ev.mlength);
-        }
-
-        LNET_LOCK();
+       lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_delayed,
+                    msg->msg_offset, msg->msg_wanted, hdr->payload_length);
 }
 
 static int
@@ -1804,10 +1435,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
         __u64             version;
         lnet_hdr_t       *hdr = &msg->msg_hdr;
         unsigned int      rlength = hdr->payload_length;
-        unsigned int      mlength = 0;
-        unsigned int      offset = 0;
         lnet_process_id_t src= {0};
-        lnet_libmd_t     *md;
         lnet_portal_t    *ptl;
 
         src.nid = hdr->src_nid;
@@ -1825,19 +1453,18 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
  again:
         rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
                            rlength, hdr->msg.put.offset,
-                           hdr->msg.put.match_bits, msg,
-                           &mlength, &offset, &md);
+                          hdr->msg.put.match_bits, msg);
         switch (rc) {
         default:
                 LBUG();
 
         case LNET_MATCHMD_OK:
                 LNET_UNLOCK();
-                lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+               lnet_recv_put(ni, msg);
                 return 0;
 
         case LNET_MATCHMD_NONE:
-                ptl = &the_lnet.ln_portals[index];
+               ptl = the_lnet.ln_portals[index];
                 version = ptl->ptl_ml_version;
 
                 rc = 0;
@@ -1846,7 +1473,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 
                 if (rc == 0 &&
                     !the_lnet.ln_shutdown &&
-                    lnet_portal_is_lazy(ptl)) {
+                   lnet_ptl_is_lazy(ptl)) {
                         if (version != ptl->ptl_ml_version)
                                 goto again;
 
@@ -1879,11 +1506,8 @@ static int
 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 {
         lnet_hdr_t        *hdr = &msg->msg_hdr;
-        unsigned int       mlength = 0;
-        unsigned int       offset = 0;
         lnet_process_id_t  src = {0};
         lnet_handle_wire_t reply_wmd;
-        lnet_libmd_t      *md;
         int                rc;
 
         src.nid = hdr->src_nid;
@@ -1899,8 +1523,7 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 
         rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
                            hdr->msg.get.sink_length, hdr->msg.get.src_offset,
-                           hdr->msg.get.match_bits, msg,
-                           &mlength, &offset, &md);
+                          hdr->msg.get.match_bits, msg);
         if (rc == LNET_MATCHMD_DROP) {
                 CNETERR("Dropping GET from %s portal %d match "LPU64
                         " offset %d length %d\n",
@@ -1916,18 +1539,16 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
         LASSERT (rc == LNET_MATCHMD_OK);
 
         the_lnet.ln_counters.send_count++;
-        the_lnet.ln_counters.send_length += mlength;
+       the_lnet.ln_counters.send_length += msg->msg_wanted;
 
-        LNET_UNLOCK();
+       LNET_UNLOCK();
 
-        msg->msg_ev.type = LNET_EVENT_GET;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.hdr_data = 0;
+       lnet_build_msg_event(msg, LNET_EVENT_GET);
 
-        reply_wmd = hdr->msg.get.return_wmd;
+       reply_wmd = hdr->msg.get.return_wmd;
 
-        lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+       lnet_prep_send(msg, LNET_MSG_REPLY, src,
+                      msg->msg_offset, msg->msg_wanted);
 
         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
 
@@ -2005,27 +1626,18 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, mlength);
 
         if (mlength != 0)
                 lnet_setpayloadbuffer(msg);
 
-        msg->msg_ev.type = LNET_EVENT_REPLY;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.rlength = rlength;
-        msg->msg_ev.mlength = mlength;
-        msg->msg_ev.offset = 0;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.recv_count++;
         the_lnet.ln_counters.recv_length += mlength;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_REPLY);
+
         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
         return 0;
 }
@@ -2068,21 +1680,13 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                hdr->msg.ack.dst_wmd.wh_object_cookie);
 
-        lnet_commit_md(md, msg);
-
-        msg->msg_ev.type = LNET_EVENT_ACK;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.mlength = hdr->msg.ack.mlength;
-        msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+       lnet_commit_md(md, msg, 0, 0);
 
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
+       the_lnet.ln_counters.recv_count++;
 
-        the_lnet.ln_counters.recv_count++;
+       LNET_UNLOCK();
 
-        LNET_UNLOCK();
+       lnet_build_msg_event(msg, LNET_EVENT_ACK);
 
         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
         return 0;
@@ -2225,17 +1829,16 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                 return -EPROTO;
         }
 
-        if (the_lnet.ln_routing) {
-                cfs_time_t now = cfs_time_current();
-
-                LNET_LOCK();
-
-                ni->ni_last_alive = now;
-                if (ni->ni_status != NULL &&
-                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
-                        ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+       if (the_lnet.ln_routing &&
+           ni->ni_last_alive != cfs_time_current_sec()) {
+               LNET_LOCK();
 
-                LNET_UNLOCK();
+               /* NB: so far this is the only place that sets NI status "up" */
+               ni->ni_last_alive = cfs_time_current_sec();
+               if (ni->ni_status != NULL &&
+                   ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
+                       ni->ni_status->ns_status = LNET_NI_STATUS_UP;
+               LNET_UNLOCK();
         }
 
         /* Regard a bad destination NID as a protocol error.  Senders should
@@ -2311,6 +1914,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         msg->msg_len = msg->msg_wanted = payload_length;
         msg->msg_offset = 0;
         msg->msg_hdr = *hdr;
+       /* for building message event */
+       msg->msg_from = from_nid;
 
         LNET_LOCK();
         rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
@@ -2360,8 +1965,6 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         msg->msg_hdr.dest_pid = dest_pid;
         msg->msg_hdr.payload_length = payload_length;
 
-        msg->msg_ev.sender = from_nid;
-
         switch (type) {
         case LNET_MSG_ACK:
                 rc = lnet_parse_ack(ni, msg);
@@ -2392,7 +1995,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                 lnet_peer_decref_locked(msg->msg_rxpeer);
                 msg->msg_rxpeer = NULL;
         }
-        lnet_msg_free(msg);                     /* expects LNET_LOCK held */
+       lnet_msg_free_locked(msg);      /* expects LNET_LOCK held */
         LNET_UNLOCK();
 
  drop:
@@ -2400,6 +2003,79 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         return 0;
 }
 
+void
+lnet_drop_delayed_msg_list(cfs_list_t *head, char *reason)
+{      /* drain @head: warn about, drop and free each delayed PUT; @reason is logged */
+       while (!cfs_list_empty(head)) {
+               lnet_process_id_t       id = {0};
+               lnet_msg_t              *msg;
+               /* detach the first message on the list */
+               msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&msg->msg_list);
+               /* sender identity from the header, used only in the warning below */
+               id.nid = msg->msg_hdr.src_nid;
+               id.pid = msg->msg_hdr.src_pid;
+               /* sanity: a delayed PUT with no MD attached and a receive peer set */
+               LASSERT(msg->msg_md == NULL);
+               LASSERT(msg->msg_delayed);
+               LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+               CWARN("Dropping delayed PUT from %s portal %d match "LPU64
+                     " offset %d length %d: %s\n",
+                     libcfs_id2str(id),
+                     msg->msg_hdr.msg.put.ptl_index,
+                     msg->msg_hdr.msg.put.match_bits,
+                     msg->msg_hdr.msg.put.offset,
+                     msg->msg_hdr.payload_length, reason);
+
+               /* NB I can't drop msg's ref on msg_rxpeer until after I've
+                * called lnet_drop_message(), so I just hang onto msg as well
+                * until that's done */
+
+               lnet_drop_message(msg->msg_rxpeer->lp_ni,
+                                 msg->msg_private, msg->msg_len);
+               /* now release the message's reference on its receive peer */
+               LNET_LOCK();
+               lnet_peer_decref_locked(msg->msg_rxpeer);
+               LNET_UNLOCK();
+               /* NOTE(review): called unlocked; presumably takes LNET_LOCK itself -- confirm */
+               lnet_msg_free(msg);
+       }
+}
+
+void
+lnet_recv_delayed_msg_list(cfs_list_t *head)
+{      /* drain @head: pass each delayed PUT that now has an MD to lnet_recv_put() */
+       while (!cfs_list_empty(head)) {
+               lnet_msg_t        *msg;
+               lnet_process_id_t  id;
+               /* detach the first message on the list */
+               msg = cfs_list_entry(head->next, lnet_msg_t, msg_list);
+               cfs_list_del(&msg->msg_list);
+
+               /* md won't disappear under me, since each msg
+                * holds a ref on it */
+               /* sender identity from the header, used only in the debug message below */
+               id.nid = msg->msg_hdr.src_nid;
+               id.pid = msg->msg_hdr.src_pid;
+               /* sanity: a delayed PUT with an MD attached and a receive peer set */
+               LASSERT(msg->msg_delayed);
+               LASSERT(msg->msg_md != NULL);
+               LASSERT(msg->msg_rxpeer != NULL);
+               LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
+
+               CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
+                      "match "LPU64" offset %d length %d.\n",
+                       libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
+                       msg->msg_hdr.msg.put.match_bits,
+                       msg->msg_hdr.msg.put.offset,
+                       msg->msg_hdr.payload_length);
+               /* msg is not freed here; presumably lnet_recv_put() takes it over -- confirm */
+               lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
+       }
+}
+
 /**
  * Initiate an asynchronous PUT operation.
  *
@@ -2477,7 +2153,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
-                lnet_msg_free(msg);
+               lnet_msg_free_locked(msg);
 
                 CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
                        match_bits, portal, libcfs_id2str(target),
@@ -2492,7 +2168,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
 
         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
 
@@ -2514,26 +2190,13 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         LNET_WIRE_HANDLE_COOKIE_NONE;
         }
 
-        msg->msg_ev.type = LNET_EVENT_SEND;
-        msg->msg_ev.initiator.nid = LNET_NID_ANY;
-        msg->msg_ev.initiator.pid = the_lnet.ln_pid;
-        msg->msg_ev.target = target;
-        msg->msg_ev.sender = LNET_NID_ANY;
-        msg->msg_ev.pt_index = portal;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = md->md_length;
-        msg->msg_ev.mlength = md->md_length;
-        msg->msg_ev.offset = offset;
-        msg->msg_ev.hdr_data = hdr_data;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.send_count++;
         the_lnet.ln_counters.send_length += md->md_length;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
         rc = lnet_send(self, msg);
         if (rc != 0) {
                 CNETERR( "Error sending PUT to %s: %d\n",
@@ -2581,38 +2244,35 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
 
         LASSERT (getmd->md_offset == 0);
 
-        CDEBUG(D_NET, "%s: Reply from %s md %p\n", 
-               libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
-
-        lnet_commit_md (getmd, msg);
+       CDEBUG(D_NET, "%s: Reply from %s md %p\n",
+              libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
 
+       /* setup information for lnet_build_msg_event */
+       msg->msg_from = peer_id.nid;
         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
+       msg->msg_hdr.src_nid = peer_id.nid;
+       msg->msg_hdr.payload_length = getmd->md_length;
 
-        msg->msg_ev.type = LNET_EVENT_REPLY;
-        msg->msg_ev.initiator = peer_id;
-        msg->msg_ev.sender = peer_id.nid;  /* optimized GETs can't be routed */
-        msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
-        msg->msg_ev.offset = 0;
+       lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
 
-        lnet_md_deconstruct(getmd, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, getmd);
+       the_lnet.ln_counters.recv_count++;
+       the_lnet.ln_counters.recv_length += getmd->md_length;
 
-        the_lnet.ln_counters.recv_count++;
-        the_lnet.ln_counters.recv_length += getmd->md_length;
+       LNET_UNLOCK();
 
-        LNET_UNLOCK();
+       lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
-        return msg;
+       return msg;
 
  drop_msg:
-        lnet_msg_free(msg);
+       lnet_msg_free_locked(msg);
  drop:
-        the_lnet.ln_counters.drop_count++;
-        the_lnet.ln_counters.drop_length += getmd->md_length;
+       the_lnet.ln_counters.drop_count++;
+       the_lnet.ln_counters.drop_length += getmd->md_length;
 
-        LNET_UNLOCK ();
+       LNET_UNLOCK ();
 
-        return NULL;
+       return NULL;
 }
 
 void
@@ -2682,7 +2342,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
 
         md = lnet_handle2md(&mdh);
         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
-                lnet_msg_free(msg);
+               lnet_msg_free_locked(msg);
 
                 CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
                        match_bits, portal, libcfs_id2str(target),
@@ -2697,7 +2357,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
 
         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
 
@@ -2712,25 +2372,12 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
                 md->md_lh.lh_cookie;
 
-        msg->msg_ev.type = LNET_EVENT_SEND;
-        msg->msg_ev.initiator.nid = LNET_NID_ANY;
-        msg->msg_ev.initiator.pid = the_lnet.ln_pid;
-        msg->msg_ev.target = target;
-        msg->msg_ev.sender = LNET_NID_ANY;
-        msg->msg_ev.pt_index = portal;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = md->md_length;
-        msg->msg_ev.mlength = md->md_length;
-        msg->msg_ev.offset = offset;
-        msg->msg_ev.hdr_data = 0;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.send_count++;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
         rc = lnet_send(self, msg);
         if (rc < 0) {
                 CNETERR( "Error sending GET to %s: %d\n",