Whamcloud - gitweb
i=liang,b=13065:
[fs/lustre-release.git] / lnet / lnet / lib-msg.c
index f10892c..68286b3 100644 (file)
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * lib/lib-msg.c
- * Message decoding, parsing and finalizing routines
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * lnet/lnet/lib-msg.c
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * Message decoding, parsing and finalizing routines
  */
 
-#ifndef __KERNEL__
-# include <stdio.h>
+#define DEBUG_SUBSYSTEM S_LNET
+
+#include <lnet/lib-lnet.h>
+
+void
+lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
+{
+        ENTRY;
+
+        memset(ev, 0, sizeof(*ev));
+
+        ev->status   = 0;
+        ev->unlinked = 1;
+        ev->type     = LNET_EVENT_UNLINK;
+        lnet_md_deconstruct(md, &ev->md);
+        lnet_md2handle(&ev->md_handle, md);
+        EXIT;
+}
+
+void
+lnet_enq_event_locked (lnet_eq_t *eq, lnet_event_t *ev)
+{
+        lnet_event_t  *eq_slot;
+
+        /* Allocate the next queue slot */
+        ev->sequence = eq->eq_enq_seq++;
+
+        /* size must be a power of 2 to handle sequence # overflow */
+        LASSERT (eq->eq_size != 0 &&
+                 eq->eq_size == LOWEST_BIT_SET (eq->eq_size));
+        eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+
+        /* There is no race since both event consumers and event producers
+         * take the LNET_LOCK, so we don't screw around with memory
+         * barriers, setting the sequence number last or wierd structure
+         * layout assertions. */
+        *eq_slot = *ev;
+
+        /* Call the callback handler (if any) */
+        if (eq->eq_callback != NULL)
+                eq->eq_callback (eq_slot);
+
+#ifdef __KERNEL__
+        /* Wake anyone waiting in LNetEQPoll() */
+        if (cfs_waitq_active(&the_lnet.ln_waitq))
+                cfs_waitq_broadcast(&the_lnet.ln_waitq);
 #else
-# define DEBUG_SUBSYSTEM S_PORTALS
-# include <linux/kp30.h>
+# ifndef HAVE_LIBPTHREAD
+        /* LNetEQPoll() calls into _the_ LND to wait for action */
+# else
+        /* Wake anyone waiting in LNetEQPoll() */
+        pthread_cond_broadcast(&the_lnet.ln_cond);
+# endif
 #endif
+}
 
-#include <portals/lib-p30.h>
-
-int lib_finalize(nal_cb_t * nal, void *private, lib_msg_t *msg)
+void
+lnet_complete_msg_locked(lnet_msg_t *msg)
 {
-        lib_md_t     *md;
-        lib_eq_t     *eq;
-        int           rc;
-        unsigned long flags;
-
-        /* ni went down while processing this message */
-        if (nal->ni.up == 0) {
-                return -1;
-        }
+        lnet_handle_wire_t ack_wmd;
+        int                rc;
+        int                status = msg->msg_ev.status;
 
-        if (msg == NULL)
-                return 0;
+        LASSERT (msg->msg_onactivelist);
+
+        if (status == 0 && msg->msg_ack) {
+                /* Only send an ACK if the PUT completed successfully */
+
+                lnet_return_credits_locked(msg);
+
+                msg->msg_ack = 0;
+                LNET_UNLOCK();
+
+                LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
+                LASSERT(!msg->msg_routing);
 
-        rc = 0;
-        if (msg->send_ack) {
-                ptl_hdr_t ack;
+                ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
 
-                LASSERT (!ptl_is_wire_handle_none (&msg->ack_wmd));
+                lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
 
-                memset (&ack, 0, sizeof (ack));
-                ack.type     = HTON__u32 (PTL_MSG_ACK);
-                ack.dest_nid = HTON__u64 (msg->nid);
-                ack.src_nid  = HTON__u64 (nal->ni.nid);
-                ack.dest_pid = HTON__u32 (msg->pid);
-                ack.src_pid  = HTON__u32 (nal->ni.pid);
-                PTL_HDR_LENGTH(&ack) = 0;
+                msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
+                msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
+                msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
 
-                ack.msg.ack.dst_wmd = msg->ack_wmd;
-                ack.msg.ack.match_bits = msg->ev.match_bits;
-                ack.msg.ack.mlength = HTON__u32 (msg->ev.mlength);
+                rc = lnet_send(msg->msg_ev.target.nid, msg);
 
-                rc = lib_send (nal, private, NULL, &ack, PTL_MSG_ACK,
-                               msg->nid, msg->pid, NULL, 0, 0);
+                LNET_LOCK();
+
+                if (rc == 0)
+                        return;
+        } else if (status == 0 &&               /* OK so far */
+                   (msg->msg_routing && !msg->msg_sending)) { /* not forwarded */
+                
+                LASSERT (!msg->msg_receiving);  /* called back recv already */
+        
+                LNET_UNLOCK();
+                
+                rc = lnet_send(LNET_NID_ANY, msg);
+
+                LNET_LOCK();
+
+                if (rc == 0)
+                        return;
         }
 
-        md = msg->md;
-        LASSERT (md->pending > 0);  /* I've not dropped my ref yet */
-        eq = md->eq;
-
-        state_lock(nal, &flags);
-
-        if (eq != NULL) {
-                ptl_event_t  *ev = &msg->ev;
-                ptl_event_t  *eq_slot;
-
-                /* I have to hold the lock while I bump the sequence number
-                 * and copy the event into the queue.  If not, and I was
-                 * interrupted after bumping the sequence number, other
-                 * events could fill the queue, including the slot I just
-                 * allocated to this event.  On resuming, I would overwrite
-                 * a more 'recent' event with old event state, and
-                 * processes taking events off the queue would not detect
-                 * overflow correctly.
-                 */
-
-                ev->sequence = eq->sequence++;/* Allocate the next queue slot */
-
-                /* size must be a power of 2 to handle a wrapped sequence # */
-                LASSERT (eq->size != 0 &&
-                         eq->size == LOWEST_BIT_SET (eq->size));
-                eq_slot = eq->base + (ev->sequence & (eq->size - 1));
-
-                /* Invalidate unlinked_me unless this is the last
-                 * event for an auto-unlinked MD.  Note that if md was
-                 * auto-unlinked, md->pending can only decrease
-                 */
-                if ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 || /* not auto-unlinked */
-                    md->pending != 1)                       /* not last ref */
-                        ev->unlinked_me = PTL_HANDLE_NONE;
-
-                /* Copy the event into the allocated slot, ensuring all the
-                 * rest of the event's contents have been copied _before_
-                 * the sequence number gets updated.  A processes 'getting'
-                 * an event waits on the next queue slot's sequence to be
-                 * 'new'.  When it is, _all_ other event fields had better
-                 * be consistent.  I assert 'sequence' is the last member,
-                 * so I only need a 2 stage copy.
-                 */
-                LASSERT(sizeof (ptl_event_t) ==
-                        offsetof(ptl_event_t, sequence) + sizeof(ev->sequence));
-
-                rc = nal->cb_write (nal, private, (user_ptr)eq_slot, ev,
-                                    offsetof (ptl_event_t, sequence));
-                LASSERT (rc == 0);
+        lnet_return_credits_locked(msg);
+
+        LASSERT (msg->msg_onactivelist);
+        msg->msg_onactivelist = 0;
+        list_del (&msg->msg_activelist);
+        the_lnet.ln_counters.msgs_alloc--;
+        lnet_msg_free(msg);
+}
+
 
+void
+lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
+{
 #ifdef __KERNEL__
-                barrier();
+        int                i;
+        int                my_slot;
 #endif
-                /* Updating the sequence number is what makes the event 'new' */
+        lnet_libmd_t      *md;
 
-                /* cb_write is not necessarily atomic, so this could
-                   cause a race with PtlEQGet */
-                rc = nal->cb_write(nal, private, (user_ptr)&eq_slot->sequence,
-                                   (void *)&ev->sequence,sizeof (ev->sequence));
-                LASSERT (rc == 0);
+        LASSERT (!in_interrupt ());
 
-#ifdef __KERNEL__
-                barrier();
+        if (msg == NULL)
+                return;
+#if 0
+        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+               lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
+               msg->msg_target_is_router ? "t" : "",
+               msg->msg_routing ? "X" : "",
+               msg->msg_ack ? "A" : "",
+               msg->msg_sending ? "S" : "",
+               msg->msg_receiving ? "R" : "",
+               msg->msg_delayed ? "d" : "",
+               msg->msg_txcredit ? "C" : "",
+               msg->msg_peertxcredit ? "c" : "",
+               msg->msg_rtrcredit ? "F" : "",
+               msg->msg_peerrtrcredit ? "f" : "",
+               msg->msg_onactivelist ? "!" : "",
+               msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
+               msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
 #endif
+        LNET_LOCK();
+
+        LASSERT (msg->msg_onactivelist);
+
+        msg->msg_ev.status = status;
+
+        md = msg->msg_md;
+        if (md != NULL) {
+                int      unlink;
+
+                /* Now it's safe to drop my caller's ref */
+                md->md_refcount--;
+                LASSERT (md->md_refcount >= 0);
+
+                unlink = lnet_md_unlinkable(md);
+
+                msg->msg_ev.unlinked = unlink;
 
-                /* I must also ensure that (a) callbacks are made in the
-                 * same order as the events land in the queue, and (b) the
-                 * callback occurs before the event can be removed from the
-                 * queue, so I can't drop the lock during the callback. */
-                if (nal->cb_callback != NULL)
-                        nal->cb_callback(nal, private, eq, ev);
-                else  if (eq->event_callback != NULL)
-                        (void)((eq->event_callback) (ev));
+                if (md->md_eq != NULL)
+                        lnet_enq_event_locked(md->md_eq, &msg->msg_ev);
+
+                if (unlink)
+                        lnet_md_unlink(md);
+
+                msg->msg_md = NULL;
+        }
+
+        list_add_tail (&msg->msg_list, &the_lnet.ln_finalizeq);
+
+        /* Recursion breaker.  Don't complete the message here if I am (or
+         * enough other threads are) already completing messages */
+
+#ifdef __KERNEL__
+        my_slot = -1;
+        for (i = 0; i < the_lnet.ln_nfinalizers; i++) {
+                if (the_lnet.ln_finalizers[i] == cfs_current())
+                        goto out;
+                if (my_slot < 0 && the_lnet.ln_finalizers[i] == NULL)
+                        my_slot = i;
         }
+        if (my_slot < 0)
+                goto out;
 
-        LASSERT ((md->md_flags & PTL_MD_FLAG_AUTO_UNLINKED) == 0 ||
-                 (md->md_flags & PTL_MD_FLAG_UNLINK) != 0);
+        the_lnet.ln_finalizers[my_slot] = cfs_current();
+#else
+        if (the_lnet.ln_finalizing)
+                goto out;
+
+        the_lnet.ln_finalizing = 1;
+#endif
 
-        md->pending--;
-        if (md->pending == 0 && /* no more outstanding operations on this md */
-            (md->threshold == 0 ||              /* done its business */
-             (md->md_flags & PTL_MD_FLAG_UNLINK) != 0)) /* marked for death */
-                lib_md_unlink(nal, md);
+        while (!list_empty(&the_lnet.ln_finalizeq)) {
+                msg = list_entry(the_lnet.ln_finalizeq.next,
+                                 lnet_msg_t, msg_list);
+                
+                list_del(&msg->msg_list);
 
-        list_del (&msg->msg_list);
-        nal->ni.counters.msgs_alloc--;
-        lib_msg_free(nal, msg);
+                /* NB drops and regains the lnet lock if it actually does
+                 * anything, so my finalizing friends can chomp along too */
+                lnet_complete_msg_locked(msg);
+        }
 
-        state_unlock(nal, &flags);
+#ifdef __KERNEL__
+        the_lnet.ln_finalizers[my_slot] = NULL;
+#else
+        the_lnet.ln_finalizing = 0;
+#endif
 
-        return rc;
+ out:
+        LNET_UNLOCK();
 }