X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=b1995f2da5e3a992afb91a3aa12576600ea55ee5;hb=ffd8e881bb98bf3fce0716b46cc51b1922642f6e;hp=6c705f01c35384c3044352990c6e041f224db8c1;hpb=a07e9d350b3e500c7be877f6dcf54380b86a9cbe;p=fs%2Flustre-release.git

diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c
index 6c705f0..b1995f2 100644
--- a/lnet/lnet/lib-msg.c
+++ b/lnet/lnet/lib-msg.c
@@ -26,6 +26,8 @@
 /*
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -74,7 +76,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 		ev->target.pid    = le32_to_cpu(hdr->dest_pid);
 		ev->initiator.nid = LNET_NID_ANY;
 		ev->initiator.pid = the_lnet.ln_pid;
-		ev->sender        = LNET_NID_ANY;
+		ev->sender        = LNET_NID_ANY;
 
 	} else {
 		/* event for passive message */
@@ -83,7 +85,7 @@ lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
 		ev->initiator.pid = hdr->src_pid;
 		ev->initiator.nid = hdr->src_nid;
 		ev->rlength       = hdr->payload_length;
-		ev->sender        = msg->msg_from;
+		ev->sender        = msg->msg_from;
 		ev->mlength       = msg->msg_wanted;
 		ev->offset        = msg->msg_offset;
 	}
@@ -160,7 +162,7 @@ lnet_msg_commit(lnet_msg_t *msg, int cpt)
 
 	LASSERT(!msg->msg_onactivelist);
 	msg->msg_onactivelist = 1;
-	cfs_list_add(&msg->msg_activelist, &container->msc_active);
+	list_add(&msg->msg_activelist, &container->msc_active);
 
 	counters->msgs_alloc++;
 	if (counters->msgs_alloc > counters->msgs_max)
@@ -204,11 +206,10 @@ lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
 
 	case LNET_EVENT_GET:
 		LASSERT(msg->msg_rx_committed);
-		/* overwritten while sending reply */
+		/* overwritten while sending reply, we should never be
+		 * here for optimized GET */
 		LASSERT(msg->msg_type == LNET_MSG_REPLY);
 
-		msg->msg_type = LNET_MSG_GET; /* fix type */
-		counters->send_length += msg->msg_len;
 		break;
 	}
 
@@ -230,6 +231,7 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 	if (status != 0)
 		goto out;
 
+	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
 	switch (ev->type) {
 	default:
 		LASSERT(ev->type == 0);
@@ -241,7 +243,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 		break;
 
 	case LNET_EVENT_GET:
-		LASSERT(msg->msg_type == LNET_MSG_GET);
+		/* type is "REPLY" if it's an optimized GET on passive side,
+		 * because optimized GET will never be committed for sending,
+		 * so message type wouldn't be changed back to "GET" by
+		 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
+		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
+			msg->msg_type == LNET_MSG_GET);
+		counters->send_length += msg->msg_wanted;
 		break;
 
 	case LNET_EVENT_PUT:
@@ -249,12 +257,13 @@ lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
 		break;
 
 	case LNET_EVENT_REPLY:
-		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
-			msg->msg_type == LNET_MSG_GET); /* optimized GET */
+		/* type is "GET" if it's an optimized GET on active side,
+		 * see details in lnet_create_reply_msg() */
+		LASSERT(msg->msg_type == LNET_MSG_GET ||
+			msg->msg_type == LNET_MSG_REPLY);
 		break;
 	}
 
-	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
 	counters->recv_count++;
 	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
 		counters->recv_length += msg->msg_wanted;
@@ -287,7 +296,7 @@ lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
 		lnet_msg_decommit_rx(msg, status);
 	}
 
-	cfs_list_del(&msg->msg_activelist);
+	list_del(&msg->msg_activelist);
 	msg->msg_onactivelist = 0;
 
 	the_lnet.ln_counters[cpt2]->msgs_alloc--;
@@ -310,7 +319,7 @@ lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
 	LASSERT(!msg->msg_routing);
 
 	msg->msg_md = md;
-	if (msg->msg_receiving) { /* commited for receiving */
+	if (msg->msg_receiving) { /* committed for receiving */
 		msg->msg_offset = offset;
 		msg->msg_wanted = mlen;
 	}
@@ -349,7 +358,7 @@ lnet_msg_detach_md(lnet_msg_t *msg, int status)
 	msg->msg_md = NULL;
 }
 
-void
+static int
 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 {
 	lnet_handle_wire_t ack_wmd;
@@ -382,9 +391,19 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 
 		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
 		lnet_net_lock(cpt);
+		/*
+		 * NB: message is committed for sending, we should return
+		 * on success because LND will finalize this message later.
+		 *
+		 * Also, there is possibility that message is committed for
+		 * sending and also failed before delivering to LND,
+		 * i.e: ENOMEM, in that case we can't fall through either
+		 * because CPT for sending can be different with CPT for
+		 * receiving, so we should return back to lnet_finalize()
+		 * to make sure we are locking the correct partition.
+		 */
+		return rc;
 
-		if (rc == 0)
-			return;
 	} else if (status == 0 &&	/* OK so far */
 		   (msg->msg_routing && !msg->msg_sending)) {
 		/* not forwarded */
@@ -394,13 +413,25 @@ lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
 
 		rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
 		lnet_net_lock(cpt);
-
-		if (rc == 0)
-			return;
+		/*
+		 * NB: message is committed for sending, we should return
+		 * on success because LND will finalize this message later.
+		 *
+		 * Also, there is possibility that message is committed for
+		 * sending and also failed before delivering to LND,
+		 * i.e: ENOMEM, in that case we can't fall through either:
+		 * - The rule is message must decommit for sending first if
+		 *   the it's committed for both sending and receiving
+		 * - CPT for sending can be different with CPT for receiving,
+		 *   so we should return back to lnet_finalize() to make
+		 *   sure we are locking the correct partition.
+		 */
+		return rc;
 	}
 
 	lnet_msg_decommit(msg, cpt, status);
 	lnet_msg_free_locked(msg);
+	return 0;
 }
 
 void
@@ -409,12 +440,13 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 	struct lnet_msg_container	*container;
 	int				my_slot;
 	int				cpt;
+	int				rc;
 	int				i;
 
-	LASSERT (!cfs_in_interrupt ());
+	LASSERT (!in_interrupt ());
 
-	if (msg == NULL)
-		return;
+	if (msg == NULL)
+		return;
 #if 0
 	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
 	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
@@ -432,9 +464,6 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 	       msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
 	       msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
-
-	LASSERT (msg->msg_onactivelist);
-
 	msg->msg_ev.status = status;
 
 	if (msg->msg_md != NULL) {
@@ -445,15 +474,17 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 		lnet_res_unlock(cpt);
 	}
 
+ again:
+	rc = 0;
 	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
-		/* not commited to network yet */
+		/* not committed to network yet */
 		LASSERT(!msg->msg_onactivelist);
 		lnet_msg_free(msg);
 		return;
 	}
 
 	/*
-	 * NB: routed message can be commited for both receiving and sending,
+	 * NB: routed message can be committed for both receiving and sending,
 	 * we should finalize in LIFO order and keep counters correct.
 	 * (finalize sending first then finalize receiving)
 	 */
@@ -461,7 +492,7 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 	lnet_net_lock(cpt);
 
 	container = the_lnet.ln_msg_containers[cpt];
-	cfs_list_add_tail(&msg->msg_list, &container->msc_finalizing);
+	list_add_tail(&msg->msg_list, &container->msc_finalizing);
 
 	/* Recursion breaker.  Don't complete the message here if I am (or
 	 * enough other threads are) already completing messages */
@@ -469,41 +500,50 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
 #ifdef __KERNEL__
 	my_slot = -1;
 	for (i = 0; i < container->msc_nfinalizers; i++) {
-		if (container->msc_finalizers[i] == cfs_current())
-			goto out;
+		if (container->msc_finalizers[i] == current)
+			break;
 
 		if (my_slot < 0 && container->msc_finalizers[i] == NULL)
 			my_slot = i;
 	}
 
-	if (my_slot < 0)
-		goto out;
+	if (i < container->msc_nfinalizers || my_slot < 0) {
+		lnet_net_unlock(cpt);
+		return;
+	}
 
-	container->msc_finalizers[my_slot] = cfs_current();
+	container->msc_finalizers[my_slot] = current;
#else
 	LASSERT(container->msc_nfinalizers == 1);
-	if (container->msc_finalizers[0] != NULL)
-		goto out;
+	if (container->msc_finalizers[0] != NULL) {
+		lnet_net_unlock(cpt);
+		return;
+	}
 
 	my_slot = i = 0;
 	container->msc_finalizers[0] = (struct lnet_msg_container *)1;
 #endif
 
-	while (!cfs_list_empty(&container->msc_finalizing)) {
-		msg = cfs_list_entry(container->msc_finalizing.next,
-				     lnet_msg_t, msg_list);
+	while (!list_empty(&container->msc_finalizing)) {
+		msg = list_entry(container->msc_finalizing.next,
+				 lnet_msg_t, msg_list);
 
-		cfs_list_del(&msg->msg_list);
+		list_del(&msg->msg_list);
 
 		/* NB drops and regains the lnet lock if it actually does
 		 * anything, so my finalizing friends can chomp along too */
-		lnet_complete_msg_locked(msg, cpt);
+		rc = lnet_complete_msg_locked(msg, cpt);
+		if (rc != 0)
+			break;
 	}
 
 	container->msc_finalizers[my_slot] = NULL;
- out:
 	lnet_net_unlock(cpt);
+
+	if (rc != 0)
+		goto again;
 }
+EXPORT_SYMBOL(lnet_finalize);
 
 void
 lnet_msg_container_cleanup(struct lnet_msg_container *container)
@@ -513,13 +553,13 @@ lnet_msg_container_cleanup(struct lnet_msg_container *container)
 	if (container->msc_init == 0)
 		return;
 
-	while (!cfs_list_empty(&container->msc_active)) {
-		lnet_msg_t *msg = cfs_list_entry(container->msc_active.next,
-						 lnet_msg_t, msg_activelist);
+	while (!list_empty(&container->msc_active)) {
+		lnet_msg_t *msg = list_entry(container->msc_active.next,
+					     lnet_msg_t, msg_activelist);
 
 		LASSERT(msg->msg_onactivelist);
 		msg->msg_onactivelist = 0;
-		cfs_list_del(&msg->msg_activelist);
+		list_del(&msg->msg_activelist);
 		lnet_msg_free(msg);
 		count++;
 	}
@@ -546,8 +586,8 @@ lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
 
 	container->msc_init = 1;
 
-	CFS_INIT_LIST_HEAD(&container->msc_active);
-	CFS_INIT_LIST_HEAD(&container->msc_finalizing);
+	INIT_LIST_HEAD(&container->msc_active);
+	INIT_LIST_HEAD(&container->msc_finalizing);
 
 #ifdef LNET_USE_LIB_FREELIST
 	memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
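
The hunks above rework the recursion breaker in lnet_finalize(): the "goto out" exits become explicit unlock-and-return paths, lnet_complete_msg_locked() now returns a status, and a non-zero return sends the caller back to the "again:" label so the correct CPT partition is re-locked. The standalone sketch below illustrates only the finalizer-slot/queue pattern that this code relies on; it is not Lustre code. The names (struct fin_container, fin_finalize, complete_msg, NFINALIZERS) are hypothetical, a plain pthread mutex stands in for the per-CPT lnet_net_lock(), and the CPT re-lock retry path is not modelled.

/* Hypothetical sketch of the finalizer-slot recursion breaker; not Lustre code. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define NFINALIZERS 2			/* stands in for msc_nfinalizers */

struct fin_msg {
	struct fin_msg	*next;
	int		 id;
};

struct fin_container {
	pthread_mutex_t	 lock;		/* stands in for lnet_net_lock(cpt) */
	struct fin_msg	*head;		/* finalizing queue */
	struct fin_msg	*tail;
	pthread_t	 finalizers[NFINALIZERS];
	bool		 slot_used[NFINALIZERS];
};

/* Complete one message; in LNet this step may recurse into fin_finalize(). */
static void complete_msg(struct fin_container *c, struct fin_msg *msg)
{
	(void)c;
	printf("completing msg %d\n", msg->id);
}

static void fin_finalize(struct fin_container *c, struct fin_msg *msg)
{
	int my_slot = -1;
	int i;

	pthread_mutex_lock(&c->lock);

	/* Always queue the message first; whoever owns a slot will drain it. */
	msg->next = NULL;
	if (c->tail != NULL)
		c->tail->next = msg;
	else
		c->head = msg;
	c->tail = msg;

	/* Recursion breaker: if this thread already owns a slot, or no slot
	 * is free, leave the message for an active finalizer and return. */
	for (i = 0; i < NFINALIZERS; i++) {
		if (c->slot_used[i] &&
		    pthread_equal(c->finalizers[i], pthread_self())) {
			pthread_mutex_unlock(&c->lock);
			return;
		}
		if (my_slot < 0 && !c->slot_used[i])
			my_slot = i;
	}
	if (my_slot < 0) {
		pthread_mutex_unlock(&c->lock);
		return;
	}

	c->slot_used[my_slot] = true;
	c->finalizers[my_slot] = pthread_self();

	/* Drain the queue.  The lock is dropped around completion, so other
	 * slot holders can drain concurrently, and any message re-queued by
	 * completion is picked up here instead of deepening the stack. */
	while (c->head != NULL) {
		struct fin_msg *m = c->head;

		c->head = m->next;
		if (c->head == NULL)
			c->tail = NULL;

		pthread_mutex_unlock(&c->lock);
		complete_msg(c, m);
		pthread_mutex_lock(&c->lock);
	}

	c->slot_used[my_slot] = false;
	pthread_mutex_unlock(&c->lock);
}

int main(void)
{
	struct fin_container c;
	struct fin_msg m1 = { NULL, 1 };
	struct fin_msg m2 = { NULL, 2 };

	memset(&c, 0, sizeof(c));
	pthread_mutex_init(&c.lock, NULL);

	fin_finalize(&c, &m1);
	fin_finalize(&c, &m2);

	pthread_mutex_destroy(&c.lock);
	return 0;
}

The point of the slot array is to bound recursion and concurrency: a completion that triggers another finalization only appends to the queue, and at most NFINALIZERS threads loop over it at a time.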