Whamcloud - gitweb
LU-56 lnet: split lnet_commit_md and cleanup
authorLiang Zhen <liang@whamcloud.com>
Fri, 1 Jun 2012 05:53:01 +0000 (13:53 +0800)
committerAndreas Dilger <adilger@whamcloud.com>
Fri, 22 Jun 2012 07:41:15 +0000 (03:41 -0400)
Briefly, LNet has two types of operations: 1) resource operations
on EQ, MD and ME; 2) network operations on credits, peer, NI etc.
Most of them are in different logic blocks, but there are still
a few functions have mixed operations, lnet_commit_md() is one of
these functions.

This patch is a key step for LNet SMP improvements, lnet_commit_md()
of current LNet did two things: 1) consume MD threshold and commit
it to network; 2) attach the message on active message list to show
it might have taken LNet message credits. This patch split it into
two sets of functions, lnet_msg_attach/detach_md and
lnet_msg_commit/decommit,

We need to call lnet_msg_attach_md and lnet_msg_comit before deliver
a message to network, call lnet_msg_detach_md and lnet_msg_decommit
while finalizing a message. These two sets of functions will be
protected by different locks in upcoming patches.

Another change in this patch is, we moved most counters operations
into lnet_msg_decommit, this will make code cleaner and easier for
the next step work. The reason that we put counters operations in
lnet_msg_decommit instead of lnet_msg_commit is because we can't
guarantee lnet_msg_commit is always called after lnet_msg_attach_md,
and counters need information filled by lnet_msg_attach_md, which
means lnet_msg_commit might not have enough information for counters.
The thing we know for sure is lnet_msg_decommit is always called
after lnet_msg_attach_md, so it's safe to put counter operations
inside lnet_msg_decommit.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: Iee961fbff6e6e39ed76c14c0bd5b6560a7730af9
Reviewed-on: http://review.whamcloud.com/2997
Tested-by: Hudson
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/types.h
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/lib-ptl.c

index be1818a..4c06475 100644 (file)
@@ -623,15 +623,20 @@ lnet_remotenet_t *lnet_find_net_locked (__u32 net);
 int lnet_islocalnid(lnet_nid_t nid);
 int lnet_islocalnet(__u32 net);
 
-void lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
-                   unsigned int offset, unsigned int mlen);
+void lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
+                       unsigned int offset, unsigned int mlen);
+void lnet_msg_detach_md(lnet_msg_t *msg, int status);
 void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev);
 void lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type);
+void lnet_msg_commit(lnet_msg_t *msg, int sending);
+void lnet_msg_decommit(lnet_msg_t *msg, int status);
+
 void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev);
 void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
-void lnet_return_credits_locked (lnet_msg_t *msg);
+void lnet_return_tx_credits_locked(lnet_msg_t *msg);
+void lnet_return_rx_credits_locked(lnet_msg_t *msg);
 
 /* portals functions */
 static inline int
index a06e33e..3cd61b8 100644 (file)
@@ -191,6 +191,9 @@ typedef struct lnet_msg {
        lnet_nid_t              msg_from;
        __u32                   msg_type;
 
+       unsigned int            msg_rx_committed:1;
+       unsigned int            msg_tx_committed:1;
+
         unsigned int          msg_vmflush:1;      /* VM trying to free memory */
         unsigned int          msg_target_is_router:1; /* sending to a router */
         unsigned int          msg_routing:1;      /* being forwarded */
index 9ae5fcb..9f9cd8f 100644 (file)
@@ -342,8 +342,8 @@ typedef struct {
  * Six types of events can be logged in an event queue.
  */
 typedef enum {
-        /** An incoming GET operation has completed on the MD. */
-        LNET_EVENT_GET,
+       /** An incoming GET operation has completed on the MD. */
+       LNET_EVENT_GET          = 1,
         /**
          * An incoming PUT operation has completed on the MD. The
          * underlying layers will not alter the memory (on behalf of this
index 2b118a7..3605f4c 100644 (file)
@@ -922,26 +922,6 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 }
 
 #ifdef __KERNEL__
-static void
-lnet_commit_routedmsg (lnet_msg_t *msg)
-{
-        /* ALWAYS called holding the LNET_LOCK */
-        LASSERT (msg->msg_routing);
-
-        the_lnet.ln_counters.msgs_alloc++;
-        if (the_lnet.ln_counters.msgs_alloc >
-            the_lnet.ln_counters.msgs_max)
-                the_lnet.ln_counters.msgs_max =
-                        the_lnet.ln_counters.msgs_alloc;
-
-        the_lnet.ln_counters.route_count++;
-        the_lnet.ln_counters.route_length += msg->msg_len;
-
-        LASSERT (!msg->msg_onactivelist);
-        msg->msg_onactivelist = 1;
-       cfs_list_add(&msg->msg_activelist,
-                    &the_lnet.ln_msg_container.msc_active);
-}
 
 lnet_rtrbufpool_t *
 lnet_msg2bufpool(lnet_msg_t *msg)
@@ -1031,12 +1011,11 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 #endif
 
 void
-lnet_return_credits_locked (lnet_msg_t *msg)
+lnet_return_tx_credits_locked(lnet_msg_t *msg)
 {
-        lnet_peer_t       *txpeer = msg->msg_txpeer;
-        lnet_peer_t       *rxpeer = msg->msg_rxpeer;
-        lnet_msg_t        *msg2;
-        lnet_ni_t         *ni;
+       lnet_peer_t     *txpeer = msg->msg_txpeer;
+       lnet_msg_t      *msg2;
+       lnet_ni_t       *ni;
 
         if (msg->msg_txcredit) {
                 /* give back NI txcredits */
@@ -1085,8 +1064,15 @@ lnet_return_credits_locked (lnet_msg_t *msg)
                 msg->msg_txpeer = NULL;
                 lnet_peer_decref_locked(txpeer);
         }
+}
 
+void
+lnet_return_rx_credits_locked(lnet_msg_t *msg)
+{
+       lnet_peer_t     *rxpeer = msg->msg_rxpeer;
 #ifdef __KERNEL__
+       lnet_msg_t      *msg2;
+
         if (msg->msg_rtrcredit) {
                 /* give back global router credits */
                 lnet_rtrbuf_t     *rb;
@@ -1237,6 +1223,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
                 LASSERT (!msg->msg_routing);
         }
 
+       lnet_msg_commit(msg, 1);
         /* Is this for someone on a local network? */
         local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
 
@@ -1353,43 +1340,6 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         return 0;
 }
 
-void
-lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
-              unsigned int offset, unsigned int mlen)
-{
-        /* ALWAYS called holding the LNET_LOCK */
-        /* Here, we commit the MD to a network OP by marking it busy and
-         * decrementing its threshold.  Come what may, the network "owns"
-         * the MD until a call to lnet_finalize() signals completion. */
-        LASSERT (!msg->msg_routing);
-
-        msg->msg_md = md;
-       lnet_md_deconstruct(md, &msg->msg_ev.md);
-       lnet_md2handle(&msg->msg_ev.md_handle, md);
-
-       if (msg->msg_receiving) {
-               msg->msg_offset = offset;
-               msg->msg_wanted = mlen;
-       }
-
-        md->md_refcount++;
-        if (md->md_threshold != LNET_MD_THRESH_INF) {
-                LASSERT (md->md_threshold > 0);
-                md->md_threshold--;
-        }
-
-        the_lnet.ln_counters.msgs_alloc++;
-        if (the_lnet.ln_counters.msgs_alloc > 
-            the_lnet.ln_counters.msgs_max)
-                the_lnet.ln_counters.msgs_max = 
-                        the_lnet.ln_counters.msgs_alloc;
-
-        LASSERT (!msg->msg_onactivelist);
-        msg->msg_onactivelist = 1;
-       cfs_list_add(&msg->msg_activelist,
-                    &the_lnet.ln_msg_container.msc_active);
-}
-
 static void
 lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
 {
@@ -1406,13 +1356,6 @@ lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
        lnet_hdr_t      *hdr = &msg->msg_hdr;
 
-       LNET_LOCK();
-
-       the_lnet.ln_counters.recv_count++;
-       the_lnet.ln_counters.recv_length += msg->msg_wanted;
-
-       LNET_UNLOCK();
-
        if (msg->msg_wanted != 0)
                lnet_setpayloadbuffer(msg);
 
@@ -1538,9 +1481,6 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 
         LASSERT (rc == LNET_MATCHMD_OK);
 
-        the_lnet.ln_counters.send_count++;
-       the_lnet.ln_counters.send_length += msg->msg_wanted;
-
        LNET_UNLOCK();
 
        lnet_build_msg_event(msg, LNET_EVENT_GET);
@@ -1626,14 +1566,11 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
 
-       lnet_commit_md(md, msg, 0, mlength);
+       lnet_msg_attach_md(msg, md, 0, mlength);
 
         if (mlength != 0)
                 lnet_setpayloadbuffer(msg);
 
-        the_lnet.ln_counters.recv_count++;
-        the_lnet.ln_counters.recv_length += mlength;
-
         LNET_UNLOCK();
 
        lnet_build_msg_event(msg, LNET_EVENT_REPLY);
@@ -1680,9 +1617,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                hdr->msg.ack.dst_wmd.wh_object_cookie);
 
-       lnet_commit_md(md, msg, 0, 0);
-
-       the_lnet.ln_counters.recv_count++;
+       lnet_msg_attach_md(msg, md, 0, 0);
 
        LNET_UNLOCK();
 
@@ -1925,8 +1860,11 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                        "(error %d looking up sender)\n",
                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
                        lnet_msgtyp2str(type), rc);
-                goto free_drop;
-        }
+               lnet_msg_free(msg);
+               goto drop;
+       }
+
+       lnet_msg_commit(msg, 0);
         LNET_UNLOCK();
 
 #ifndef __KERNEL__
@@ -1947,7 +1885,6 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                                 goto free_drop;
                         }
                 }
-                lnet_commit_routedmsg(msg);
                 rc = lnet_post_routed_recv_locked(msg, 0);
                 LNET_UNLOCK();
 
@@ -1980,6 +1917,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
                 break;
         default:
                 LASSERT(0);
+               rc = -EPROTO;
                 goto free_drop;  /* prevent an unused label if !kernel */
         }
 
@@ -1989,18 +1927,12 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         LASSERT (rc == ENOENT);
 
  free_drop:
-        LASSERT (msg->msg_md == NULL);
-        LNET_LOCK();
-        if (msg->msg_rxpeer != NULL) {
-                lnet_peer_decref_locked(msg->msg_rxpeer);
-                msg->msg_rxpeer = NULL;
-        }
-       lnet_msg_free_locked(msg);      /* expects LNET_LOCK held */
-        LNET_UNLOCK();
+       LASSERT(msg->msg_md == NULL);
+       lnet_finalize(ni, msg, rc);
 
  drop:
-        lnet_drop_message(ni, private, payload_length);
-        return 0;
+       lnet_drop_message(ni, private, payload_length);
+       return 0;
 }
 
 void
@@ -2168,7 +2100,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
 
         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
 
-       lnet_commit_md(md, msg, 0, 0);
+       lnet_msg_attach_md(msg, md, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
 
@@ -2190,9 +2122,6 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         LNET_WIRE_HANDLE_COOKIE_NONE;
         }
 
-        the_lnet.ln_counters.send_count++;
-        the_lnet.ln_counters.send_length += md->md_length;
-
         LNET_UNLOCK();
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
@@ -2252,11 +2181,10 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
        msg->msg_hdr.src_nid = peer_id.nid;
        msg->msg_hdr.payload_length = getmd->md_length;
+       msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
 
-       lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
-
-       the_lnet.ln_counters.recv_count++;
-       the_lnet.ln_counters.recv_length += getmd->md_length;
+       lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
+       lnet_msg_commit(msg, 0);
 
        LNET_UNLOCK();
 
@@ -2357,7 +2285,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
 
         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
 
-       lnet_commit_md(md, msg, 0, 0);
+       lnet_msg_attach_md(msg, md, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
 
@@ -2372,8 +2300,6 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
                 md->md_lh.lh_cookie;
 
-        the_lnet.ln_counters.send_count++;
-
         LNET_UNLOCK();
 
        lnet_build_msg_event(msg, LNET_EVENT_SEND);
index 59e2420..ca8201e 100644 (file)
@@ -135,6 +135,203 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 }
 
 void
+lnet_msg_commit(lnet_msg_t *msg, int sending)
+{
+       struct lnet_msg_container *container = &the_lnet.ln_msg_container;
+       lnet_counters_t           *counters  = &the_lnet.ln_counters;
+
+       /* routed message can be committed for both receiving and sending */
+       LASSERT(!msg->msg_tx_committed);
+
+       if (msg->msg_rx_committed) { /* routed message, or reply for GET */
+               LASSERT(sending);
+               LASSERT(msg->msg_onactivelist);
+               msg->msg_tx_committed = 1;
+               return;
+       }
+
+       LASSERT(!msg->msg_onactivelist);
+       msg->msg_onactivelist = 1;
+       cfs_list_add(&msg->msg_activelist, &container->msc_active);
+
+       counters->msgs_alloc++;
+       if (counters->msgs_alloc > counters->msgs_max)
+               counters->msgs_max = counters->msgs_alloc;
+
+       if (sending)
+               msg->msg_tx_committed = 1;
+       else
+               msg->msg_rx_committed = 1;
+}
+
+static void
+lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
+{
+       lnet_counters_t *counters = &the_lnet.ln_counters;
+       lnet_event_t *ev = &msg->msg_ev;
+
+       LASSERT(msg->msg_tx_committed);
+       if (status != 0)
+               goto out;
+
+       switch (ev->type) {
+       default: /* routed message */
+               LASSERT(msg->msg_routing);
+               LASSERT(msg->msg_rx_committed);
+               LASSERT(ev->type == 0);
+
+               counters->route_length += msg->msg_len;
+               counters->route_count++;
+               goto out;
+
+       case LNET_EVENT_PUT:
+               /* should have been decommitted */
+               LASSERT(!msg->msg_rx_committed);
+               /* overwritten while sending ACK */
+               LASSERT(msg->msg_type == LNET_MSG_ACK);
+               msg->msg_type = LNET_MSG_PUT; /* fix type */
+               break;
+
+       case LNET_EVENT_SEND:
+               LASSERT(!msg->msg_rx_committed);
+               if (msg->msg_type == LNET_MSG_PUT)
+                       counters->send_length += msg->msg_len;
+               break;
+
+       case LNET_EVENT_GET:
+               LASSERT(msg->msg_rx_committed);
+               /* overwritten while sending reply */
+               LASSERT(msg->msg_type == LNET_MSG_REPLY);
+
+               msg->msg_type = LNET_MSG_GET; /* fix type */
+               counters->send_length += msg->msg_len;
+               break;
+       }
+
+       counters->send_count++;
+ out:
+       lnet_return_tx_credits_locked(msg);
+       msg->msg_tx_committed = 0;
+}
+
+static void
+lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
+{
+       lnet_counters_t *counters = &the_lnet.ln_counters;
+       lnet_event_t *ev = &msg->msg_ev;
+
+       LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
+       LASSERT(msg->msg_rx_committed);
+
+       if (status != 0)
+               goto out;
+
+       switch (ev->type) {
+       default:
+               LASSERT(ev->type == 0);
+               LASSERT(msg->msg_routing);
+               goto out;
+
+       case LNET_EVENT_ACK:
+               LASSERT(msg->msg_type == LNET_MSG_ACK);
+               break;
+
+       case LNET_EVENT_GET:
+               LASSERT(msg->msg_type == LNET_MSG_GET);
+               break;
+
+       case LNET_EVENT_PUT:
+               LASSERT(msg->msg_type == LNET_MSG_PUT);
+               break;
+
+       case LNET_EVENT_REPLY:
+               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+                       msg->msg_type == LNET_MSG_GET); /* optimized GET */
+               break;
+       }
+
+       counters->recv_count++;
+       if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
+               counters->recv_length += msg->msg_wanted;
+
+ out:
+       lnet_return_rx_credits_locked(msg);
+       msg->msg_rx_committed = 0;
+}
+
+void
+lnet_msg_decommit(lnet_msg_t *msg, int status)
+{
+       lnet_counters_t *counters = &the_lnet.ln_counters;
+
+       LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
+       LASSERT(msg->msg_onactivelist);
+
+       if (msg->msg_tx_committed) /* always decommit for sending first */
+               lnet_msg_tx_decommit(msg, status);
+
+       if (msg->msg_rx_committed)
+               lnet_msg_rx_decommit(msg, status);
+
+       cfs_list_del(&msg->msg_activelist);
+       msg->msg_onactivelist = 0;
+       counters->msgs_alloc--;
+}
+
+void
+lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
+                  unsigned int offset, unsigned int mlen)
+{
+       /* Here, we attach the MD on lnet_msg and mark it busy and
+        * decrementing its threshold. Come what may, the lnet_msg "owns"
+        * the MD until a call to lnet_msg_detach_md or lnet_finalize()
+        * signals completion. */
+       LASSERT(!msg->msg_routing);
+
+       msg->msg_md = md;
+       if (msg->msg_receiving) { /* commited for receiving */
+               msg->msg_offset = offset;
+               msg->msg_wanted = mlen;
+       }
+
+       md->md_refcount++;
+       if (md->md_threshold != LNET_MD_THRESH_INF) {
+               LASSERT(md->md_threshold > 0);
+               md->md_threshold--;
+       }
+
+       /* build umd in event */
+       lnet_md2handle(&msg->msg_ev.md_handle, md);
+       lnet_md_deconstruct(md, &msg->msg_ev.md);
+}
+
+void
+lnet_msg_detach_md(lnet_msg_t *msg, int status)
+{
+       lnet_libmd_t    *md = msg->msg_md;
+       int             unlink;
+
+       if (md == NULL)
+               return;
+
+       /* Now it's safe to drop my caller's ref */
+       md->md_refcount--;
+       LASSERT(md->md_refcount >= 0);
+
+       unlink = lnet_md_unlinkable(md);
+       if (md->md_eq != NULL) {
+               msg->msg_ev.status   = status;
+               msg->msg_ev.unlinked = unlink;
+               lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+       }
+
+       if (unlink)
+               lnet_md_unlink(md);
+
+       msg->msg_md = NULL;
+}
+
+void
 lnet_complete_msg_locked(lnet_msg_t *msg)
 {
         lnet_handle_wire_t ack_wmd;
@@ -146,7 +343,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
         if (status == 0 && msg->msg_ack) {
                 /* Only send an ACK if the PUT completed successfully */
 
-                lnet_return_credits_locked(msg);
+               lnet_msg_decommit(msg, 0);
 
                 msg->msg_ack = 0;
                 LNET_UNLOCK();
@@ -183,12 +380,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg)
                         return;
         }
 
-        lnet_return_credits_locked(msg);
-
-        LASSERT (msg->msg_onactivelist);
-        msg->msg_onactivelist = 0;
-        cfs_list_del (&msg->msg_activelist);
-        the_lnet.ln_counters.msgs_alloc--;
+       lnet_msg_decommit(msg, status);
        lnet_msg_free_locked(msg);
 }
 
@@ -197,7 +389,6 @@ void
 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container       *container;
-       lnet_libmd_t                    *md;
        int                             my_slot;
        int                             i;
 
@@ -228,26 +419,16 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 
         msg->msg_ev.status = status;
 
-        md = msg->msg_md;
-        if (md != NULL) {
-                int      unlink;
-
-                /* Now it's safe to drop my caller's ref */
-                md->md_refcount--;
-                LASSERT (md->md_refcount >= 0);
-
-                unlink = lnet_md_unlinkable(md);
-
-                msg->msg_ev.unlinked = unlink;
-
-                if (md->md_eq != NULL)
-                       lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
+       if (msg->msg_md != NULL)
+               lnet_msg_detach_md(msg, status);
 
-                if (unlink)
-                        lnet_md_unlink(md);
-
-                msg->msg_md = NULL;
-        }
+       if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
+               LNET_UNLOCK();
+               /* not commited to network yet */
+               LASSERT(!msg->msg_onactivelist);
+               lnet_msg_free(msg);
+               return;
+       }
 
        container = &the_lnet.ln_msg_container;
        cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
index 114d02e..5932ad7 100644 (file)
@@ -144,7 +144,7 @@ lnet_try_match_md(int index, int op_mask, lnet_process_id_t src,
               index, libcfs_id2str(src), mlength, rlength,
               md->md_lh.lh_cookie, md->md_niov, offset);
 
-       lnet_commit_md(md, msg, offset, mlength);
+       lnet_msg_attach_md(msg, md, offset, mlength);
        md->md_offset = offset + mlength;
 
        /* Auto-unlink NOW, so the ME gets unlinked if required.