X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Flnet%2Flib-msg.c;h=68286b3cec2a75cab4591aaf2694e656cdc7b47a;hp=f10892cc51e4a5502e36551681f4cf13fa235c0b;hb=2c56a64e065f471057928f16b9b550ca0050bdc1;hpb=96ec6856f91f7f9031cfce4273c714d72cfe59ae diff --git a/lnet/lnet/lib-msg.c b/lnet/lnet/lib-msg.c index f10892c..68286b3 100644 --- a/lnet/lnet/lib-msg.c +++ b/lnet/lnet/lib-msg.c @@ -1,163 +1,256 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lib/lib-msg.c - * Message decoding, parsing and finalizing routines + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. - * Copyright (c) 2001-2002 Sandia National Laboratories + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * This file is part of Lustre, http://www.sf.net/projects/lustre/ + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * lnet/lnet/lib-msg.c * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 
+ * Message decoding, parsing and finalizing routines
  */
 
-#ifndef __KERNEL__
-# include <stdio.h>
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <lnet/lib-lnet.h>
+
+void
+lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+{
+        ENTRY;
+
+        memset(ev, 0, sizeof(*ev));
+
+        ev->status   = 0;
+        ev->unlinked = 1;
+        ev->type     = LNET_EVENT_UNLINK;
+        lnet_md_deconstruct(md, &ev->md);
+        lnet_md2handle(&ev->md_handle, md);
+        EXIT;
+}
+
+void
+lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
+{
+        lnet_event_t  *eq_slot;
+
+        /* Allocate the next queue slot */
+        ev->sequence = eq->eq_enq_seq++;
+
+        /* size must be a power of 2 to handle sequence # overflow */
+        LASSERT (eq->eq_size != 0 &&
+                 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+        eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+
+        /* There is no race since both event consumers and event producers
+         * take the LNET_LOCK, so we don't screw around with memory
+         * barriers, setting the sequence number last or weird structure
+         * layout assertions. */
+        *eq_slot = *ev;
+
+        /* Call the callback handler (if any) */
+        if (eq->eq_callback != NULL)
+                eq->eq_callback (eq_slot);
+
+#ifdef __KERNEL__
+        /* Wake anyone waiting in LNetEQPoll() */
+        if (cfs_waitq_active(&the_lnet.ln_waitq))
+                cfs_waitq_broadcast(&the_lnet.ln_waitq);
 #else
-# define DEBUG_SUBSYSTEM S_PORTALS
-# include <linux/kp30.h>
+# ifndef HAVE_LIBPTHREAD
+        /* LNetEQPoll() calls into _the_ LND to wait for action */
+# else
+        /* Wake anyone waiting in LNetEQPoll() */
+        pthread_cond_broadcast(&the_lnet.ln_cond);
+# endif
 #endif
+}
 
-#include <portals/lib-p2p.h>
-
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lnet_complete_msg_locked(lnet_msg_t *msg)
 {
-        lib_md_t *md;
-        lib_eq_t *eq;
-        int rc;
-        unsigned long flags;
-
-        /* ni went down while processing this message */
-        if (nal->ni.up == 0) {
-                return -1;
-        }
+        lnet_handle_wire_t ack_wmd;
+        int                rc;
+        int                status = msg->msg_ev.status;
 
-        if (msg == NULL)
-                return 0;
+        LASSERT (msg->msg_onactivelist);
+
+        if (status == 0 && msg->msg_ack) {
+                /* Only send an ACK if the PUT completed successfully */
+
+                lnet_return_credits_locked(msg);
+
+                msg->msg_ack = 0;
+                LNET_UNLOCK();
+
+                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+                LASSERT(!msg->msg_routing);
 
-        rc = 0;
-        if (msg->send_ack) {
-                ptl_hdr_t ack;
+                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
 
-                LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
 
-                memset (&ack, 0, sizeof (ack));
-                ack.type     = HTON__u32 (PTL_MSG_ACK);
-                ack.dest_nid = HTON__u64 (msg->nid);
-                ack.src_nid  = HTON__u64 (nal->ni.nid);
-                ack.dest_pid = HTON__u32 (msg->pid);
-                ack.src_pid  = HTON__u32 (nal->ni.pid);
-                PTL_HDR_LENGTH(&ack) = 0;
+                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
-                ack.msg.ack.dst_wmd = msg->ack_wmd;
-                ack.msg.ack.match_bits = msg->ev.match_bits;
-                ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
+                rc = lnet_send(msg->msg_ev.target.nid, msg);
 
-                rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->nid, msg->pid, NULL, 0, 0);
+                LNET_LOCK();
+
+                if (rc == 0)
+                        return;
+        } else if (status == 0 &&               /* OK so far */
+                   (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
+
+                LASSERT (!msg->msg_receiving);  /* called back recv already */
+
+                LNET_UNLOCK();
+
+                rc = lnet_send(LNET_NID_ANY, msg);
+
+                LNET_LOCK();
+
+                if (rc == 0)
+                        return;
         }
 
-        md = msg->md;
-        LASSERT (md->pending > 0);               /* I've not dropped my ref yet */
-        eq = md->eq;
-
-        state_lock(nal, &flags);
-
-        if (eq != NULL) {
-                ptl_event_t  *ev = &msg->ev;
-                ptl_event_t  *eq_slot;
-
-                /* I have to hold the lock while I bump the sequence number
-                 * and copy the event into the queue.  If not, and I was
-                 * interrupted after bumping the sequence number, other
-                 * events could fill the queue, including the slot I just
-                 * allocated to this event.  On resuming, I would overwrite
-                 * a more 'recent' event with old event state, and
-                 * processes taking events off the queue would not detect
-                 * overflow correctly.
-                 */
-
-                ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
-                /* size must be a power of 2 to handle a wrapped sequence # */
-                LASSERT (eq->size != 0 &&
-                         eq->size == LOWEST_BIT_SET (eq->size));
-                eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
-                /* Invalidate unlinked_me unless this is the last
-                 * event for an auto-unlinked MD.  Note that if md was
-                 * auto-unlinked, md->pending can only decrease
-                 */
-                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
-                    md->pending != 1)                  /* not last ref */
-                        ev->unlinked_me = PTL_HANDLE_NONE;
-
-                /* Copy the event into the allocated slot, ensuring all the
-                 * rest of the event's contents have been copied _before_
-                 * the sequence number gets updated.  A process 'getting'
-                 * an event waits on the next queue slot's sequence to be
-                 * 'new'.  When it is, _all_ other event fields had better
-                 * be consistent.  I assert 'sequence' is the last member,
-                 * so I only need a 2 stage copy.
-                 */
-                LASSERT(sizeof (ptl_event_t) ==
-                        offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
-                rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                                    offsetof (ptl_event_t, sequence));
-                LASSERT (rc == 0);
+        lnet_return_credits_locked(msg);
+
+        LASSERT (msg->msg_onactivelist);
+        msg->msg_onactivelist = 0;
+        list_del (&msg->msg_activelist);
+        the_lnet.ln_counters.msgs_alloc--;
+        lnet_msg_free(msg);
+}
+
+void
+lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+{
 #ifdef __KERNEL__
-                barrier();
+        int                i;
+        int                my_slot;
 #endif
+        lnet_libmd_t      *md;
 
-                /* Updating the sequence number is what makes the event 'new' */
+        LASSERT (!in_interrupt ());
 
-                /* cb_write is not necessarily atomic, so this could
-                   cause a race with PtlEQGet */
-                rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                                   (void *)&ev->sequence,sizeof (ev->sequence));
-                LASSERT (rc == 0);
+        if (msg == NULL)
+                return;
+#if 0
+        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
+               msg->msg_target_is_router ? "t" : "",
+               msg->msg_routing ? "X" : "",
+               msg->msg_ack ? "A" : "",
+               msg->msg_sending ? "S" : "",
+               msg->msg_receiving ? "R" : "",
+               msg->msg_delayed ? "d" : "",
+               msg->msg_txcredit ? "C" : "",
+               msg->msg_peertxcredit ? "c" : "",
+               msg->msg_rtrcredit ? "F" : "",
+               msg->msg_peerrtrcredit ? "f" : "",
+               msg->msg_onactivelist ? "!" : "",
+               msg->msg_txpeer == NULL ? "" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
+               msg->msg_rxpeer == NULL ? "" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
+        LNET_LOCK();
+
+        LASSERT (msg->msg_onactivelist);
+
+        msg->msg_ev.status = status;
+
+        md = msg->msg_md;
+        if (md != NULL) {
+                int      unlink;
+
+                /* Now it's safe to drop my caller's ref */
+                md->md_refcount--;
+                LASSERT (md->md_refcount >= 0);
+
+                unlink = lnet_md_unlinkable(md);
+
+                msg->msg_ev.unlinked = unlink;
 
-                /* I must also ensure that (a) callbacks are made in the
-                 * same order as the events land in the queue, and (b) the
-                 * callback occurs before the event can be removed from the
-                 * queue, so I can't drop the lock during the callback. */
-                if (nal->cb_callback != NULL)
-                        nal->cb_callback(nal, private, eq, ev);
-                else if (eq->event_callback != NULL)
-                        (void)((eq->event_callback) (ev));
+                if (md->md_eq != NULL)
+                        lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
+
+                if (unlink)
+                        lnet_md_unlink(md);
+
+                msg->msg_md = NULL;
+        }
+
+        list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
+
+        /* Recursion breaker.  Don't complete the message here if I am (or
+         * enough other threads are) already completing messages */
+
+#ifdef __KERNEL__
+        my_slot = -1;
+        for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
+                if (the_lnet.ln_finalizers[i] == cfs_current())
+                        goto out;
+                if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
+                        my_slot = i;
         }
+        if (my_slot < 0)
+                goto out;
 
-        LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
-                 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+        the_lnet.ln_finalizers[my_slot] = cfs_current();
+#else
+        if (the_lnet.ln_finalizing)
+                goto out;
+
+        the_lnet.ln_finalizing = 1;
+#endif
 
-        md->pending--;
-        if (md->pending == 0 && /* no more outstanding operations on this md */
-            (md->threshold == 0 || /* done its business */
-             (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
-                lib_md_unlink(nal, md);
+        while (!list_empty(&the_lnet.ln_finalizeq)) {
+                msg = list_entry(the_lnet.ln_finalizeq.next,
+                                 lnet_msg_t, msg_list);
+
+                list_del(&msg->msg_list);
 
-        list_del (&msg->msg_list);
-        nal->ni.counters.msgs_alloc--;
-        lib_msg_free(nal, msg);
+                /* NB drops and regains the lnet lock if it actually does
+                 * anything, so my finalizing friends can chomp along too */
+                lnet_complete_msg_locked(msg);
+        }
 
-        state_unlock(nal, &flags);
+#ifdef __KERNEL__
+        the_lnet.ln_finalizers[my_slot] = NULL;
+#else
+        the_lnet.ln_finalizing = 0;
#endif
 
-        return rc;
+ out:
+        LNET_UNLOCK();
 }