Whamcloud - gitweb
Changed quotes for newer bash
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
index 8a91860..4992fce 100644 (file)
@@ -5,9 +5,8 @@
  * Library level Event queue management routines
  *
  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
- *  Copyright (c) 2001-2002 Sandia National Laboratories
  *
- *   This file is part of Lustre, http://www.sf.net/projects/lustre/
+ *   This file is part of Lustre, http://www.lustre.org
  *
  *   Lustre is free software; you can redistribute it and/or
  *   modify it under the terms of version 2 of the GNU General Public
 
 #define DEBUG_SUBSYSTEM S_PORTALS
 #include <portals/lib-p30.h>
-#include <portals/arg-blocks.h>
 
-int do_PtlEQAlloc_internal(nal_cb_t * nal, void *private, void *v_args,
-                           void *v_ret)
+int 
+lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
+                  ptl_eq_handler_t callback, 
+                  ptl_handle_eq_t *handle)
 {
-        /*
-         * Incoming:
-         *      ptl_handle_ni_t ni_in
-         *      ptl_size_t count_in
-         *      void                    * base_in
-         *
-         * Outgoing:
-         *      ptl_handle_eq_t         * handle_out
-         */
-
-        PtlEQAlloc_in *args = v_args;
-        PtlEQAlloc_out *ret = v_ret;
-
-        lib_eq_t *eq;
-        unsigned long flags;
-
-        /* api should have rounded up */
-        if (args->count_in != LOWEST_BIT_SET (args->count_in))
-                return ret->rc = PTL_VAL_FAILED;
+        lib_nal_t     *nal = apinal->nal_data;
+        lib_eq_t      *eq;
+        unsigned long  flags;
+        int            rc;
 
+        /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
+         * overflow, they don't skip entries, so the queue has the same
+         * apparant capacity at all times */
+
+        if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
+                do {                    /* knock off all but the top bit... */
+                        count &= ~LOWEST_BIT_SET (count);
+                } while (count != LOWEST_BIT_SET(count));
+
+                count <<= 1;                             /* ...and round up */
+        }
+
+        if (count == 0)        /* catch bad parameter / overflow on roundup */
+                return (PTL_VAL_FAILED);
+        
         eq = lib_eq_alloc (nal);
         if (eq == NULL)
-                return (ret->rc = PTL_NO_SPACE);
+                return (PTL_NO_SPACE);
 
-        state_lock(nal, &flags);
+        PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
+        if (eq->eq_events == NULL) {
+                LIB_LOCK(nal, flags);
+                lib_eq_free (nal, eq);
+                LIB_UNLOCK(nal, flags);
+        }
 
-        if (nal->cb_map != NULL) {
+        if (nal->libnal_map != NULL) {
                 struct iovec iov = {
-                        .iov_base = args->base_in,
-                        .iov_len = args->count_in * sizeof (ptl_event_t) };
+                        .iov_base = eq->eq_events,
+                        .iov_len = count * sizeof(ptl_event_t)};
 
-                ret->rc = nal->cb_map (nal, 1, &iov, &eq->eq_addrkey);
-                if (ret->rc != PTL_OK) {
+                rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
+                if (rc != PTL_OK) {
+                        LIB_LOCK(nal, flags);
                         lib_eq_free (nal, eq);
-                        
-                        state_unlock (nal, &flags);
-                        return (ret->rc);
+                        LIB_UNLOCK(nal, flags);
+                        return (rc);
                 }
         }
 
-        eq->sequence = 1;
-        eq->base = args->base_in;
-        eq->size = args->count_in;
+        /* NB this resets all event sequence numbers to 0, to be earlier
+         * than eq_deq_seq */
+        memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
+
+        eq->eq_deq_seq = 1;
+        eq->eq_enq_seq = 1;
+        eq->eq_size = count;
         eq->eq_refcount = 0;
-        eq->event_callback = args->callback_in;
+        eq->eq_callback = callback;
+
+        LIB_LOCK(nal, flags);
 
         lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
-        list_add (&eq->eq_list, &nal->ni.ni_active_eqs);
+        list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
 
-        state_unlock(nal, &flags);
+        LIB_UNLOCK(nal, flags);
 
-        ptl_eq2handle(&ret->handle_out, eq);
-        return (ret->rc = PTL_OK);
+        ptl_eq2handle(handle, nal, eq);
+        return (PTL_OK);
 }
 
-int do_PtlEQFree_internal(nal_cb_t * nal, void *private, void *v_args,
-                          void *v_ret)
+int 
+lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
 {
-        /*
-         * Incoming:
-         *      ptl_handle_eq_t eventq_in
-         *
-         * Outgoing:
-         */
-
-        PtlEQFree_in *args = v_args;
-        PtlEQFree_out *ret = v_ret;
-        lib_eq_t *eq;
-        long flags;
+        lib_nal_t     *nal = apinal->nal_data;
+        lib_eq_t      *eq;
+        int            size;
+        ptl_event_t   *events;
+        void          *addrkey;
+        unsigned long  flags;
 
-        state_lock (nal, &flags);
+        LIB_LOCK(nal, flags);
 
-        eq = ptl_handle2eq(&args->eventq_in, nal);
+        eq = ptl_handle2eq(eqh, nal);
         if (eq == NULL) {
-                ret->rc = PTL_EQ_INVALID;
-        } else if (eq->eq_refcount != 0) {
-                ret->rc = PTL_EQ_IN_USE;
+                LIB_UNLOCK(nal, flags);
+                return (PTL_EQ_INVALID);
+        }
+
+        if (eq->eq_refcount != 0) {
+                LIB_UNLOCK(nal, flags);
+                return (PTL_EQ_IN_USE);
+        }
+
+        /* stash for free after lock dropped */
+        events  = eq->eq_events;
+        size    = eq->eq_size;
+        addrkey = eq->eq_addrkey;
+
+        lib_invalidate_handle (nal, &eq->eq_lh);
+        list_del (&eq->eq_list);
+        lib_eq_free (nal, eq);
+
+        LIB_UNLOCK(nal, flags);
+
+        if (nal->libnal_unmap != NULL) {
+                struct iovec iov = {
+                        .iov_base = events,
+                        .iov_len = size * sizeof(ptl_event_t)};
+
+                nal->libnal_unmap(nal, 1, &iov, &addrkey);
+        }
+
+        PORTAL_FREE(events, size * sizeof (ptl_event_t));
+
+        return (PTL_OK);
+}
+
+int
+lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
+{
+        int          new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+        ptl_event_t *new_event = &eq->eq_events[new_index];
+        int          rc;
+        ENTRY;
+
+        CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
+               new_event, eq->eq_deq_seq, eq->eq_size);
+
+        if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
+                RETURN(PTL_EQ_EMPTY);
+        }
+
+        /* We've got a new event... */
+        *ev = *new_event;
+
+        /* ...but did it overwrite an event we've not seen yet? */
+        if (eq->eq_deq_seq == new_event->sequence) {
+                rc = PTL_OK;
         } else {
-                if (nal->cb_unmap != NULL) {
-                        struct iovec iov = {
-                                .iov_base = eq->base,
-                                .iov_len = eq->size * sizeof (ptl_event_t) };
-                        
-                        nal->cb_unmap(nal, 1, &iov, &eq->eq_addrkey);
+                CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
+                       eq->eq_deq_seq, new_event->sequence);
+                rc = PTL_EQ_DROPPED;
+        }
+
+        eq->eq_deq_seq = new_event->sequence + 1;
+        RETURN(rc);
+}
+
+
+int
+lib_api_eq_poll (nal_t *apinal, 
+                 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+                 ptl_event_t *event, int *which)
+{
+        lib_nal_t       *nal = apinal->nal_data;
+        lib_ni_t        *ni = &nal->libnal_ni;
+        unsigned long    flags;
+        int              i;
+        int              rc;
+#ifdef __KERNEL__
+        cfs_waitlink_t   wl;
+        cfs_time_t       now;
+#else
+        struct timeval   then;
+        struct timeval   now;
+        struct timespec  ts;
+#endif
+        ENTRY;
+
+        LIB_LOCK(nal, flags);
+
+        for (;;) {
+                for (i = 0; i < neq; i++) {
+                        lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
+
+                        rc = lib_get_event (eq, event);
+                        if (rc != PTL_EQ_EMPTY) {
+                                LIB_UNLOCK(nal, flags);
+                                *which = i;
+                                RETURN(rc);
+                        }
+                }
+                
+                if (timeout_ms == 0) {
+                        LIB_UNLOCK (nal, flags);
+                        RETURN (PTL_EQ_EMPTY);
                 }
 
-                lib_invalidate_handle (nal, &eq->eq_lh);
-                list_del (&eq->eq_list);
-                lib_eq_free (nal, eq);
-                ret->rc = PTL_OK;
-        }
+                /* Some architectures force us to do spin locking/unlocking
+                 * in the same stack frame, means we can abstract the
+                 * locking here */
+#ifdef __KERNEL__
+                cfs_waitlink_init(&wl);
+                set_current_state(TASK_INTERRUPTIBLE);
+                cfs_waitq_add(&ni->ni_waitq, &wl);
+
+                LIB_UNLOCK(nal, flags);
 
-        state_unlock (nal, &flags);
+                if (timeout_ms < 0) {
+                        cfs_waitq_wait (&wl);
+                } else { 
+                        struct timeval tv;
 
-        return (ret->rc);
+                        now = cfs_time_current();
+                        cfs_waitq_timedwait(&wl, cfs_time_seconds(timeout_ms)/1000);
+                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now), &tv); 
+                        timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
+                        if (timeout_ms < 0)
+                                timeout_ms = 0;
+                }
+                
+                LIB_LOCK(nal, flags);
+                cfs_waitq_del(&ni->ni_waitq, &wl);
+#else
+                if (timeout_ms < 0) {
+                        pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
+                } else {
+                        gettimeofday(&then, NULL);
+                        
+                        ts.tv_sec = then.tv_sec + timeout_ms/1000;
+                        ts.tv_nsec = then.tv_usec * 1000 + 
+                                     (timeout_ms%1000) * 1000000;
+                        if (ts.tv_nsec >= 1000000000) {
+                                ts.tv_sec++;
+                                ts.tv_nsec -= 1000000000;
+                        }
+                        
+                        pthread_cond_timedwait(&ni->ni_cond,
+                                               &ni->ni_mutex, &ts);
+                        
+                        gettimeofday(&now, NULL);
+                        timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
+                                      (now.tv_usec - then.tv_usec) / 1000;
+                        
+                        if (timeout_ms < 0)
+                                timeout_ms = 0;
+                }
+#endif
+        }
 }