* Library level Event queue management routines
*
* Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
*
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
+ * This file is part of Lustre, http://www.lustre.org
*
* Lustre is free software; you can redistribute it and/or
* modify it under the terms of version 2 of the GNU General Public
#define DEBUG_SUBSYSTEM S_PORTALS
#include <portals/lib-p30.h>
-#include <portals/arg-blocks.h>
-int do_PtlEQAlloc_internal(nal_cb_t * nal, void *private, void *v_args,
- void *v_ret)
+/*
+ * Allocate an event queue able to hold at least 'count' events.
+ *
+ * apinal: API-side NAL; its nal_data points at the library NAL.
+ * count: requested number of event slots; rounded up to a power of 2.
+ * callback: handler stored in the new queue's eq_callback.
+ * handle: filled in with the handle of the new queue on success.
+ *
+ * Returns PTL_OK on success, PTL_VAL_FAILED if 'count' is 0 or overflows
+ * when rounded up, PTL_NO_SPACE if the queue descriptor or its event
+ * array cannot be allocated, or the error returned by libnal_map.
+ * On every failure path nothing allocated here is left behind.
+ */
+int
+lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
+ ptl_eq_handler_t callback,
+ ptl_handle_eq_t *handle)
{
- /*
- * Incoming:
- * ptl_handle_ni_t ni_in
- * ptl_size_t count_in
- * void * base_in
- *
- * Outgoing:
- * ptl_handle_eq_t * handle_out
- */
-
- PtlEQAlloc_in *args = v_args;
- PtlEQAlloc_out *ret = v_ret;
-
- lib_eq_t *eq;
- unsigned long flags;
-
- /* api should have rounded up */
- if (args->count_in != LOWEST_BIT_SET (args->count_in))
- return ret->rc = PTL_VAL_FAILED;
+ lib_nal_t *nal = apinal->nal_data;
+ lib_eq_t *eq;
+ unsigned long flags;
+ int rc;
+ /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
+ * overflow, they don't skip entries, so the queue has the same
+ * apparent capacity at all times */
+
+ if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
+ do { /* knock off all but the top bit... */
+ count &= ~LOWEST_BIT_SET (count);
+ } while (count != LOWEST_BIT_SET(count));
+
+ count <<= 1; /* ...and round up */
+ }
+
+ if (count == 0) /* catch bad parameter / overflow on roundup */
+ return (PTL_VAL_FAILED);
+
eq = lib_eq_alloc (nal);
if (eq == NULL)
- return (ret->rc = PTL_NO_SPACE);
+ return (PTL_NO_SPACE);
- state_lock(nal, &flags);
+ PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
+ if (eq->eq_events == NULL) {
+ LIB_LOCK(nal, flags);
+ lib_eq_free (nal, eq);
+ LIB_UNLOCK(nal, flags);
+ /* eq has just been freed: bail out rather than fall through
+ * and keep using it */
+ return (PTL_NO_SPACE);
+ }
- if (nal->cb_map != NULL) {
+ if (nal->libnal_map != NULL) {
struct iovec iov = {
- .iov_base = args->base_in,
- .iov_len = args->count_in * sizeof (ptl_event_t) };
+ .iov_base = eq->eq_events,
+ .iov_len = count * sizeof(ptl_event_t)};
- ret->rc = nal->cb_map (nal, 1, &iov, &eq->eq_addrkey);
- if (ret->rc != PTL_OK) {
+ rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
+ if (rc != PTL_OK) {
+ /* release the event array too, otherwise it leaks
+ * once the descriptor that owns it is gone */
+ PORTAL_FREE(eq->eq_events, count * sizeof(ptl_event_t));
+ LIB_LOCK(nal, flags);
lib_eq_free (nal, eq);
-
- state_unlock (nal, &flags);
- return (ret->rc);
+ LIB_UNLOCK(nal, flags);
+ return (rc);
}
}
- eq->sequence = 1;
- eq->base = args->base_in;
- eq->size = args->count_in;
+ /* NB this resets all event sequence numbers to 0, to be earlier
+ * than eq_deq_seq */
+ memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
+
+ eq->eq_deq_seq = 1;
+ eq->eq_enq_seq = 1;
+ eq->eq_size = count;
eq->eq_refcount = 0;
- eq->event_callback = args->callback_in;
+ eq->eq_callback = callback;
+
+ /* publish the fully initialised queue under the lock */
+ LIB_LOCK(nal, flags);
lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
- list_add (&eq->eq_list, &nal->ni.ni_active_eqs);
+ list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
- state_unlock(nal, &flags);
+ LIB_UNLOCK(nal, flags);
- ptl_eq2handle(&ret->handle_out, eq);
- return (ret->rc = PTL_OK);
+ ptl_eq2handle(handle, nal, eq);
+ return (PTL_OK);
}
-int do_PtlEQFree_internal(nal_cb_t * nal, void *private, void *v_args,
- void *v_ret)
+/*
+ * Destroy the event queue named by 'eqh'.
+ *
+ * apinal: API-side NAL; its nal_data points at the library NAL.
+ * eqh: handle of the queue to free.
+ *
+ * Returns PTL_EQ_INVALID if the handle does not resolve to a queue,
+ * PTL_EQ_IN_USE if the queue's eq_refcount is non-zero, PTL_OK otherwise.
+ * The handle lookup and descriptor teardown happen under LIB_LOCK; the
+ * event array is unmapped and freed only after the lock is released.
+ */
+int
+lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
{
- /*
- * Incoming:
- * ptl_handle_eq_t eventq_in
- *
- * Outgoing:
- */
-
- PtlEQFree_in *args = v_args;
- PtlEQFree_out *ret = v_ret;
- lib_eq_t *eq;
- long flags;
+ lib_nal_t *nal = apinal->nal_data;
+ lib_eq_t *eq;
+ int size;
+ ptl_event_t *events;
+ void *addrkey;
+ unsigned long flags;
- state_lock (nal, &flags);
+ LIB_LOCK(nal, flags);
- eq = ptl_handle2eq(&args->eventq_in, nal);
+ eq = ptl_handle2eq(eqh, nal);
if (eq == NULL) {
- ret->rc = PTL_EQ_INVALID;
- } else if (eq->eq_refcount != 0) {
- ret->rc = PTL_EQ_IN_USE;
+ LIB_UNLOCK(nal, flags);
+ return (PTL_EQ_INVALID);
+ }
+
+ /* refuse to free a queue that still has references */
+ if (eq->eq_refcount != 0) {
+ LIB_UNLOCK(nal, flags);
+ return (PTL_EQ_IN_USE);
+ }
+
+ /* stash for free after lock dropped */
+ events = eq->eq_events;
+ size = eq->eq_size;
+ addrkey = eq->eq_addrkey;
+
+ /* eq is released here; only the stashed copies are used below */
+ lib_invalidate_handle (nal, &eq->eq_lh);
+ list_del (&eq->eq_list);
+ lib_eq_free (nal, eq);
+
+ LIB_UNLOCK(nal, flags);
+
+ /* undo the mapping of the event array, if this NAL provides unmap */
+ if (nal->libnal_unmap != NULL) {
+ struct iovec iov = {
+ .iov_base = events,
+ .iov_len = size * sizeof(ptl_event_t)};
+
+ nal->libnal_unmap(nal, 1, &iov, &addrkey);
+ }
+
+ PORTAL_FREE(events, size * sizeof (ptl_event_t));
+
+ return (PTL_OK);
+}
+
+/*
+ * Copy the next event of 'eq' into '*ev' and advance the dequeue sequence.
+ *
+ * Returns PTL_EQ_EMPTY (leaving '*ev' and eq_deq_seq untouched) when
+ * PTL_SEQ_GT reports eq_deq_seq as greater than the sequence stored in
+ * the next slot. Otherwise the slot is copied out and the result is
+ * PTL_OK if its sequence equals eq_deq_seq, or PTL_EQ_DROPPED if it does
+ * not; in both cases eq_deq_seq becomes that event's sequence + 1.
+ *
+ * NOTE(review): nothing here takes a lock; lib_api_eq_poll calls this
+ * with LIB_LOCK held - confirm that is required of every caller.
+ */
+int
+lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
+{
+ /* slot index: relies on eq_size being a power of 2 (lib_api_eq_alloc
+ * rounds the requested count up to one) */
+ int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+ ptl_event_t *new_event = &eq->eq_events[new_index];
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
+ new_event, eq->eq_deq_seq, eq->eq_size);
+
+ /* slot still holds an older sequence: nothing new to deliver */
+ if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
+ RETURN(PTL_EQ_EMPTY);
+ }
+
+ /* We've got a new event... */
+ *ev = *new_event;
+
+ /* ...but did it overwrite an event we've not seen yet? */
+ if (eq->eq_deq_seq == new_event->sequence) {
+ rc = PTL_OK;
} else {
- if (nal->cb_unmap != NULL) {
- struct iovec iov = {
- .iov_base = eq->base,
- .iov_len = eq->size * sizeof (ptl_event_t) };
-
- nal->cb_unmap(nal, 1, &iov, &eq->eq_addrkey);
+ CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
+ eq->eq_deq_seq, new_event->sequence);
+ rc = PTL_EQ_DROPPED;
+ }
+
+ /* resume after the event just returned, skipping any that were lost */
+ eq->eq_deq_seq = new_event->sequence + 1;
+ RETURN(rc);
+}
+
+
+/*
+ * Poll a set of event queues for the next available event.
+ *
+ * apinal: API-side NAL; its nal_data points at the library NAL.
+ * eventqs: array of 'neq' event queue handles, scanned in order.
+ * timeout_ms: 0 = do not block, < 0 = wait with no time limit,
+ * > 0 = wait at most this many milliseconds in total.
+ * event: receives the event that was dequeued.
+ * which: receives the index in 'eventqs' of the queue it came from.
+ *
+ * Returns the lib_get_event() result (PTL_OK or PTL_EQ_DROPPED) for the
+ * first queue that has an event, PTL_EQ_EMPTY once the timeout is used
+ * up with nothing found, or PTL_EQ_INVALID if one of the handles does
+ * not resolve to a queue. The lock is never held on return.
+ */
+int
+lib_api_eq_poll (nal_t *apinal,
+ ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
+ ptl_event_t *event, int *which)
+{
+ lib_nal_t *nal = apinal->nal_data;
+ lib_ni_t *ni = &nal->libnal_ni;
+ unsigned long flags;
+ int i;
+ int rc;
+#ifdef __KERNEL__
+ cfs_waitlink_t wl;
+ cfs_time_t now;
+#else
+ struct timeval then;
+ struct timeval now;
+ struct timespec ts;
+#endif
+ ENTRY;
+
+ LIB_LOCK(nal, flags);
+
+ for (;;) {
+ for (i = 0; i < neq; i++) {
+ lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
+
+ /* the lookup can fail (see lib_api_eq_free); don't hand
+ * a NULL queue to lib_get_event() */
+ if (eq == NULL) {
+ LIB_UNLOCK(nal, flags);
+ RETURN(PTL_EQ_INVALID);
+ }
+
+ rc = lib_get_event (eq, event);
+ if (rc != PTL_EQ_EMPTY) {
+ LIB_UNLOCK(nal, flags);
+ *which = i;
+ RETURN(rc);
+ }
+ }
+
+ /* nothing pending and no (remaining) time to wait */
+ if (timeout_ms == 0) {
+ LIB_UNLOCK (nal, flags);
+ RETURN (PTL_EQ_EMPTY);
}
- lib_invalidate_handle (nal, &eq->eq_lh);
- list_del (&eq->eq_list);
- lib_eq_free (nal, eq);
- ret->rc = PTL_OK;
- }
+ /* Some architectures force us to do spin locking/unlocking
+ * in the same stack frame, means we can abstract the
+ * locking here */
+#ifdef __KERNEL__
+ cfs_waitlink_init(&wl);
+ set_current_state(TASK_INTERRUPTIBLE);
+ cfs_waitq_add(&ni->ni_waitq, &wl);
+
+ LIB_UNLOCK(nal, flags);
- state_unlock (nal, &flags);
+ if (timeout_ms < 0) {
+ cfs_waitq_wait (&wl);
+ } else {
+ struct timeval tv;
- return (ret->rc);
+ now = cfs_time_current();
+ cfs_waitq_timedwait(&wl, cfs_time_seconds(timeout_ms)/1000);
+ /* charge the time actually spent waiting against the
+ * remaining timeout */
+ cfs_duration_usec(cfs_time_sub(cfs_time_current(), now), &tv);
+ timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
+ if (timeout_ms < 0)
+ timeout_ms = 0;
+ }
+
+ LIB_LOCK(nal, flags);
+ cfs_waitq_del(&ni->ni_waitq, &wl);
+#else
+ if (timeout_ms < 0) {
+ pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
+ } else {
+ gettimeofday(&then, NULL);
+
+ /* absolute deadline = now + timeout_ms, normalised so
+ * that tv_nsec stays below one second */
+ ts.tv_sec = then.tv_sec + timeout_ms/1000;
+ ts.tv_nsec = then.tv_usec * 1000 +
+ (timeout_ms%1000) * 1000000;
+ if (ts.tv_nsec >= 1000000000) {
+ ts.tv_sec++;
+ ts.tv_nsec -= 1000000000;
+ }
+
+ pthread_cond_timedwait(&ni->ni_cond,
+ &ni->ni_mutex, &ts);
+
+ /* charge the time actually spent waiting against the
+ * remaining timeout */
+ gettimeofday(&now, NULL);
+ timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
+ (now.tv_usec - then.tv_usec) / 1000;
+
+ if (timeout_ms < 0)
+ timeout_ms = 0;
+ }
+#endif
+ }
}