diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 1d78ddb..a252718 100644
--- a/lnet/lnet/lib-msg.c
+++ b/lnet/lnet/lib-msg.c
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -74,7 +76,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
                ev->target.pid    = le32_to_cpu(hdr->dest_pid);
                ev->initiator.nid = LNET_NID_ANY;
                ev->initiator.pid = the_lnet.ln_pid;
-               ev->sender        = LNET_NID_ANY;
+               ev->sender        = LNET_NID_ANY;
 
        } else {
                /* event for passive message */
@@ -83,7 +85,7 @@
                ev->initiator.pid = hdr->src_pid;
                ev->initiator.nid = hdr->src_nid;
                ev->rlength       = hdr->payload_length;
-               ev->sender        = msg->msg_from;
+               ev->sender        = msg->msg_from;
                ev->mlength       = msg->msg_wanted;
                ev->offset        = msg->msg_offset;
        }
@@ -135,19 +137,27 @@
 }
 
 void
-lnet_msg_commit(lnet_msg_t *msg, int sending)
+lnet_msg_commit(lnet_msg_t *msg, int cpt)
 {
-       struct lnet_msg_container *container = &the_lnet.ln_msg_container;
-       lnet_counters_t *counters = the_lnet.ln_counters;
+       struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
+       lnet_counters_t *counters = the_lnet.ln_counters[cpt];
 
        /* routed message can be committed for both receiving and sending */
        LASSERT(!msg->msg_tx_committed);
 
-       if (msg->msg_rx_committed) { /* routed message, or reply for GET */
-               LASSERT(sending);
-               LASSERT(msg->msg_onactivelist);
+       if (msg->msg_sending) {
+               LASSERT(!msg->msg_receiving);
+
+               msg->msg_tx_cpt = cpt;
                msg->msg_tx_committed = 1;
-               return;
+               if (msg->msg_rx_committed) { /* routed message REPLY */
+                       LASSERT(msg->msg_onactivelist);
+                       return;
+               }
+       } else {
+               LASSERT(!msg->msg_sending);
+               msg->msg_rx_cpt = cpt;
+               msg->msg_rx_committed = 1;
        }
 
        LASSERT(!msg->msg_onactivelist);
@@ -157,23 +167,19 @@
        counters->msgs_alloc++;
        if (counters->msgs_alloc > counters->msgs_max)
                counters->msgs_max = counters->msgs_alloc;
-
-       if (sending)
-               msg->msg_tx_committed = 1;
-       else
-               msg->msg_rx_committed = 1;
 }
 
 static void
-lnet_msg_tx_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
-       lnet_event_t *ev = &msg->msg_ev;
+       lnet_counters_t *counters;
+       lnet_event_t    *ev = &msg->msg_ev;
 
        LASSERT(msg->msg_tx_committed);
        if (status != 0)
                goto out;
 
+       counters = the_lnet.ln_counters[msg->msg_tx_cpt];
        switch (ev->type) {
        default: /* routed message */
                LASSERT(msg->msg_routing);
@@ -200,11 +206,10 @@
 
        case LNET_EVENT_GET:
                LASSERT(msg->msg_rx_committed);
-               /* overwritten while sending reply */
+               /* overwritten while sending reply, we should never be
+                * here for optimized GET */
                LASSERT(msg->msg_type == LNET_MSG_REPLY);
-               msg->msg_type = LNET_MSG_GET; /* fix type */
-
                counters->send_length += msg->msg_len;
                break;
        }
 
@@ -215,17 +220,18 @@
 static void
-lnet_msg_rx_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
-       lnet_event_t *ev = &msg->msg_ev;
+       lnet_counters_t *counters;
+       lnet_event_t    *ev = &msg->msg_ev;
 
-       LASSERT(!msg->msg_tx_committed); /* decommitted or uncommitted */
+       LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
        LASSERT(msg->msg_rx_committed);
 
        if (status != 0)
                goto out;
 
+       counters = the_lnet.ln_counters[msg->msg_rx_cpt];
        switch (ev->type) {
        default:
                LASSERT(ev->type == 0);
@@ -237,7 +243,13 @@
                break;
 
        case LNET_EVENT_GET:
-               LASSERT(msg->msg_type == LNET_MSG_GET);
+               /* type is "REPLY" if it's an optimized GET on the passive
+                * side: an optimized GET is never committed for sending, so
+                * the message type wouldn't be changed back to "GET" by
+                * lnet_msg_decommit_tx(); see details in lnet_parse_get() */
+               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+                       msg->msg_type == LNET_MSG_GET);
+               counters->send_length += msg->msg_wanted;
                break;
 
        case LNET_EVENT_PUT:
@@ -245,8 +257,10 @@
                break;
 
        case LNET_EVENT_REPLY:
-               LASSERT(msg->msg_type == LNET_MSG_REPLY ||
-                       msg->msg_type == LNET_MSG_GET); /* optimized GET */
+               /* type is "GET" if it's an optimized GET on the active side,
+                * see details in lnet_create_reply_msg() */
+               LASSERT(msg->msg_type == LNET_MSG_GET ||
+                       msg->msg_type == LNET_MSG_REPLY);
                break;
        }
 
@@ -260,28 +274,44 @@
 }
 
 void
-lnet_msg_decommit(lnet_msg_t *msg, int status)
+lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
 {
-       lnet_counters_t *counters = the_lnet.ln_counters;
+       int     cpt2 = cpt;
 
        LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
        LASSERT(msg->msg_onactivelist);
 
-       if (msg->msg_tx_committed) /* always decommit for sending first */
-               lnet_msg_tx_decommit(msg, status);
+       if (msg->msg_tx_committed) { /* always decommit for sending first */
+               LASSERT(cpt == msg->msg_tx_cpt);
+               lnet_msg_decommit_tx(msg, status);
+       }
 
-       if (msg->msg_rx_committed)
-               lnet_msg_rx_decommit(msg, status);
+       if (msg->msg_rx_committed) {
+               /* forwarding msg committed for both receiving and sending */
+               if (cpt != msg->msg_rx_cpt) {
+                       lnet_net_unlock(cpt);
+                       cpt2 = msg->msg_rx_cpt;
+                       lnet_net_lock(cpt2);
+               }
+               lnet_msg_decommit_rx(msg, status);
+       }
 
        cfs_list_del(&msg->msg_activelist);
        msg->msg_onactivelist = 0;
-       counters->msgs_alloc--;
+
+       the_lnet.ln_counters[cpt2]->msgs_alloc--;
+
+       if (cpt2 != cpt) {
+               lnet_net_unlock(cpt2);
+               lnet_net_lock(cpt);
+       }
 }
 
 void
 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
                   unsigned int offset, unsigned int mlen)
 {
+       /* NB: @offset and @mlen are only useful for receiving */
        /* Here, we attach the MD on lnet_msg and mark it busy and
         * decrementing its threshold. Come what may, the lnet_msg "owns"
         * the MD until a call to lnet_msg_detach_md or lnet_finalize()
@@ -328,8 +358,8 @@
        msg->msg_md = NULL;
 }
 
-void
-lnet_complete_msg_locked(lnet_msg_t *msg)
+static int
+lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
        lnet_handle_wire_t ack_wmd;
        int                rc;
@@ -340,10 +370,10 @@
        if (status == 0 && msg->msg_ack) {
                /* Only send an ACK if the PUT completed successfully */
 
-               lnet_msg_decommit(msg, 0);
+               lnet_msg_decommit(msg, cpt, 0);
 
-               msg->msg_ack = 0;
-               LNET_UNLOCK();
+               msg->msg_ack = 0;
+               lnet_net_unlock(cpt);
 
                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
                LASSERT(!msg->msg_routing);
@@ -356,38 +386,61 @@
                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
-               rc = lnet_send(msg->msg_ev.target.nid, msg);
-
-               LNET_LOCK();
-
-               if (rc == 0)
-                       return;
-       } else if (status == 0 &&               /* OK so far */
-                  (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
-
-               LASSERT (!msg->msg_receiving); /* called back recv already */
-
-               LNET_UNLOCK();
-
-               rc = lnet_send(LNET_NID_ANY, msg);
-
-               LNET_LOCK();
+               /* NB: we probably want to use the NID of msg::msg_from as the
+                * 3rd parameter (router NID) if it's a routed message */
+               rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
+
+               lnet_net_lock(cpt);
+               /*
+                * NB: message is committed for sending; we should return on
+                * success, because the LND will finalize this message later.
+                *
+                * Also, there is a possibility that the message is committed
+                * for sending but failed before being delivered to the LND,
+                * e.g. ENOMEM; in that case we can't fall through either,
+                * because the CPT for sending can be different from the CPT
+                * for receiving, so we should return to lnet_finalize() to
+                * make sure we are locking the correct partition.
+                */
+               return rc;
 
-               if (rc == 0)
-                       return;
-       }
+       } else if (status == 0 &&       /* OK so far */
+                  (msg->msg_routing && !msg->msg_sending)) {
+               /* not forwarded */
+               LASSERT(!msg->msg_receiving);   /* called back recv already */
+               lnet_net_unlock(cpt);
+
+               rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
+
+               lnet_net_lock(cpt);
+               /*
+                * NB: message is committed for sending; we should return on
+                * success, because the LND will finalize this message later.
+                *
+                * Also, there is a possibility that the message is committed
+                * for sending but failed before being delivered to the LND,
+                * e.g. ENOMEM; in that case we can't fall through either:
+                * - The rule is that a message must decommit for sending
+                *   first if it's committed for both sending and receiving.
+                * - The CPT for sending can be different from the CPT for
+                *   receiving, so we should return to lnet_finalize() to
+                *   make sure we are locking the correct partition.
+                */
+               return rc;
+       }
 
-       lnet_msg_decommit(msg, status);
+       lnet_msg_decommit(msg, cpt, status);
        lnet_msg_free_locked(msg);
+       return 0;
 }
 
-
 void
 lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 {
        struct lnet_msg_container      *container;
        int                             my_slot;
        int                             cpt;
+       int                             rc;
        int                             i;
 
        LASSERT (!cfs_in_interrupt ());
@@ -411,9 +464,6 @@
               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
-
-       LASSERT (msg->msg_onactivelist);
-
        msg->msg_ev.status = status;
 
        if (msg->msg_md != NULL) {
@@ -424,6 +474,8 @@
                lnet_res_unlock(cpt);
        }
 
+ again:
+       rc = 0;
        if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
                /* not commited to network yet */
                LASSERT(!msg->msg_onactivelist);
@@ -431,8 +483,15 @@
                return;
        }
-       LNET_LOCK();
-       container = &the_lnet.ln_msg_container;
+       /*
+        * NB: a routed message can be committed for both receiving and
+        * sending; we should finalize in LIFO order and keep the counters
+        * correct (finalize sending first, then finalize receiving).
+        */
+       cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
+       lnet_net_lock(cpt);
+
+       container = the_lnet.ln_msg_containers[cpt];
 
        cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
        /* Recursion breaker.  Don't complete the message here if I am (or
@@ -442,20 +501,24 @@
        my_slot = -1;
        for (i = 0; i < container->msc_nfinalizers; i++) {
                if (container->msc_finalizers[i] == cfs_current())
-                       goto out;
+                       break;
 
                if (my_slot < 0 && container->msc_finalizers[i] == NULL)
                        my_slot = i;
        }
 
-       if (my_slot < 0)
-               goto out;
+       if (i < container->msc_nfinalizers || my_slot < 0) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        container->msc_finalizers[my_slot] = cfs_current();
 #else
        LASSERT(container->msc_nfinalizers == 1);
-       if (container->msc_finalizers[0] != NULL)
-               goto out;
+       if (container->msc_finalizers[0] != NULL) {
+               lnet_net_unlock(cpt);
+               return;
+       }
 
        my_slot = i = 0;
        container->msc_finalizers[0] = (struct lnet_msg_container *)1;
@@ -463,19 +526,24 @@
 
        while (!cfs_list_empty(&container->msc_finalizing)) {
                msg = cfs_list_entry(container->msc_finalizing.next,
-                                    lnet_msg_t, msg_list);
+                                    lnet_msg_t, msg_list);
 
-               cfs_list_del(&msg->msg_list);
+               cfs_list_del(&msg->msg_list);
 
-               /* NB drops and regains the lnet lock if it actually does
-                * anything, so my finalizing friends can chomp along too */
-               lnet_complete_msg_locked(msg);
-       }
+               /* NB drops and regains the lnet lock if it actually does
+                * anything, so my finalizing friends can chomp along too */
+               rc = lnet_complete_msg_locked(msg, cpt);
+               if (rc != 0)
+                       break;
+       }
 
        container->msc_finalizers[my_slot] = NULL;
- out:
-       LNET_UNLOCK();
+       lnet_net_unlock(cpt);
+
+       if (rc != 0)
+               goto again;
 }
+EXPORT_SYMBOL(lnet_finalize);
 
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
@@ -512,7 +580,7 @@
 }
 
 int
-lnet_msg_container_setup(struct lnet_msg_container *container)
+lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 {
        int     rc;
 
@@ -535,11 +603,11 @@
        rc = 0;
 #endif
        /* number of CPUs */
-       container->msc_nfinalizers = cfs_cpt_weight(cfs_cpt_table,
-                                                   CFS_CPT_ANY);
-       LIBCFS_ALLOC(container->msc_finalizers,
-                    container->msc_nfinalizers *
-                    sizeof(*container->msc_finalizers));
+       container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
+
+       LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
+                        container->msc_nfinalizers *
+                        sizeof(*container->msc_finalizers));
 
        if (container->msc_finalizers == NULL) {
                CERROR("Failed to allocate message finalizers\n");
@@ -549,3 +617,45 @@
 
        return rc;
 }
+
+void
+lnet_msg_containers_destroy(void)
+{
+       struct lnet_msg_container *container;
+       int     i;
+
+       if (the_lnet.ln_msg_containers == NULL)
+               return;
+
+       cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
+               lnet_msg_container_cleanup(container);
+
+       cfs_percpt_free(the_lnet.ln_msg_containers);
+       the_lnet.ln_msg_containers = NULL;
+}
+
+int
+lnet_msg_containers_create(void)
+{
+       struct lnet_msg_container *container;
+       int     rc;
+       int     i;
+
+       the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
+                                                     sizeof(*container));
+
+       if (the_lnet.ln_msg_containers == NULL) {
+               CERROR("Failed to allocate cpu-partition data for network\n");
+               return -ENOMEM;
+       }
+
+       cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
+               rc = lnet_msg_container_setup(container, i);
+               if (rc != 0) {
+                       lnet_msg_containers_destroy();
+                       return rc;
+               }
+       }
+
+       return 0;
+}
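
[Editor's note] The subtlest part of this patch is the partition-lock hand-off in lnet_msg_decommit(): the caller enters holding the lock for the send-side CPT, but a forwarded message's receive-side state (including its msgs_alloc counter) belongs to a different CPT, so the function temporarily trades locks, does the rx-side work under the other partition's lock, and restores the caller's lock before returning. The standalone sketch below illustrates that hand-off pattern with pthread mutexes standing in for lnet_net_lock()/lnet_net_unlock(); every name in it (net_lock, decommit, msgs_alloc, NCPT) is illustrative only, not an LNet API.

/* cpt_lock_switch.c - editor's sketch of the lock hand-off pattern used by
 * lnet_msg_decommit() in the patch above.  Build with: cc cpt_lock_switch.c -lpthread */
#include <pthread.h>
#include <stdio.h>

#define NCPT 4                          /* number of CPU partitions */

static pthread_mutex_t cpt_lock[NCPT];  /* stand-in for the per-CPT LNet lock */
static int msgs_alloc[NCPT];            /* stand-in for the per-CPT counters */

static void net_lock(int cpt)   { pthread_mutex_lock(&cpt_lock[cpt]); }
static void net_unlock(int cpt) { pthread_mutex_unlock(&cpt_lock[cpt]); }

/* Called with cpt_lock[cpt] held; returns with that same lock held, even
 * when the rx-side state lives under a different partition's lock. */
static void decommit(int cpt, int rx_cpt)
{
        int cpt2 = cpt;

        if (cpt != rx_cpt) {            /* forwarded: rx state on another CPT */
                net_unlock(cpt);
                cpt2 = rx_cpt;
                net_lock(cpt2);
        }

        msgs_alloc[cpt2]--;             /* update the counter under its own lock */

        if (cpt2 != cpt) {              /* restore the caller's lock */
                net_unlock(cpt2);
                net_lock(cpt);
        }
}

int main(void)
{
        int i;

        for (i = 0; i < NCPT; i++) {
                pthread_mutex_init(&cpt_lock[i], NULL);
                msgs_alloc[i] = 1;
        }

        net_lock(0);
        decommit(0, 2);                 /* tx committed on CPT 0, rx on CPT 2 */
        net_unlock(0);

        printf("msgs_alloc[2] = %d\n", msgs_alloc[2]);  /* prints 0 */
        return 0;
}

Re-acquiring the entry lock before returning is what lets lnet_finalize() keep a single cpt variable across the loop, and decommitting tx before rx matches the LIFO ordering called out in the comment added to lnet_finalize() above.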