X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=4ab5fa4d8418d50f3571e7199e0e2b05575f5c18;hb=8d8d0f2cacd5769a2e324fbd37f5a0674748621f;hp=1b69533f09e9161d715a130a1e26abe30ebfd44f;hpb=944a01f5d1adb1ab5f7721d5c2d1cafcd3b2c915;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 1b69533..4ab5fa4 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -1,155 +1,256 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lib/lib-msg.c - * Message decoding, parsing and finalizing routines + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Copyright (c) 2001-2002 Sandia National Laboratories + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * This file is part of Lustre, http://www.sf.net/projects/lustre/ + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lnet/lnet/lib-msg.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Message decoding, parsing and finalizing routines */ -#ifndef __KERNEL__ -# include -#else -# define DEBUG_SUBSYSTEM S_PORTALS -# include -#endif +#define DEBUG_SUBSYSTEM S_LNET -#include +#include void -lib_enq_event_locked (nal_cb_t *nal, void *private, - lib_eq_t *eq, ptl_event_t *ev) +lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev) { - ptl_event_t *eq_slot; - int rc; - - ev->sequence = eq->sequence++; /* Allocate the next queue slot */ + ENTRY; - /* size must be a power of 2 to handle a wrapped sequence # */ - LASSERT (eq->size != 0 && - eq->size == LOWEST_BIT_SET (eq->size)); - eq_slot = eq->base + (ev->sequence & (eq->size - 1)); + memset(ev, 0, sizeof(*ev)); + + ev->status = 0; + ev->unlinked = 1; + ev->type = LNET_EVENT_UNLINK; + lnet_md_deconstruct(md, &ev->md); + lnet_md2handle(&ev->md_handle, md); + EXIT; +} - /* Copy the event into the allocated slot, ensuring all the rest of - * the event's contents have been copied _before_ the sequence - * number gets updated. A processes 'getting' an event waits on - * the next queue slot's sequence to be 'new'. When it is, _all_ - * other event fields had better be consistent. I assert - * 'sequence' is the last member, so I only need a 2 stage copy. */ +void +lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev) +{ + lnet_event_t *eq_slot; - LASSERT(sizeof (ptl_event_t) == - offsetof(ptl_event_t, sequence) + sizeof(ev->sequence)); + /* Allocate the next queue slot */ + ev->sequence = eq->eq_enq_seq++; - rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev, - offsetof (ptl_event_t, sequence)); - LASSERT (rc == PTL_OK); + /* size must be a power of 2 to handle sequence # overflow */ + LASSERT (eq->eq_size != 0 && + eq->eq_size == LOWEST_BIT_SET (eq->eq_size)); + eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1)); -#ifdef __KERNEL__ - barrier(); -#endif - /* Updating the sequence number is what makes the event 'new' NB if - * the cb_write below isn't atomic, this could cause a race with - * PtlEQGet */ - rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence, - (void *)&ev->sequence,sizeof (ev->sequence)); - LASSERT (rc == PTL_OK); + /* There is no race since both event consumers and event producers + * take the LNET_LOCK, so we don't screw around with memory + * barriers, setting the sequence number last or weird structure + * layout assertions. */ + *eq_slot = *ev; + + /* Call the callback handler (if any) */ + if (eq->eq_callback != NULL) + eq->eq_callback (eq_slot); #ifdef __KERNEL__ - barrier(); + /* Wake anyone waiting in LNetEQPoll() */ + if (cfs_waitq_active(&the_lnet.ln_waitq)) + cfs_waitq_broadcast(&the_lnet.ln_waitq); +#else +# ifndef HAVE_LIBPTHREAD + /* LNetEQPoll() calls into _the_ LND to wait for action */ +# else + /* Wake anyone waiting in LNetEQPoll() */ + pthread_cond_broadcast(&the_lnet.ln_cond); +# endif #endif +} + +void +lnet_complete_msg_locked(lnet_msg_t *msg) +{ + lnet_handle_wire_t ack_wmd; + int rc; + int status = msg->msg_ev.status; + + LASSERT (msg->msg_onactivelist); + + if (status == 0 && msg->msg_ack) { + /* Only send an ACK if the PUT completed successfully */ + + lnet_return_credits_locked(msg); + + msg->msg_ack = 0; + LNET_UNLOCK(); + + LASSERT(msg->msg_ev.type == LNET_EVENT_PUT); + LASSERT(!msg->msg_routing); + + ack_wmd = msg->msg_hdr.msg.put.ack_wmd; + + lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0); + + msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; + msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; + msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); + + rc = lnet_send(msg->msg_ev.target.nid, msg); - if (nal->cb_callback != NULL) - nal->cb_callback(nal, private, eq, ev); - else if (eq->event_callback != NULL) - eq->event_callback(ev); + LNET_LOCK(); + + if (rc == 0) + return; + } else if (status == 0 && /* OK so far */ + (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */ + + LASSERT (!msg->msg_receiving); /* called back recv already */ + + LNET_UNLOCK(); + + rc = lnet_send(LNET_NID_ANY, msg); + + LNET_LOCK(); + + if (rc == 0) + return; + } + + lnet_return_credits_locked(msg); + + LASSERT (msg->msg_onactivelist); + msg->msg_onactivelist = 0; + cfs_list_del (&msg->msg_activelist); + the_lnet.ln_counters.msgs_alloc--; + lnet_msg_free(msg); } -void -lib_finalize(nal_cb_t *nal, void *private, lib_msg_t *msg, ptl_err_t status) + +void +lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) { - lib_md_t *md; - int unlink; - unsigned long flags; - int rc; - ptl_hdr_t ack; +#ifdef __KERNEL__ + int i; + int my_slot; +#endif + lnet_libmd_t *md; + + LASSERT (!cfs_in_interrupt ()); if (msg == NULL) return; +#if 0 + CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n", + lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target), + msg->msg_target_is_router ? "t" : "", + msg->msg_routing ? "X" : "", + msg->msg_ack ? "A" : "", + msg->msg_sending ? "S" : "", + msg->msg_receiving ? "R" : "", + msg->msg_delayed ? "d" : "", + msg->msg_txcredit ? "C" : "", + msg->msg_peertxcredit ? "c" : "", + msg->msg_rtrcredit ? "F" : "", + msg->msg_peerrtrcredit ? "f" : "", + msg->msg_onactivelist ? "!" : "", + msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid), + msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid)); +#endif + LNET_LOCK(); + + LASSERT (msg->msg_onactivelist); + + msg->msg_ev.status = status; - /* Only send an ACK if the PUT completed successfully */ - if (status == PTL_OK && - !ptl_is_wire_handle_none(&msg->ack_wmd)) { - - LASSERT(msg->ev.type == PTL_EVENT_PUT_END); - - memset (&ack, 0, sizeof (ack)); - ack.type = HTON__u32 (PTL_MSG_ACK); - ack.dest_nid = HTON__u64 (msg->ev.initiator.nid); - ack.src_nid = HTON__u64 (nal->ni.nid); - ack.dest_pid = HTON__u32 (msg->ev.initiator.pid); - ack.src_pid = HTON__u32 (nal->ni.pid); - ack.payload_length = 0; - - ack.msg.ack.dst_wmd = msg->ack_wmd; - ack.msg.ack.match_bits = msg->ev.match_bits; - ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength); - - rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK, - msg->ev.initiator.nid, msg->ev.initiator.pid, - NULL, 0, 0); - if (rc != PTL_OK) { - /* send failed: there's nothing else to clean up. */ - CERROR("Error %d sending ACK to "LPX64"\n", - rc, msg->ev.initiator.nid); - } + md = msg->msg_md; + if (md != NULL) { + int unlink; + + /* Now it's safe to drop my caller's ref */ + md->md_refcount--; + LASSERT (md->md_refcount >= 0); + + unlink = lnet_md_unlinkable(md); + + msg->msg_ev.unlinked = unlink; + + if (md->md_eq != NULL) + lnet_enq_event_locked(md->md_eq, &msg->msg_ev); + + if (unlink) + lnet_md_unlink(md); + + msg->msg_md = NULL; } - md = msg->md; + cfs_list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq); - state_lock(nal, &flags); + /* Recursion breaker. Don't complete the message here if I am (or + * enough other threads are) already completing messages */ - /* Now it's safe to drop my caller's ref */ - md->pending--; - LASSERT (md->pending >= 0); +#ifdef __KERNEL__ + my_slot = -1; + for (i = 0; i < the_lnet.ln_nfinalizers; i++) { + if (the_lnet.ln_finalizers[i] == cfs_current()) + goto out; + if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL) + my_slot = i; + } + if (my_slot < 0) + goto out; - /* Should I unlink this MD? */ - if (md->pending != 0) /* other refs */ - unlink = 0; - else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0) - unlink = 1; - else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0) - unlink = 0; - else - unlink = lib_md_exhausted(md); + the_lnet.ln_finalizers[my_slot] = cfs_current(); +#else + if (the_lnet.ln_finalizing) + goto out; + + the_lnet.ln_finalizing = 1; +#endif - msg->ev.ni_fail_type = status; - msg->ev.unlinked = unlink; + while (!cfs_list_empty(&the_lnet.ln_finalizeq)) { + msg = cfs_list_entry(the_lnet.ln_finalizeq.next, + lnet_msg_t, msg_list); - if (md->eq != NULL) - lib_enq_event_locked(nal, private, md->eq, &msg->ev); + cfs_list_del(&msg->msg_list); - if (unlink) - lib_md_unlink(nal, md); + /* NB drops and regains the lnet lock if it actually does + * anything, so my finalizing friends can chomp along too */ + lnet_complete_msg_locked(msg); + } - list_del (&msg->msg_list); - nal->ni.counters.msgs_alloc--; - lib_msg_free(nal, msg); +#ifdef __KERNEL__ + the_lnet.ln_finalizers[my_slot] = NULL; +#else + the_lnet.ln_finalizing = 0; +#endif - state_unlock(nal, &flags); + out: + LNET_UNLOCK(); }