Whamcloud - gitweb
i=liang,b=13065:
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
index a886ff5..701352c 100644 (file)
@@ -1,39 +1,54 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * lib/lib-eq.c
- * Library level Event queue management routines
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   This file is part of Lustre, http://www.lustre.org
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lnet/lnet/lib-eq.c
+ *
+ * Library level Event queue management routines
  */
 
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/lib-p30.h>
+#define DEBUG_SUBSYSTEM S_LNET
+#include <lnet/lib-lnet.h>
 
-int 
-lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
-                  ptl_eq_handler_t callback, 
-                  ptl_handle_eq_t *handle)
+int
+LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
+            lnet_handle_eq_t *handle)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
-        unsigned long  flags;
-        int            rc;
+        lnet_eq_t     *eq;
+
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
 
         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
          * overflow, they don't skip entries, so the queue has the same
@@ -48,36 +63,24 @@ lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
         }
 
         if (count == 0)        /* catch bad parameter / overflow on roundup */
-                return (PTL_VAL_FAILED);
-        
-        eq = lib_eq_alloc (nal);
+                return (-EINVAL);
+
+        eq = lnet_eq_alloc();
         if (eq == NULL)
-                return (PTL_NO_SPACE);
+                return (-ENOMEM);
 
-        PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
+        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
         if (eq->eq_events == NULL) {
-                LIB_LOCK(nal, flags);
-                lib_eq_free (nal, eq);
-                LIB_UNLOCK(nal, flags);
-        }
+                LNET_LOCK();
+                lnet_eq_free (eq);
+                LNET_UNLOCK();
 
-        if (nal->libnal_map != NULL) {
-                struct iovec iov = {
-                        .iov_base = eq->eq_events,
-                        .iov_len = count * sizeof(ptl_event_t)};
-
-                rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
-                if (rc != PTL_OK) {
-                        LIB_LOCK(nal, flags);
-                        lib_eq_free (nal, eq);
-                        LIB_UNLOCK(nal, flags);
-                        return (rc);
-                }
+                return -ENOMEM;
         }
 
         /* NB this resets all event sequence numbers to 0, to be earlier
          * than eq_deq_seq */
-        memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
+        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
 
         eq->eq_deq_seq = 1;
         eq->eq_enq_seq = 1;
@@ -85,77 +88,70 @@ lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
         eq->eq_refcount = 0;
         eq->eq_callback = callback;
 
-        LIB_LOCK(nal, flags);
+        LNET_LOCK();
 
-        lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
-        list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
+        lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
+        list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
 
-        LIB_UNLOCK(nal, flags);
+        LNET_UNLOCK();
 
-        ptl_eq2handle(handle, nal, eq);
-        return (PTL_OK);
+        lnet_eq2handle(handle, eq);
+        return (0);
 }
 
-int 
-lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
+int
+LNetEQFree(lnet_handle_eq_t eqh)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
+        lnet_eq_t     *eq;
         int            size;
-        ptl_event_t   *events;
-        void          *addrkey;
-        unsigned long  flags;
+        lnet_event_t  *events;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
 
-        eq = ptl_handle2eq(eqh, nal);
+        LNET_LOCK();
+
+        eq = lnet_handle2eq(&eqh);
         if (eq == NULL) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_INVALID);
+                LNET_UNLOCK();
+                return (-ENOENT);
         }
 
         if (eq->eq_refcount != 0) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_IN_USE);
+                CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
+                       eq->eq_refcount);
+                LNET_UNLOCK();
+                return (-EBUSY);
         }
 
         /* stash for free after lock dropped */
         events  = eq->eq_events;
         size    = eq->eq_size;
-        addrkey = eq->eq_addrkey;
 
-        lib_invalidate_handle (nal, &eq->eq_lh);
+        lnet_invalidate_handle (&eq->eq_lh);
         list_del (&eq->eq_list);
-        lib_eq_free (nal, eq);
-
-        LIB_UNLOCK(nal, flags);
+        lnet_eq_free (eq);
 
-        if (nal->libnal_unmap != NULL) {
-                struct iovec iov = {
-                        .iov_base = events,
-                        .iov_len = size * sizeof(ptl_event_t)};
-
-                nal->libnal_unmap(nal, 1, &iov, &addrkey);
-        }
+        LNET_UNLOCK();
 
-        PORTAL_FREE(events, size * sizeof (ptl_event_t));
+        LIBCFS_FREE(events, size * sizeof (lnet_event_t));
 
-        return (PTL_OK);
+        return 0;
 }
 
 int
-lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
+lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
 {
-        int          new_index = eq->eq_deq_seq & (eq->eq_size - 1);
-        ptl_event_t *new_event = &eq->eq_events[new_index];
-        int          rc;
+        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+        lnet_event_t *new_event = &eq->eq_events[new_index];
+        int           rc;
         ENTRY;
 
         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
                new_event, eq->eq_deq_seq, eq->eq_size);
 
-        if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
-                RETURN(PTL_EQ_EMPTY);
+        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
+                RETURN(0);
         }
 
         /* We've got a new event... */
@@ -163,11 +159,13 @@ lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
 
         /* ...but did it overwrite an event we've not seen yet? */
         if (eq->eq_deq_seq == new_event->sequence) {
-                rc = PTL_OK;
+                rc = 1;
         } else {
-                CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
+                /* don't complain with CERROR: some EQs are sized small
+                 * anyway; if it's important, the caller should complain */
+                CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
                        eq->eq_deq_seq, new_event->sequence);
-                rc = PTL_EQ_DROPPED;
+                rc = -EOVERFLOW;
         }
 
         eq->eq_deq_seq = new_event->sequence + 1;
@@ -176,89 +174,173 @@ lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
 
 
 int
-lib_api_eq_poll (nal_t *apinal, 
-                 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
-                 ptl_event_t *event, int *which)
+LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, 0,
+                         event, &which);
+}
+
+int
+LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
+                         event, &which);
+}
+
+int
+LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
+            lnet_event_t *event, int *which)
 {
-        lib_nal_t       *nal = apinal->nal_data;
-        lib_ni_t        *ni = &nal->libnal_ni;
-        unsigned long    flags;
         int              i;
         int              rc;
 #ifdef __KERNEL__
-        wait_queue_t     wq;
-        unsigned long    now;
+        cfs_waitlink_t   wl;
+        cfs_time_t       now;
 #else
         struct timeval   then;
         struct timeval   now;
+# ifdef HAVE_LIBPTHREAD
         struct timespec  ts;
+# endif
+        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
 #endif
         ENTRY;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
+
+        if (neq < 1)
+                RETURN(-ENOENT);
+
+        LNET_LOCK();
 
         for (;;) {
+#ifndef __KERNEL__
+                LNET_UNLOCK();
+
+                /* Recursion breaker */
+                if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
+                    !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
+                        lnet_router_checker();
+
+                LNET_LOCK();
+#endif
                 for (i = 0; i < neq; i++) {
-                        lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
+                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
+
+                        if (eq == NULL) {
+                                LNET_UNLOCK();
+                                RETURN(-ENOENT);
+                        }
 
                         rc = lib_get_event (eq, event);
-                        if (rc != PTL_EQ_EMPTY) {
-                                LIB_UNLOCK(nal, flags);
+                        if (rc != 0) {
+                                LNET_UNLOCK();
                                 *which = i;
                                 RETURN(rc);
                         }
                 }
-                
+
+#ifdef __KERNEL__
                 if (timeout_ms == 0) {
-                        LIB_UNLOCK (nal, flags);
-                        RETURN (PTL_EQ_EMPTY);
+                        LNET_UNLOCK();
+                        RETURN (0);
                 }
 
-                /* Some architectures force us to do spin locking/unlocking
-                 * in the same stack frame, means we can abstract the
-                 * locking here */
-#ifdef __KERNEL__
-                init_waitqueue_entry(&wq, current);
+                cfs_waitlink_init(&wl);
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&ni->ni_waitq, &wq);
+                cfs_waitq_add(&the_lnet.ln_waitq, &wl);
 
-                LIB_UNLOCK(nal, flags);
+                LNET_UNLOCK();
 
                 if (timeout_ms < 0) {
-                        schedule ();
+                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
                 } else {
-                        now = jiffies;
-                        schedule_timeout((timeout_ms * HZ)/1000);
-                        timeout_ms -= ((jiffies - now) * 1000)/HZ;
+                        struct timeval tv;
+
+                        now = cfs_time_current();
+                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
+                                            cfs_time_seconds(timeout_ms)/1000);
+                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
+                                            &tv);
+                        timeout_ms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
-                
-                LIB_LOCK(nal, flags);
+
+                LNET_LOCK();
+                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
 #else
+                if (eqwaitni != NULL) {
+                        /* I have a single NI that I have to call into, to get
+                         * events queued, or to block. */
+                        lnet_ni_addref_locked(eqwaitni);
+                        LNET_UNLOCK();
+
+                        if (timeout_ms <= 0) {
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+                        } else {
+                                gettimeofday(&then, NULL);
+
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+
+                                gettimeofday(&now, NULL);
+                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
+                                              (now.tv_usec - then.tv_usec) / 1000;
+                                if (timeout_ms < 0)
+                                        timeout_ms = 0;
+                        }
+
+                        LNET_LOCK();
+                        lnet_ni_decref_locked(eqwaitni);
+
+                        /* don't call into eqwaitni again if timeout has
+                         * expired */
+                        if (timeout_ms == 0)
+                                eqwaitni = NULL;
+
+                        continue;               /* go back and check for events */
+                }
+
+                if (timeout_ms == 0) {
+                        LNET_UNLOCK();
+                        RETURN (0);
+                }
+
+# ifndef HAVE_LIBPTHREAD
+                /* If I'm single-threaded, LNET fails at startup if it can't
+                 * set the_lnet.ln_eqwaitni correctly.  */
+                LBUG();
+# else
                 if (timeout_ms < 0) {
-                        pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
+                        pthread_cond_wait(&the_lnet.ln_cond,
+                                          &the_lnet.ln_lock);
                 } else {
                         gettimeofday(&then, NULL);
-                        
+
                         ts.tv_sec = then.tv_sec + timeout_ms/1000;
-                        ts.tv_nsec = then.tv_usec * 1000 + 
+                        ts.tv_nsec = then.tv_usec * 1000 +
                                      (timeout_ms%1000) * 1000000;
                         if (ts.tv_nsec >= 1000000000) {
                                 ts.tv_sec++;
                                 ts.tv_nsec -= 1000000000;
                         }
-                        
-                        pthread_cond_timedwait(&ni->ni_cond,
-                                               &ni->ni_mutex, &ts);
-                        
+
+                        pthread_cond_timedwait(&the_lnet.ln_cond,
+                                               &the_lnet.ln_lock, &ts);
+
                         gettimeofday(&now, NULL);
                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                       (now.tv_usec - then.tv_usec) / 1000;
-                        
+
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
+# endif
 #endif
         }
 }