X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=68286b3cec2a75cab4591aaf2694e656cdc7b47a;hb=2c2100329b3a57ee8fc3dff9e7cef18768a6f788;hp=328b8d8a7ddd5d9512a2ed2af477e7223dacb187;hpb=23de47e82bd999ec651f927097922413527cca71;p=fs%2Flustre-release.git diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index 328b8d8..68286b3 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -1,44 +1,69 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lib/lib-msg.c - * Message decoding, parsing and finalizing routines + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Copyright (c) 2001-2002 Sandia National Laboratories + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * This file is part of Lustre, http://www.sf.net/projects/lustre/ + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lnet/lnet/lib-msg.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * Message decoding, parsing and finalizing routines */ -#ifndef __KERNEL__ -# include -#else -# define DEBUG_SUBSYSTEM S_PORTALS -# include -#endif +#define DEBUG_SUBSYSTEM S_LNET -#include +#include void -lib_enq_event_locked (lib_nal_t *nal, void *private, - lib_eq_t *eq, ptl_event_t *ev) +lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev) { - ptl_event_t *eq_slot; - - ev->sequence = eq->eq_enq_seq++; /* Allocate the next queue slot */ + ENTRY; + + memset(ev, 0, sizeof(*ev)); + + ev->status = 0; + ev->unlinked = 1; + ev->type = LNET_EVENT_UNLINK; + lnet_md_deconstruct(md, &ev->md); + lnet_md2handle(&ev->md_handle, md); + EXIT; +} + +void +lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev) +{ + lnet_event_t *eq_slot; + + /* Allocate the next queue slot */ + ev->sequence = eq->eq_enq_seq++; /* size must be a power of 2 to handle sequence # overflow */ LASSERT (eq->eq_size != 0 && @@ -46,7 +71,7 @@ lib_enq_event_locked (lib_nal_t *nal, void *private, eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1)); /* There is no race since both event consumers and event producers - * take the LIB_LOCK(), so we don't screw around with memory + * take the LNET_LOCK, so we don't screw around with memory * barriers, setting the sequence number last or wierd structure * layout assertions. */ *eq_slot = *ev; @@ -55,85 +80,177 @@ lib_enq_event_locked (lib_nal_t *nal, void *private, if (eq->eq_callback != NULL) eq->eq_callback (eq_slot); - /* Wake anyone sleeping for an event (see lib-eq.c) */ #ifdef __KERNEL__ - if (waitqueue_active(&nal->libnal_ni.ni_waitq)) - wake_up_all(&nal->libnal_ni.ni_waitq); + /* Wake anyone waiting in LNetEQPoll() */ + if (cfs_waitq_active(&the_lnet.ln_waitq)) + cfs_waitq_broadcast(&the_lnet.ln_waitq); #else - pthread_cond_broadcast(&nal->libnal_ni.ni_cond); +# ifndef HAVE_LIBPTHREAD + /* LNetEQPoll() calls into _the_ LND to wait for action */ +# else + /* Wake anyone waiting in LNetEQPoll() */ + pthread_cond_broadcast(&the_lnet.ln_cond); +# endif #endif } -void -lib_finalize (lib_nal_t *nal, void *private, lib_msg_t *msg, ptl_err_t status) +void +lnet_complete_msg_locked(lnet_msg_t *msg) +{ + lnet_handle_wire_t ack_wmd; + int rc; + int status = msg->msg_ev.status; + + LASSERT (msg->msg_onactivelist); + + if (status == 0 && msg->msg_ack) { + /* Only send an ACK if the PUT completed successfully */ + + lnet_return_credits_locked(msg); + + msg->msg_ack = 0; + LNET_UNLOCK(); + + LASSERT(msg->msg_ev.type == LNET_EVENT_PUT); + LASSERT(!msg->msg_routing); + + ack_wmd = msg->msg_hdr.msg.put.ack_wmd; + + lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0); + + msg->msg_hdr.msg.ack.dst_wmd = ack_wmd; + msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits; + msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength); + + rc = lnet_send(msg->msg_ev.target.nid, msg); + + LNET_LOCK(); + + if (rc == 0) + return; + } else if (status == 0 && /* OK so far */ + (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */ + + LASSERT (!msg->msg_receiving); /* called back recv already */ + + LNET_UNLOCK(); + + rc = lnet_send(LNET_NID_ANY, msg); + + LNET_LOCK(); + + if (rc == 0) + return; + } + + lnet_return_credits_locked(msg); + + LASSERT (msg->msg_onactivelist); + msg->msg_onactivelist = 0; + list_del (&msg->msg_activelist); + the_lnet.ln_counters.msgs_alloc--; + lnet_msg_free(msg); +} + + +void +lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status) { - lib_md_t *md; - int unlink; - unsigned long flags; - int rc; - ptl_hdr_t ack; +#ifdef __KERNEL__ + int i; + int my_slot; +#endif + lnet_libmd_t *md; + + LASSERT (!in_interrupt ()); if (msg == NULL) return; +#if 0 + CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n", + lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target), + msg->msg_target_is_router ? "t" : "", + msg->msg_routing ? "X" : "", + msg->msg_ack ? "A" : "", + msg->msg_sending ? "S" : "", + msg->msg_receiving ? "R" : "", + msg->msg_delayed ? "d" : "", + msg->msg_txcredit ? "C" : "", + msg->msg_peertxcredit ? "c" : "", + msg->msg_rtrcredit ? "F" : "", + msg->msg_peerrtrcredit ? "f" : "", + msg->msg_onactivelist ? "!" : "", + msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid), + msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid)); +#endif + LNET_LOCK(); + + LASSERT (msg->msg_onactivelist); - /* Only send an ACK if the PUT completed successfully */ - if (status == PTL_OK && - !ptl_is_wire_handle_none(&msg->ack_wmd)) { - - LASSERT(msg->ev.type == PTL_EVENT_PUT_END); - - memset (&ack, 0, sizeof (ack)); - ack.type = HTON__u32 (PTL_MSG_ACK); - ack.dest_nid = HTON__u64 (msg->ev.initiator.nid); - ack.dest_pid = HTON__u32 (msg->ev.initiator.pid); - ack.src_nid = HTON__u64 (nal->libnal_ni.ni_pid.nid); - ack.src_pid = HTON__u32 (nal->libnal_ni.ni_pid.pid); - ack.payload_length = 0; - - ack.msg.ack.dst_wmd = msg->ack_wmd; - ack.msg.ack.match_bits = msg->ev.match_bits; - ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength); - - rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK, - msg->ev.initiator.nid, msg->ev.initiator.pid, - NULL, 0, 0); - if (rc != PTL_OK) { - /* send failed: there's nothing else to clean up. */ - CERROR("Error %d sending ACK to "LPX64"\n", - rc, msg->ev.initiator.nid); - } + msg->msg_ev.status = status; + + md = msg->msg_md; + if (md != NULL) { + int unlink; + + /* Now it's safe to drop my caller's ref */ + md->md_refcount--; + LASSERT (md->md_refcount >= 0); + + unlink = lnet_md_unlinkable(md); + + msg->msg_ev.unlinked = unlink; + + if (md->md_eq != NULL) + lnet_enq_event_locked(md->md_eq, &msg->msg_ev); + + if (unlink) + lnet_md_unlink(md); + + msg->msg_md = NULL; } - md = msg->md; + list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq); - LIB_LOCK(nal, flags); + /* Recursion breaker. Don't complete the message here if I am (or + * enough other threads are) already completing messages */ - /* Now it's safe to drop my caller's ref */ - md->pending--; - LASSERT (md->pending >= 0); +#ifdef __KERNEL__ + my_slot = -1; + for (i = 0; i < the_lnet.ln_nfinalizers; i++) { + if (the_lnet.ln_finalizers[i] == cfs_current()) + goto out; + if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL) + my_slot = i; + } + if (my_slot < 0) + goto out; - /* Should I unlink this MD? */ - if (md->pending != 0) /* other refs */ - unlink = 0; - else if ((md->md_flags & PTL_MD_FLAG_ZOMBIE) != 0) - unlink = 1; - else if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINK) == 0) - unlink = 0; - else - unlink = lib_md_exhausted(md); + the_lnet.ln_finalizers[my_slot] = cfs_current(); +#else + if (the_lnet.ln_finalizing) + goto out; - msg->ev.ni_fail_type = status; - msg->ev.unlinked = unlink; + the_lnet.ln_finalizing = 1; +#endif - if (md->eq != NULL) - lib_enq_event_locked(nal, private, md->eq, &msg->ev); + while (!list_empty(&the_lnet.ln_finalizeq)) { + msg = list_entry(the_lnet.ln_finalizeq.next, + lnet_msg_t, msg_list); + + list_del(&msg->msg_list); - if (unlink) - lib_md_unlink(nal, md); + /* NB drops and regains the lnet lock if it actually does + * anything, so my finalizing friends can chomp along too */ + lnet_complete_msg_locked(msg); + } - list_del (&msg->msg_list); - nal->libnal_ni.ni_counters.msgs_alloc--; - lib_msg_free(nal, msg); +#ifdef __KERNEL__ + the_lnet.ln_finalizers[my_slot] = NULL; +#else + the_lnet.ln_finalizing = 0; +#endif - LIB_UNLOCK(nal, flags); + out: + LNET_UNLOCK(); }