Whamcloud - gitweb
LU-56 lnet: LNet message event cleanup
[fs/lustre-release.git] / lnet / lnet / lib-move.c
index 96fb6db..2b118a7 100644 (file)
@@ -607,9 +607,9 @@ lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
                 LASSERT(!msg->msg_sending);
                 LASSERT(rlen == msg->msg_len);
                 LASSERT(mlen <= msg->msg_len);
+               LASSERT(msg->msg_offset == offset);
+               LASSERT(msg->msg_wanted == mlen);
 
-                msg->msg_wanted = mlen;
-                msg->msg_offset = offset;
                 msg->msg_receiving = 0;
 
                 if (mlen != 0) {
@@ -1354,7 +1354,8 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
 }
 
 void
-lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
+lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg,
+              unsigned int offset, unsigned int mlen)
 {
         /* ALWAYS called holding the LNET_LOCK */
         /* Here, we commit the MD to a network OP by marking it busy and
@@ -1363,6 +1364,13 @@ lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
         LASSERT (!msg->msg_routing);
 
         msg->msg_md = md;
+       lnet_md_deconstruct(md, &msg->msg_ev.md);
+       lnet_md2handle(&msg->msg_ev.md_handle, md);
+
+       if (msg->msg_receiving) {
+               msg->msg_offset = offset;
+               msg->msg_wanted = mlen;
+       }
 
         md->md_refcount++;
         if (md->md_threshold != LNET_MD_THRESH_INF) {
@@ -1394,35 +1402,29 @@ lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
 }
 
 static void
-lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
-              unsigned int offset, unsigned int mlength)
+lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
 {
-        lnet_hdr_t       *hdr = &msg->msg_hdr;
+       lnet_hdr_t      *hdr = &msg->msg_hdr;
 
-        LNET_LOCK();
+       LNET_LOCK();
 
-        the_lnet.ln_counters.recv_count++;
-        the_lnet.ln_counters.recv_length += mlength;
+       the_lnet.ln_counters.recv_count++;
+       the_lnet.ln_counters.recv_length += msg->msg_wanted;
 
-        LNET_UNLOCK();
+       LNET_UNLOCK();
 
-        if (mlength != 0)
-                lnet_setpayloadbuffer(msg);
+       if (msg->msg_wanted != 0)
+               lnet_setpayloadbuffer(msg);
 
-        msg->msg_ev.type       = LNET_EVENT_PUT;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.hdr_data   = hdr->msg.put.hdr_data;
+       lnet_build_msg_event(msg, LNET_EVENT_PUT);
 
         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
          * it back into the ACK during lnet_finalize() */
-        msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
-                        (md->md_options & LNET_MD_ACK_DISABLE) == 0);
+       msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
+                       (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
 
-        lnet_ni_recv(msg->msg_rxpeer->lp_ni,
-                     msg->msg_private,
-                     msg, delayed, offset, mlength,
-                     hdr->payload_length);
+       lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_delayed,
+                    msg->msg_offset, msg->msg_wanted, hdr->payload_length);
 }
 
 static int
@@ -1433,10 +1435,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
         __u64             version;
         lnet_hdr_t       *hdr = &msg->msg_hdr;
         unsigned int      rlength = hdr->payload_length;
-        unsigned int      mlength = 0;
-        unsigned int      offset = 0;
         lnet_process_id_t src= {0};
-        lnet_libmd_t     *md;
         lnet_portal_t    *ptl;
 
         src.nid = hdr->src_nid;
@@ -1454,15 +1453,14 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
  again:
         rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
                            rlength, hdr->msg.put.offset,
-                           hdr->msg.put.match_bits, msg,
-                           &mlength, &offset, &md);
+                          hdr->msg.put.match_bits, msg);
         switch (rc) {
         default:
                 LBUG();
 
         case LNET_MATCHMD_OK:
                 LNET_UNLOCK();
-                lnet_recv_put(md, msg, msg->msg_delayed, offset, mlength);
+               lnet_recv_put(ni, msg);
                 return 0;
 
         case LNET_MATCHMD_NONE:
@@ -1508,11 +1506,8 @@ static int
 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 {
         lnet_hdr_t        *hdr = &msg->msg_hdr;
-        unsigned int       mlength = 0;
-        unsigned int       offset = 0;
         lnet_process_id_t  src = {0};
         lnet_handle_wire_t reply_wmd;
-        lnet_libmd_t      *md;
         int                rc;
 
         src.nid = hdr->src_nid;
@@ -1528,8 +1523,7 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 
         rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
                            hdr->msg.get.sink_length, hdr->msg.get.src_offset,
-                           hdr->msg.get.match_bits, msg,
-                           &mlength, &offset, &md);
+                          hdr->msg.get.match_bits, msg);
         if (rc == LNET_MATCHMD_DROP) {
                 CNETERR("Dropping GET from %s portal %d match "LPU64
                         " offset %d length %d\n",
@@ -1545,18 +1539,16 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
         LASSERT (rc == LNET_MATCHMD_OK);
 
         the_lnet.ln_counters.send_count++;
-        the_lnet.ln_counters.send_length += mlength;
+       the_lnet.ln_counters.send_length += msg->msg_wanted;
 
-        LNET_UNLOCK();
+       LNET_UNLOCK();
 
-        msg->msg_ev.type = LNET_EVENT_GET;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.hdr_data = 0;
+       lnet_build_msg_event(msg, LNET_EVENT_GET);
 
-        reply_wmd = hdr->msg.get.return_wmd;
+       reply_wmd = hdr->msg.get.return_wmd;
 
-        lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
+       lnet_prep_send(msg, LNET_MSG_REPLY, src,
+                      msg->msg_offset, msg->msg_wanted);
 
         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
 
@@ -1634,27 +1626,18 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, mlength);
 
         if (mlength != 0)
                 lnet_setpayloadbuffer(msg);
 
-        msg->msg_ev.type = LNET_EVENT_REPLY;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.rlength = rlength;
-        msg->msg_ev.mlength = mlength;
-        msg->msg_ev.offset = 0;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.recv_count++;
         the_lnet.ln_counters.recv_length += mlength;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_REPLY);
+
         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
         return 0;
 }
@@ -1697,21 +1680,13 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
                hdr->msg.ack.dst_wmd.wh_object_cookie);
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, 0);
 
-        msg->msg_ev.type = LNET_EVENT_ACK;
-        msg->msg_ev.target.pid = hdr->dest_pid;
-        msg->msg_ev.target.nid = hdr->dest_nid;
-        msg->msg_ev.initiator = src;
-        msg->msg_ev.mlength = hdr->msg.ack.mlength;
-        msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
+       the_lnet.ln_counters.recv_count++;
 
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
+       LNET_UNLOCK();
 
-        the_lnet.ln_counters.recv_count++;
-
-        LNET_UNLOCK();
+       lnet_build_msg_event(msg, LNET_EVENT_ACK);
 
         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
         return 0;
@@ -1939,6 +1914,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         msg->msg_len = msg->msg_wanted = payload_length;
         msg->msg_offset = 0;
         msg->msg_hdr = *hdr;
+       /* for building message event */
+       msg->msg_from = from_nid;
 
         LNET_LOCK();
         rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
@@ -1988,8 +1965,6 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
         msg->msg_hdr.dest_pid = dest_pid;
         msg->msg_hdr.payload_length = payload_length;
 
-        msg->msg_ev.sender = from_nid;
-
         switch (type) {
         case LNET_MSG_ACK:
                 rc = lnet_parse_ack(ni, msg);
@@ -2097,8 +2072,7 @@ lnet_recv_delayed_msg_list(cfs_list_t *head)
                        msg->msg_hdr.msg.put.offset,
                        msg->msg_hdr.payload_length);
 
-               lnet_recv_put(msg->msg_md, msg, 1,
-                             msg->msg_ev.offset, msg->msg_ev.mlength);
+               lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
        }
 }
 
@@ -2194,7 +2168,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
 
         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
 
@@ -2216,26 +2190,13 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
                         LNET_WIRE_HANDLE_COOKIE_NONE;
         }
 
-        msg->msg_ev.type = LNET_EVENT_SEND;
-        msg->msg_ev.initiator.nid = LNET_NID_ANY;
-        msg->msg_ev.initiator.pid = the_lnet.ln_pid;
-        msg->msg_ev.target = target;
-        msg->msg_ev.sender = LNET_NID_ANY;
-        msg->msg_ev.pt_index = portal;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = md->md_length;
-        msg->msg_ev.mlength = md->md_length;
-        msg->msg_ev.offset = offset;
-        msg->msg_ev.hdr_data = hdr_data;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.send_count++;
         the_lnet.ln_counters.send_length += md->md_length;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
         rc = lnet_send(self, msg);
         if (rc != 0) {
                 CNETERR( "Error sending PUT to %s: %d\n",
@@ -2283,38 +2244,35 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
 
         LASSERT (getmd->md_offset == 0);
 
-        CDEBUG(D_NET, "%s: Reply from %s md %p\n", 
-               libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
-
-        lnet_commit_md (getmd, msg);
+       CDEBUG(D_NET, "%s: Reply from %s md %p\n",
+              libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
 
+       /* setup information for lnet_build_msg_event */
+       msg->msg_from = peer_id.nid;
         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
+       msg->msg_hdr.src_nid = peer_id.nid;
+       msg->msg_hdr.payload_length = getmd->md_length;
 
-        msg->msg_ev.type = LNET_EVENT_REPLY;
-        msg->msg_ev.initiator = peer_id;
-        msg->msg_ev.sender = peer_id.nid;  /* optimized GETs can't be routed */
-        msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
-        msg->msg_ev.offset = 0;
+       lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length);
 
-        lnet_md_deconstruct(getmd, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, getmd);
+       the_lnet.ln_counters.recv_count++;
+       the_lnet.ln_counters.recv_length += getmd->md_length;
 
-        the_lnet.ln_counters.recv_count++;
-        the_lnet.ln_counters.recv_length += getmd->md_length;
+       LNET_UNLOCK();
 
-        LNET_UNLOCK();
+       lnet_build_msg_event(msg, LNET_EVENT_REPLY);
 
-        return msg;
+       return msg;
 
  drop_msg:
        lnet_msg_free_locked(msg);
  drop:
-        the_lnet.ln_counters.drop_count++;
-        the_lnet.ln_counters.drop_length += getmd->md_length;
+       the_lnet.ln_counters.drop_count++;
+       the_lnet.ln_counters.drop_length += getmd->md_length;
 
-        LNET_UNLOCK ();
+       LNET_UNLOCK ();
 
-        return NULL;
+       return NULL;
 }
 
 void
@@ -2399,7 +2357,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
 
         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
 
-        lnet_commit_md(md, msg);
+       lnet_commit_md(md, msg, 0, 0);
 
         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
 
@@ -2414,25 +2372,12 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
                 md->md_lh.lh_cookie;
 
-        msg->msg_ev.type = LNET_EVENT_SEND;
-        msg->msg_ev.initiator.nid = LNET_NID_ANY;
-        msg->msg_ev.initiator.pid = the_lnet.ln_pid;
-        msg->msg_ev.target = target;
-        msg->msg_ev.sender = LNET_NID_ANY;
-        msg->msg_ev.pt_index = portal;
-        msg->msg_ev.match_bits = match_bits;
-        msg->msg_ev.rlength = md->md_length;
-        msg->msg_ev.mlength = md->md_length;
-        msg->msg_ev.offset = offset;
-        msg->msg_ev.hdr_data = 0;
-
-        lnet_md_deconstruct(md, &msg->msg_ev.md);
-        lnet_md2handle(&msg->msg_ev.md_handle, md);
-
         the_lnet.ln_counters.send_count++;
 
         LNET_UNLOCK();
 
+       lnet_build_msg_event(msg, LNET_EVENT_SEND);
+
         rc = lnet_send(self, msg);
         if (rc < 0) {
                 CNETERR( "Error sending GET to %s: %d\n",