X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Flnet%2Flib-eq.c;h=35801a4cd76f61c8f19a8ddf676dc413725eac62;hp=a886ff5b40b0a32d0f068f1f81c94c1b1f38432e;hb=03ff4ee6484faf914cadbbffedbcd0057708625a;hpb=cd7e28f9cee1a4888af825c1cb80e2111634d6eb

diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c
index a886ff5..35801a4 100644
--- a/lnet/lnet/lib-eq.c
+++ b/lnet/lnet/lib-eq.c
@@ -22,18 +22,17 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/lib-p30.h>
+#define DEBUG_SUBSYSTEM S_LNET
+#include <lnet/lib-lnet.h>
 
-int
-lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
-                  ptl_eq_handler_t callback,
-                  ptl_handle_eq_t *handle)
+int
+LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
+            lnet_handle_eq_t *handle)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
-        unsigned long  flags;
-        int            rc;
+        lnet_eq_t     *eq;
+
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
 
         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
          * overflow, they don't skip entries, so the queue has the same
@@ -48,36 +47,24 @@ lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
         }
 
         if (count == 0)        /* catch bad parameter / overflow on roundup */
-                return (PTL_VAL_FAILED);
-
-        eq = lib_eq_alloc (nal);
+                return (-EINVAL);
+
+        eq = lnet_eq_alloc();
         if (eq == NULL)
-                return (PTL_NO_SPACE);
+                return (-ENOMEM);
 
-        PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
+        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
         if (eq->eq_events == NULL) {
-                LIB_LOCK(nal, flags);
-                lib_eq_free (nal, eq);
-                LIB_UNLOCK(nal, flags);
-        }
+                LNET_LOCK();
+                lnet_eq_free (eq);
+                LNET_UNLOCK();
 
-        if (nal->libnal_map != NULL) {
-                struct iovec iov = {
-                        .iov_base = eq->eq_events,
-                        .iov_len = count * sizeof(ptl_event_t)};
-
-                rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
-                if (rc != PTL_OK) {
-                        LIB_LOCK(nal, flags);
-                        lib_eq_free (nal, eq);
-                        LIB_UNLOCK(nal, flags);
-                        return (rc);
-                }
+                return -ENOMEM;
         }
 
         /* NB this resets all event sequence numbers to 0, to be earlier
          * than eq_deq_seq */
-        memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
+        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
 
         eq->eq_deq_seq = 1;
         eq->eq_enq_seq = 1;
@@ -85,77 +72,68 @@ lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
         eq->eq_refcount = 0;
         eq->eq_callback = callback;
 
-        LIB_LOCK(nal, flags);
+        LNET_LOCK();
 
-        lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
-        list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
+        lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
+        list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
 
-        LIB_UNLOCK(nal, flags);
+        LNET_UNLOCK();
 
-        ptl_eq2handle(handle, nal, eq);
-        return (PTL_OK);
+        lnet_eq2handle(handle, eq);
+        return (0);
 }
 
-int
-lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
+int
+LNetEQFree(lnet_handle_eq_t eqh)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
+        lnet_eq_t     *eq;
         int            size;
-        ptl_event_t   *events;
-        void          *addrkey;
-        unsigned long  flags;
+        lnet_event_t  *events;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
 
-        eq = ptl_handle2eq(eqh, nal);
+        LNET_LOCK();
+
+        eq = lnet_handle2eq(&eqh);
         if (eq == NULL) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_INVALID);
+                LNET_UNLOCK();
+                return (-ENOENT);
         }
 
         if (eq->eq_refcount != 0) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_IN_USE);
+                LNET_UNLOCK();
+                return (-EBUSY);
         }
 
         /* stash for free after lock dropped */
         events = eq->eq_events;
         size = eq->eq_size;
-        addrkey = eq->eq_addrkey;
-        lib_invalidate_handle (nal, &eq->eq_lh);
+        lnet_invalidate_handle (&eq->eq_lh);
         list_del (&eq->eq_list);
-        lib_eq_free (nal, eq);
-
-        LIB_UNLOCK(nal, flags);
+        lnet_eq_free (eq);
 
-        if (nal->libnal_unmap != NULL) {
-                struct iovec iov = {
-                        .iov_base = events,
-                        .iov_len = size * sizeof(ptl_event_t)};
+        LNET_UNLOCK();
 
-                nal->libnal_unmap(nal, 1, &iov, &addrkey);
-        }
+        LIBCFS_FREE(events, size * sizeof (lnet_event_t));
 
-        PORTAL_FREE(events, size * sizeof (ptl_event_t));
-
-        return (PTL_OK);
+        return 0;
 }
 
 int
-lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
+lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
 {
-        int          new_index = eq->eq_deq_seq & (eq->eq_size - 1);
-        ptl_event_t *new_event = &eq->eq_events[new_index];
-        int          rc;
+        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+        lnet_event_t *new_event = &eq->eq_events[new_index];
+        int           rc;
         ENTRY;
 
         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
                new_event, eq->eq_deq_seq, eq->eq_size);
 
-        if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
-                RETURN(PTL_EQ_EMPTY);
+        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
+                RETURN(0);
         }
 
         /* We've got a new event... */
@@ -163,11 +141,13 @@ lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
 
         /* ...but did it overwrite an event we've not seen yet? */
         if (eq->eq_deq_seq == new_event->sequence) {
-                rc = PTL_OK;
+                rc = 1;
         } else {
-                CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
+                /* don't complain with CERROR: some EQs are sized small
+                 * anyway; if it's important, the caller should complain */
+                CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
                        eq->eq_deq_seq, new_event->sequence);
-                rc = PTL_EQ_DROPPED;
+                rc = -EOVERFLOW;
         }
 
         eq->eq_deq_seq = new_event->sequence + 1;
@@ -176,71 +156,144 @@ lib_api_eq_poll (nal_t *apinal,
 
 
 int
-lib_api_eq_poll (nal_t *apinal,
-                 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
-                 ptl_event_t *event, int *which)
+LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, 0,
+                          event, &which);
+}
+
+int
+LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
+                          event, &which);
+}
+
+int
+LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
+            lnet_event_t *event, int *which)
 {
-        lib_nal_t       *nal = apinal->nal_data;
-        lib_ni_t        *ni = &nal->libnal_ni;
-        unsigned long    flags;
         int              i;
         int              rc;
 #ifdef __KERNEL__
-        wait_queue_t     wq;
-        unsigned long    now;
+        cfs_waitlink_t   wl;
+        cfs_time_t       now;
 #else
         struct timeval   then;
         struct timeval   now;
+# ifdef HAVE_LIBPTHREAD
         struct timespec  ts;
+# endif
+        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
 #endif
         ENTRY;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
+
+        if (neq < 1)
+                RETURN(-ENOENT);
+
+        LNET_LOCK();
 
         for (;;) {
                 for (i = 0; i < neq; i++) {
-                        lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
+                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
+
+                        if (eq == NULL) {
+                                LNET_UNLOCK();
+                                RETURN(-ENOENT);
+                        }
 
                         rc = lib_get_event (eq, event);
-                        if (rc != PTL_EQ_EMPTY) {
-                                LIB_UNLOCK(nal, flags);
+                        if (rc != 0) {
+                                LNET_UNLOCK();
                                 *which = i;
                                 RETURN(rc);
                         }
                 }
-
+
+#ifdef __KERNEL__
                 if (timeout_ms == 0) {
-                        LIB_UNLOCK (nal, flags);
-                        RETURN (PTL_EQ_EMPTY);
+                        LNET_UNLOCK();
+                        RETURN (0);
                 }
 
-                /* Some architectures force us to do spin locking/unlocking
-                 * in the same stack frame, means we can abstract the
-                 * locking here */
-#ifdef __KERNEL__
-                init_waitqueue_entry(&wq, current);
+                cfs_waitlink_init(&wl);
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&ni->ni_waitq, &wq);
+                cfs_waitq_add(&the_lnet.ln_waitq, &wl);
 
-                LIB_UNLOCK(nal, flags);
+                LNET_UNLOCK();
 
                 if (timeout_ms < 0) {
-                        schedule ();
+                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
                 } else {
-                        now = jiffies;
-                        schedule_timeout((timeout_ms * HZ)/1000);
-                        timeout_ms -= ((jiffies - now) * 1000)/HZ;
+                        struct timeval tv;
+
+                        now = cfs_time_current();
+                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
+                                            cfs_time_seconds(timeout_ms)/1000);
+                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
+                                          &tv);
+                        timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
-
-                LIB_LOCK(nal, flags);
+
+                LNET_LOCK();
+                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
#else
+                if (eqwaitni != NULL) {
+                        /* I have a single NI that I have to call into, to get
+                         * events queued, or to block. */
+                        lnet_ni_addref_locked(eqwaitni);
+                        LNET_UNLOCK();
+
+                        if (timeout_ms <= 0) {
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+                        } else {
+                                gettimeofday(&then, NULL);
+
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+
+                                gettimeofday(&now, NULL);
+                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
+                                              (now.tv_usec - then.tv_usec) / 1000;
+                                if (timeout_ms < 0)
+                                        timeout_ms = 0;
+                        }
+
+                        LNET_LOCK();
+                        lnet_ni_decref_locked(eqwaitni);
+
+                        /* don't call into eqwaitni again if timeout has
+                         * expired */
+                        if (timeout_ms == 0)
+                                eqwaitni = NULL;
+
+                        continue;       /* go back and check for events */
+                }
+
+                if (timeout_ms == 0) {
+                        LNET_UNLOCK();
+                        RETURN (0);
+                }
+
+# ifndef HAVE_LIBPTHREAD
+                /* If I'm single-threaded, LNET fails at startup if it can't
+                 * set the_lnet.ln_eqwaitni correctly. */
+                LBUG();
+# else
                 if (timeout_ms < 0) {
-                        pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
+                        pthread_cond_wait(&the_lnet.ln_cond,
+                                          &the_lnet.ln_lock);
                 } else {
                         gettimeofday(&then, NULL);
-
+
                         ts.tv_sec = then.tv_sec + timeout_ms/1000;
                         ts.tv_nsec = then.tv_usec * 1000 +
                                      (timeout_ms%1000) * 1000000;
@@ -248,17 +301,18 @@ lib_api_eq_poll (nal_t *apinal,
                                 ts.tv_sec++;
                                 ts.tv_nsec -= 1000000000;
                         }
-
-                        pthread_cond_timedwait(&ni->ni_cond,
-                                               &ni->ni_mutex, &ts);
-
+
+                        pthread_cond_timedwait(&the_lnet.ln_cond,
+                                               &the_lnet.ln_lock, &ts);
+
                         gettimeofday(&now, NULL);
                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                       (now.tv_usec - then.tv_usec) / 1000;
-
+
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
+# endif
 #endif
         }
 }
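
Usage, for review context (not part of the patch): a minimal caller-side
sketch of the EQ API as it lands here. It assumes LNET is already up via the
usual LNetInit()/LNetNIInit() bring-up and that a NULL callback selects
polled-only delivery; eq_demo and the count of 64 are illustrative names and
values, not from this tree. Return codes follow the errno-style convention
visible above: LNetEQPoll() yields 1 when an event was copied out, 0 on
empty/timeout, and -EOVERFLOW when unseen events were overwritten.

#include <lnet/lib-lnet.h>

static int
eq_demo(void)
{
        lnet_handle_eq_t eqh;
        lnet_event_t     ev;
        int              which;
        int              rc;

        /* count is rounded up to a power of 2; 0 (bad parameter/overflow
         * on roundup) gets -EINVAL, allocation failure gets -ENOMEM */
        rc = LNetEQAlloc(64, NULL, &eqh);
        if (rc != 0)
                return rc;

        /* block up to 1000ms for one event; *which reports the EQ index */
        rc = LNetEQPoll(&eqh, 1, 1000, &ev, &which);
        if (rc == 1)
                CDEBUG(D_NET, "event type %d from eq %d\n", ev.type, which);
        else if (rc == -EOVERFLOW)
                CDEBUG(D_NET, "events dropped: EQ sized too small?\n");

        /* -EBUSY while eq_refcount != 0, i.e. MDs still reference the EQ */
        return LNetEQFree(eqh);
}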
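
The power-of-2 comment in LNetEQAlloc() is load-bearing: eq_enq_seq and
eq_deq_seq are free-running counters and a slot is addressed as
(seq & (size - 1)), which only walks the ring without skipping entries
across counter wrap when size is a power of 2. A standalone userspace
illustration (not from the tree) of the wrap behaviour:

#include <stdio.h>

int main(void)
{
        unsigned int size = 8;          /* must be a power of 2 */
        unsigned int seq;

        /* drive a 32-bit counter across its wrap point: the slot index
         * steps 6, 7, 0, 1 with no gap, so producer and consumer agree */
        for (seq = 0xfffffffeU; seq != 2; seq++)
                printf("seq 0x%08x -> slot %u\n", seq, seq & (size - 1));
        return 0;
}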