From 75a8f4b4aa9ad6bf697aedece539e62111e9029a Mon Sep 17 00:00:00 2001 From: Liang Zhen Date: Fri, 1 Jun 2012 13:53:01 +0800 Subject: [PATCH] LU-56 lnet: split lnet_commit_md and cleanup Briefly, LNet has two types of operations: 1) resource operations on EQ, MD and ME; 2) network operations on credits, peer, NI etc. Most of them are in different logic blocks, but there are still a few functions have mixed operations, lnet_commit_md() is one of these functions. This patch is a key step for LNet SMP improvements, lnet_commit_md() of current LNet did two things: 1) consume MD threshold and commit it to network; 2) attach the message on active message list to show it might have taken LNet message credits. This patch split it into two sets of functions, lnet_msg_attach/detach_md and lnet_msg_commit/decommit, We need to call lnet_msg_attach_md and lnet_msg_comit before deliver a message to network, call lnet_msg_detach_md and lnet_msg_decommit while finalizing a message. These two sets of functions will be protected by different locks in upcoming patches. Another change in this patch is, we moved most counters operations into lnet_msg_decommit, this will make code cleaner and easier for the next step work. The reason that we put counters operations in lnet_msg_decommit instead of lnet_msg_commit is because we can't guarantee lnet_msg_commit is always called after lnet_msg_attach_md, and counters need information filled by lnet_msg_attach_md, which means lnet_msg_commit might not have enough information for counters. The thing we know for sure is lnet_msg_decommit is always called after lnet_msg_attach_md, so it's safe to put counter operations inside lnet_msg_decommit. Signed-off-by: Liang Zhen Change-Id: Iee961fbff6e6e39ed76c14c0bd5b6560a7730af9 Reviewed-on: http://review.whamcloud.com/2997 Tested-by: Hudson Reviewed-by: Doug Oucharek Reviewed-by: Bobi Jam Tested-by: Maloo Reviewed-by: Andreas Dilger --- lnet/include/lnet/lib-lnet.h | 11 +- lnet/include/lnet/lib-types.h | 3 + lnet/include/lnet/types.h | 4 +- lnet/lnet/lib-move.c | 132 ++++++------------------ lnet/lnet/lib-msg.c | 235 +++++++++++++++++++++++++++++++++++++----- lnet/lnet/lib-ptl.c | 2 +- 6 files changed, 251 insertions(+), 136 deletions(-) diff --git a/lnet/include/lnet/lib-lnet.h b/lnet/include/lnet/lib-lnet.h index be1818a..4c06475 100644 --- a/lnet/include/lnet/lib-lnet.h +++ b/lnet/include/lnet/lib-lnet.h @@ -623,15 +623,20 @@ lnet_remotenet_t *lnet_find_net_locked (__u32 net); int lnet_islocalnid(lnet_nid_t nid); int lnet_islocalnet(__u32 net); -void lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg, - unsigned int offset, unsigned int mlen); +void lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md, + unsigned int offset, unsigned int mlen); +void lnet_msg_detach_md(lnet_msg_t *msg, int status); void lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev); void lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type); +void lnet_msg_commit(lnet_msg_t *msg, int sending); +void lnet_msg_decommit(lnet_msg_t *msg, int status); + void lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev); void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target, unsigned int offset, unsigned int len); int lnet_send(lnet_nid_t nid, lnet_msg_t *msg); -void lnet_return_credits_locked (lnet_msg_t *msg); +void lnet_return_tx_credits_locked(lnet_msg_t *msg); +void lnet_return_rx_credits_locked(lnet_msg_t *msg); /* portals functions */ static inline int diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index a06e33e..3cd61b8 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -191,6 +191,9 @@ typedef struct lnet_msg { lnet_nid_t msg_from; __u32 msg_type; + unsigned int msg_rx_committed:1; + unsigned int msg_tx_committed:1; + unsigned int msg_vmflush:1; /* VM trying to free memory */ unsigned int msg_target_is_router:1; /* sending to a router */ unsigned int msg_routing:1; /* being forwarded */ diff --git a/lnet/include/lnet/types.h b/lnet/include/lnet/types.h index 9ae5fcb..9f9cd8f 100644 --- a/lnet/include/lnet/types.h +++ b/lnet/include/lnet/types.h @@ -342,8 +342,8 @@ typedef struct { * Six types of events can be logged in an event queue. */ typedef enum { - /** An incoming GET operation has completed on the MD. */ - LNET_EVENT_GET, + /** An incoming GET operation has completed on the MD. */ + LNET_EVENT_GET = 1, /** * An incoming PUT operation has completed on the MD. The * underlying layers will not alter the memory (on behalf of this diff --git a/lnet/lnet/lib-move.c b/lnet/lnet/lib-move.c index 2b118a7..3605f4c 100644 --- a/lnet/lnet/lib-move.c +++ b/lnet/lnet/lib-move.c @@ -922,26 +922,6 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send) } #ifdef __KERNEL__ -static void -lnet_commit_routedmsg (lnet_msg_t *msg) -{ - /* ALWAYS called holding the LNET_LOCK */ - LASSERT (msg->msg_routing); - - the_lnet.ln_counters.msgs_alloc++; - if (the_lnet.ln_counters.msgs_alloc > - the_lnet.ln_counters.msgs_max) - the_lnet.ln_counters.msgs_max = - the_lnet.ln_counters.msgs_alloc; - - the_lnet.ln_counters.route_count++; - the_lnet.ln_counters.route_length += msg->msg_len; - - LASSERT (!msg->msg_onactivelist); - msg->msg_onactivelist = 1; - cfs_list_add(&msg->msg_activelist, - &the_lnet.ln_msg_container.msc_active); -} lnet_rtrbufpool_t * lnet_msg2bufpool(lnet_msg_t *msg) @@ -1031,12 +1011,11 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv) #endif void -lnet_return_credits_locked (lnet_msg_t *msg) +lnet_return_tx_credits_locked(lnet_msg_t *msg) { - lnet_peer_t *txpeer = msg->msg_txpeer; - lnet_peer_t *rxpeer = msg->msg_rxpeer; - lnet_msg_t *msg2; - lnet_ni_t *ni; + lnet_peer_t *txpeer = msg->msg_txpeer; + lnet_msg_t *msg2; + lnet_ni_t *ni; if (msg->msg_txcredit) { /* give back NI txcredits */ @@ -1085,8 +1064,15 @@ lnet_return_credits_locked (lnet_msg_t *msg) msg->msg_txpeer = NULL; lnet_peer_decref_locked(txpeer); } +} +void +lnet_return_rx_credits_locked(lnet_msg_t *msg) +{ + lnet_peer_t *rxpeer = msg->msg_rxpeer; #ifdef __KERNEL__ + lnet_msg_t *msg2; + if (msg->msg_rtrcredit) { /* give back global router credits */ lnet_rtrbuf_t *rb; @@ -1237,6 +1223,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg) LASSERT (!msg->msg_routing); } + lnet_msg_commit(msg, 1); /* Is this for someone on a local network? */ local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid)); @@ -1353,43 +1340,6 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg) return 0; } -void -lnet_commit_md(lnet_libmd_t *md, lnet_msg_t *msg, - unsigned int offset, unsigned int mlen) -{ - /* ALWAYS called holding the LNET_LOCK */ - /* Here, we commit the MD to a network OP by marking it busy and - * decrementing its threshold. Come what may, the network "owns" - * the MD until a call to lnet_finalize() signals completion. */ - LASSERT (!msg->msg_routing); - - msg->msg_md = md; - lnet_md_deconstruct(md, &msg->msg_ev.md); - lnet_md2handle(&msg->msg_ev.md_handle, md); - - if (msg->msg_receiving) { - msg->msg_offset = offset; - msg->msg_wanted = mlen; - } - - md->md_refcount++; - if (md->md_threshold != LNET_MD_THRESH_INF) { - LASSERT (md->md_threshold > 0); - md->md_threshold--; - } - - the_lnet.ln_counters.msgs_alloc++; - if (the_lnet.ln_counters.msgs_alloc > - the_lnet.ln_counters.msgs_max) - the_lnet.ln_counters.msgs_max = - the_lnet.ln_counters.msgs_alloc; - - LASSERT (!msg->msg_onactivelist); - msg->msg_onactivelist = 1; - cfs_list_add(&msg->msg_activelist, - &the_lnet.ln_msg_container.msc_active); -} - static void lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob) { @@ -1406,13 +1356,6 @@ lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg) { lnet_hdr_t *hdr = &msg->msg_hdr; - LNET_LOCK(); - - the_lnet.ln_counters.recv_count++; - the_lnet.ln_counters.recv_length += msg->msg_wanted; - - LNET_UNLOCK(); - if (msg->msg_wanted != 0) lnet_setpayloadbuffer(msg); @@ -1538,9 +1481,6 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get) LASSERT (rc == LNET_MATCHMD_OK); - the_lnet.ln_counters.send_count++; - the_lnet.ln_counters.send_length += msg->msg_wanted; - LNET_UNLOCK(); lnet_build_msg_event(msg, LNET_EVENT_GET); @@ -1626,14 +1566,11 @@ lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg) libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie); - lnet_commit_md(md, msg, 0, mlength); + lnet_msg_attach_md(msg, md, 0, mlength); if (mlength != 0) lnet_setpayloadbuffer(msg); - the_lnet.ln_counters.recv_count++; - the_lnet.ln_counters.recv_length += mlength; - LNET_UNLOCK(); lnet_build_msg_event(msg, LNET_EVENT_REPLY); @@ -1680,9 +1617,7 @@ lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg) libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), hdr->msg.ack.dst_wmd.wh_object_cookie); - lnet_commit_md(md, msg, 0, 0); - - the_lnet.ln_counters.recv_count++; + lnet_msg_attach_md(msg, md, 0, 0); LNET_UNLOCK(); @@ -1925,8 +1860,11 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, "(error %d looking up sender)\n", libcfs_nid2str(from_nid), libcfs_nid2str(src_nid), lnet_msgtyp2str(type), rc); - goto free_drop; - } + lnet_msg_free(msg); + goto drop; + } + + lnet_msg_commit(msg, 0); LNET_UNLOCK(); #ifndef __KERNEL__ @@ -1947,7 +1885,6 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, goto free_drop; } } - lnet_commit_routedmsg(msg); rc = lnet_post_routed_recv_locked(msg, 0); LNET_UNLOCK(); @@ -1980,6 +1917,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, break; default: LASSERT(0); + rc = -EPROTO; goto free_drop; /* prevent an unused label if !kernel */ } @@ -1989,18 +1927,12 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, LASSERT (rc == ENOENT); free_drop: - LASSERT (msg->msg_md == NULL); - LNET_LOCK(); - if (msg->msg_rxpeer != NULL) { - lnet_peer_decref_locked(msg->msg_rxpeer); - msg->msg_rxpeer = NULL; - } - lnet_msg_free_locked(msg); /* expects LNET_LOCK held */ - LNET_UNLOCK(); + LASSERT(msg->msg_md == NULL); + lnet_finalize(ni, msg, rc); drop: - lnet_drop_message(ni, private, payload_length); - return 0; + lnet_drop_message(ni, private, payload_length); + return 0; } void @@ -2168,7 +2100,7 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target)); - lnet_commit_md(md, msg, 0, 0); + lnet_msg_attach_md(msg, md, 0, 0); lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length); @@ -2190,9 +2122,6 @@ LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack, LNET_WIRE_HANDLE_COOKIE_NONE; } - the_lnet.ln_counters.send_count++; - the_lnet.ln_counters.send_length += md->md_length; - LNET_UNLOCK(); lnet_build_msg_event(msg, LNET_EVENT_SEND); @@ -2252,11 +2181,10 @@ lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg) msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */ msg->msg_hdr.src_nid = peer_id.nid; msg->msg_hdr.payload_length = getmd->md_length; + msg->msg_receiving = 1; /* required by lnet_msg_attach_md */ - lnet_commit_md(getmd, msg, getmd->md_offset, getmd->md_length); - - the_lnet.ln_counters.recv_count++; - the_lnet.ln_counters.recv_length += getmd->md_length; + lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length); + lnet_msg_commit(msg, 0); LNET_UNLOCK(); @@ -2357,7 +2285,7 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target)); - lnet_commit_md(md, msg, 0, 0); + lnet_msg_attach_md(msg, md, 0, 0); lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0); @@ -2372,8 +2300,6 @@ LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = md->md_lh.lh_cookie; - the_lnet.ln_counters.send_count++; - LNET_UNLOCK(); lnet_build_msg_event(msg, LNET_EVENT_SEND); diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 59e2420..ca8201e 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -135,6 +135,203 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type) } void +lnet_msg_commit(lnet_msg_t *msg, int sending) +{ + struct lnet_msg_container *container = &the_lnet.ln_msg_container; + lnet_counters_t *counters = &the_lnet.ln_counters; + + /* routed message can be committed for both receiving and sending */ + LASSERT(!msg->msg_tx_committed); + + if (msg->msg_rx_committed) { /* routed message, or reply for GET */ + LASSERT(sending); + LASSERT(msg->msg_onactivelist); + msg->msg_tx_committed = 1; + return; + } + + LASSERT(!msg->msg_onactivelist); + msg->msg_onactivelist = 1; + cfs_list_add(&msg->msg_activelist, &container->msc_active); + + counters->msgs_alloc++; + if (counters->msgs_alloc > counters->msgs_max) + counters->msgs_max = counters->msgs_alloc; + + if (sending) + msg->msg_tx_committed = 1; + else + msg->msg_rx_committed = 1; +} + +static void +lnet_msg_tx_decommit(lnet_msg_t *msg, int status) +{ + lnet_counters_t *counters = &the_lnet.ln_counters; + lnet_event_t *ev = &msg->msg_ev; + + LASSERT(msg->msg_tx_committed); + if (status != 0) + goto out; + + switch (ev->type) { + default: /* routed message */ + LASSERT(msg->msg_routing); + LASSERT(msg->msg_rx_committed); + LASSERT(ev->type == 0); + + counters->route_length += msg->msg_len; + counters->route_count++; + goto out; + + case LNET_EVENT_PUT: + /* should have been decommitted */ + LASSERT(!msg->msg_rx_committed); + /* overwritten while sending ACK */ + LASSERT(msg->msg_type == LNET_MSG_ACK); + msg->msg_type = LNET_MSG_PUT; /* fix type */ + break; + + case LNET_EVENT_SEND: + LASSERT(!msg->msg_rx_committed); + if (msg->msg_type == LNET_MSG_PUT) + counters->send_length += msg->msg_len; + break; + + case LNET_EVENT_GET: + LASSERT(msg->msg_rx_committed); + /* overwritten while sending reply */ + LASSERT(msg->msg_type == LNET_MSG_REPLY); + + msg->msg_type = LNET_MSG_GET; /* fix type */ + counters->send_length += msg->msg_len; + break; + } + + counters->send_count++; + out: + lnet_return_tx_credits_locked(msg); + msg->msg_tx_committed = 0; +} + +static void +lnet_msg_rx_decommit(lnet_msg_t *msg, int status) +{ + lnet_counters_t *counters = &the_lnet.ln_counters; + lnet_event_t *ev = &msg->msg_ev; + + LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */ + LASSERT(msg->msg_rx_committed); + + if (status != 0) + goto out; + + switch (ev->type) { + default: + LASSERT(ev->type == 0); + LASSERT(msg->msg_routing); + goto out; + + case LNET_EVENT_ACK: + LASSERT(msg->msg_type == LNET_MSG_ACK); + break; + + case LNET_EVENT_GET: + LASSERT(msg->msg_type == LNET_MSG_GET); + break; + + case LNET_EVENT_PUT: + LASSERT(msg->msg_type == LNET_MSG_PUT); + break; + + case LNET_EVENT_REPLY: + LASSERT(msg->msg_type == LNET_MSG_REPLY || + msg->msg_type == LNET_MSG_GET); /* optimized GET */ + break; + } + + counters->recv_count++; + if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY) + counters->recv_length += msg->msg_wanted; + + out: + lnet_return_rx_credits_locked(msg); + msg->msg_rx_committed = 0; +} + +void +lnet_msg_decommit(lnet_msg_t *msg, int status) +{ + lnet_counters_t *counters = &the_lnet.ln_counters; + + LASSERT(msg->msg_tx_committed || msg->msg_rx_committed); + LASSERT(msg->msg_onactivelist); + + if (msg->msg_tx_committed) /* always decommit for sending first */ + lnet_msg_tx_decommit(msg, status); + + if (msg->msg_rx_committed) + lnet_msg_rx_decommit(msg, status); + + cfs_list_del(&msg->msg_activelist); + msg->msg_onactivelist = 0; + counters->msgs_alloc--; +} + +void +lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md, + unsigned int offset, unsigned int mlen) +{ + /* Here, we attach the MD on lnet_msg and mark it busy and + * decrementing its threshold. Come what may, the lnet_msg "owns" + * the MD until a call to lnet_msg_detach_md or lnet_finalize() + * signals completion. */ + LASSERT(!msg->msg_routing); + + msg->msg_md = md; + if (msg->msg_receiving) { /* commited for receiving */ + msg->msg_offset = offset; + msg->msg_wanted = mlen; + } + + md->md_refcount++; + if (md->md_threshold != LNET_MD_THRESH_INF) { + LASSERT(md->md_threshold > 0); + md->md_threshold--; + } + + /* build umd in event */ + lnet_md2handle(&msg->msg_ev.md_handle, md); + lnet_md_deconstruct(md, &msg->msg_ev.md); +} + +void +lnet_msg_detach_md(lnet_msg_t *msg, int status) +{ + lnet_libmd_t *md = msg->msg_md; + int unlink; + + if (md == NULL) + return; + + /* Now it's safe to drop my caller's ref */ + md->md_refcount--; + LASSERT(md->md_refcount >= 0); + + unlink = lnet_md_unlinkable(md); + if (md->md_eq != NULL) { + msg->msg_ev.status = status; + msg->msg_ev.unlinked = unlink; + lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); + } + + if (unlink) + lnet_md_unlink(md); + + msg->msg_md = NULL; +} + +void lnet_complete_msg_locked(lnet_msg_t *msg) { lnet_handle_wire_t ack_wmd; @@ -146,7 +343,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg) if (status == 0 && msg->msg_ack) { /* Only send an ACK if the PUT completed successfully */ - lnet_return_credits_locked(msg); + lnet_msg_decommit(msg, 0); msg->msg_ack = 0; LNET_UNLOCK(); @@ -183,12 +380,7 @@ lnet_complete_msg_locked(lnet_msg_t *msg) return; } - lnet_return_credits_locked(msg); - - LASSERT (msg->msg_onactivelist); - msg->msg_onactivelist = 0; - cfs_list_del (&msg->msg_activelist); - the_lnet.ln_counters.msgs_alloc--; + lnet_msg_decommit(msg, status); lnet_msg_free_locked(msg); } @@ -197,7 +389,6 @@ void lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) { struct lnet_msg_container *container; - lnet_libmd_t *md; int my_slot; int i; @@ -228,26 +419,16 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) msg->msg_ev.status = status; - md = msg->msg_md; - if (md != NULL) { - int unlink; - - /* Now it's safe to drop my caller's ref */ - md->md_refcount--; - LASSERT (md->md_refcount >= 0); - - unlink = lnet_md_unlinkable(md); - - msg->msg_ev.unlinked = unlink; - - if (md->md_eq != NULL) - lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev); + if (msg->msg_md != NULL) + lnet_msg_detach_md(msg, status); - if (unlink) - lnet_md_unlink(md); - - msg->msg_md = NULL; - } + if (!msg->msg_tx_committed && !msg->msg_rx_committed) { + LNET_UNLOCK(); + /* not commited to network yet */ + LASSERT(!msg->msg_onactivelist); + lnet_msg_free(msg); + return; + } container = &the_lnet.ln_msg_container; cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing); diff --git a/lnet/lnet/lib-ptl.c b/lnet/lnet/lib-ptl.c index 114d02e..5932ad7 100644 --- a/lnet/lnet/lib-ptl.c +++ b/lnet/lnet/lib-ptl.c @@ -144,7 +144,7 @@ lnet_try_match_md(int index, int op_mask, lnet_process_id_t src, index, libcfs_id2str(src), mlength, rlength, md->md_lh.lh_cookie, md->md_niov, offset); - lnet_commit_md(md, msg, offset, mlength); + lnet_msg_attach_md(msg, md, offset, mlength); md->md_offset = offset + mlength; /* Auto-unlink NOW, so the ME gets unlinked if required. -- 1.8.3.1