X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=a2527181d1e3dbf60441f0e81d232ac00a3dac49;hb=7337db63e1212ef4422a5fe88e1e42e03161daeb;hp=6c705f01c35384c3044352990c6e041f224db8c1;hpb=a07e9d350b3e500c7be877f6dcf54380b86a9cbe;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 6c705f0..a252718 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -26,6 +26,8 @@ /* * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2012, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -74,7 +76,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type) ev->target.pid = le32_to_cpu(hdr->dest_pid); ev->initiator.nid = LNET_NID_ANY; ev->initiator.pid = the_lnet.ln_pid; - ev->sender = LNET_NID_ANY; + ev->sender = LNET_NID_ANY; } else { /* event for passive message */ @@ -83,7 +85,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type) ev->initiator.pid = hdr->src_pid; ev->initiator.nid = hdr->src_nid; ev->rlength = hdr->payload_length; - ev->sender = msg->msg_from; + ev->sender = msg->msg_from; ev->mlength = msg->msg_wanted; ev->offset = msg->msg_offset; } @@ -204,11 +206,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status) case LNET_EVENT_GET: LASSERT(msg->msg_rx_committed); - /* overwritten while sending reply */ + /* overwritten while sending reply, we should never be + * here for optimized GET */ LASSERT(msg->msg_type == LNET_MSG_REPLY); - msg->msg_type = LNET_MSG_GET; /* fix type */ - counters->send_length += msg->msg_len; break; } @@ -230,6 +231,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status) if (status != 0) goto out; + counters = the_lnet.ln_counters[msg->msg_rx_cpt]; switch (ev->type) { default: LASSERT(ev->type == 0); @@ -241,7 +243,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status) break; case LNET_EVENT_GET: - LASSERT(msg->msg_type == LNET_MSG_GET); + /* type is "REPLY" if it's an optimized GET on passive side, + * because optimized GET will never be committed for sending, + * so message type wouldn't be changed back to "GET" by + * lnet_msg_decommit_tx(), see details in lnet_parse_get() */ + LASSERT(msg->msg_type == LNET_MSG_REPLY || + msg->msg_type == LNET_MSG_GET); + counters->send_length += msg->msg_wanted; break; case LNET_EVENT_PUT: @@ -249,12 +257,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status) break; case LNET_EVENT_REPLY: - LASSERT(msg->msg_type == LNET_MSG_REPLY || - msg->msg_type == LNET_MSG_GET); /* optimized GET */ + /* type is "GET" if it's an optimized GET on active side, + * see details in lnet_create_reply_msg() */ + LASSERT(msg->msg_type == LNET_MSG_GET || + msg->msg_type == LNET_MSG_REPLY); break; } - counters = the_lnet.ln_counters[msg->msg_rx_cpt]; counters->recv_count++; if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY) counters->recv_length += msg->msg_wanted; @@ -349,7 +358,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status) msg->msg_md = NULL; } -void +static int lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) { lnet_handle_wire_t ack_wmd; @@ -382,9 +391,19 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY); lnet_net_lock(cpt); + /* + * NB: message is committed for sending, we should return + * on success because LND will finalize this message later. + * + * Also, there is possibility that message is commited for + * sending and also failed before delivering to LND, + * i.e: ENOMEM, in that case we can't fall through either + * because CPT for sending can be different with CPT for + * receiving, so we should return back to lnet_finalize() + * to make sure we are locking the correct partition. + */ + return rc; - if (rc == 0) - return; } else if (status == 0 && /* OK so far */ (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */ @@ -394,13 +413,25 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt) rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY); lnet_net_lock(cpt); - - if (rc == 0) - return; + /* + * NB: message is committed for sending, we should return + * on success because LND will finalize this message later. + * + * Also, there is possibility that message is commited for + * sending and also failed before delivering to LND, + * i.e: ENOMEM, in that case we can't fall through either: + * - The rule is message must decommit for sending first if + * the it's committed for both sending and receiving + * - CPT for sending can be different with CPT for receiving, + * so we should return back to lnet_finalize() to make + * sure we are locking the correct partition. + */ + return rc; } lnet_msg_decommit(msg, cpt, status); lnet_msg_free_locked(msg); + return 0; } void @@ -409,6 +440,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) struct lnet_msg_container *container; int my_slot; int cpt; + int rc; int i; LASSERT (!cfs_in_interrupt ()); @@ -432,9 +464,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid), msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid)); #endif - - LASSERT (msg->msg_onactivelist); - msg->msg_ev.status = status; if (msg->msg_md != NULL) { @@ -445,6 +474,8 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) lnet_res_unlock(cpt); } + again: + rc = 0; if (!msg->msg_tx_committed && !msg->msg_rx_committed) { /* not commited to network yet */ LASSERT(!msg->msg_onactivelist); @@ -470,20 +501,24 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) my_slot = -1; for (i = 0; i < container->msc_nfinalizers; i++) { if (container->msc_finalizers[i] == cfs_current()) - goto out; + break; if (my_slot < 0 && container->msc_finalizers[i] == NULL) my_slot = i; } - if (my_slot < 0) - goto out; + if (i < container->msc_nfinalizers || my_slot < 0) { + lnet_net_unlock(cpt); + return; + } container->msc_finalizers[my_slot] = cfs_current(); #else LASSERT(container->msc_nfinalizers == 1); - if (container->msc_finalizers[0] != NULL) - goto out; + if (container->msc_finalizers[0] != NULL) { + lnet_net_unlock(cpt); + return; + } my_slot = i = 0; container->msc_finalizers[0] = (struct lnet_msg_container *)1; @@ -497,13 +532,18 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) /* NB drops and regains the lnet lock if it actually does * anything, so my finalizing friends can chomp along too */ - lnet_complete_msg_locked(msg, cpt); + rc = lnet_complete_msg_locked(msg, cpt); + if (rc != 0) + break; } container->msc_finalizers[my_slot] = NULL; - out: lnet_net_unlock(cpt); + + if (rc != 0) + goto again; } +EXPORT_SYMBOL(lnet_finalize); void lnet_msg_container_cleanup(struct lnet_msg_container *container)