X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lnet%2Flnet%2Flib-eq.c;h=5bae602fa88efd61097f6564ae07a61faf13efb1;hp=8ea6fddb12d745b59baf8a1efaf0a9be056668ad;hb=05f5c4ca167911c60e4a72beb411306363df877d;hpb=23de47e82bd999ec651f927097922413527cca71;ds=sidebyside

diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c
index 8ea6fdd..5bae602 100644
--- a/lnet/lnet/lib-eq.c
+++ b/lnet/lnet/lib-eq.c
@@ -5,9 +5,8 @@
  * Library level Event queue management routines
  *
  * Copyright (c) 2001-2003 Cluster File Systems, Inc.
- * Copyright (c) 2001-2002 Sandia National Laboratories
  *
- * This file is part of Lustre, http://www.sf.net/projects/lustre/
+ * This file is part of Lustre, http://www.lustre.org
  *
  * Lustre is free software; you can redistribute it and/or
  * modify it under the terms of version 2 of the GNU General Public
@@ -23,19 +22,18 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-#define DEBUG_SUBSYSTEM S_PORTALS
-#include <portals/lib-p2p.h>
+#define DEBUG_SUBSYSTEM S_LNET
+#include <lnet/lib-lnet.h>
 
-int
-lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
-                  ptl_eq_handler_t callback,
-                  ptl_handle_eq_t *handle)
+int
+LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
+            lnet_handle_eq_t *handle)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
-        unsigned long  flags;
-        int            rc;
+        lnet_eq_t     *eq;
 
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
+
         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
          * overflow, they don't skip entries, so the queue has the same
          * apparant capacity at all times */
@@ -49,36 +47,24 @@ lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
         }
 
         if (count == 0)        /* catch bad parameter / overflow on roundup */
-                return (PTL_VAL_FAILED);
+                return (-EINVAL);
 
-        eq = lib_eq_alloc (nal);
+        eq = lnet_eq_alloc();
         if (eq == NULL)
-                return (PTL_NO_SPACE);
+                return (-ENOMEM);
 
-        PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
+        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
         if (eq->eq_events == NULL) {
-                LIB_LOCK(nal, flags);
-                lib_eq_free (nal, eq);
-                LIB_UNLOCK(nal, flags);
-        }
+                LNET_LOCK();
+                lnet_eq_free (eq);
+                LNET_UNLOCK();
 
-        if (nal->libnal_map != NULL) {
-                struct iovec iov = {
-                        .iov_base = eq->eq_events,
-                        .iov_len = count * sizeof(ptl_event_t)};
-
-                rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
-                if (rc != PTL_OK) {
-                        LIB_LOCK(nal, flags);
-                        lib_eq_free (nal, eq);
-                        LIB_UNLOCK(nal, flags);
-                        return (rc);
-                }
+                return -ENOMEM;
         }
 
         /* NB this resets all event sequence numbers to 0, to be earlier
          * than eq_deq_seq */
-        memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
+        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
 
         eq->eq_deq_seq = 1;
         eq->eq_enq_seq = 1;
@@ -86,77 +72,68 @@
         eq->eq_refcount = 0;
         eq->eq_callback = callback;
 
-        LIB_LOCK(nal, flags);
+        LNET_LOCK();
 
-        lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
-        list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
+        lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
+        list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
 
-        LIB_UNLOCK(nal, flags);
+        LNET_UNLOCK();
 
-        ptl_eq2handle(handle, nal, eq);
-        return (PTL_OK);
+        lnet_eq2handle(handle, eq);
+        return (0);
 }
 
-int
-lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
+int
+LNetEQFree(lnet_handle_eq_t eqh)
 {
-        lib_nal_t     *nal = apinal->nal_data;
-        lib_eq_t      *eq;
+        lnet_eq_t     *eq;
         int            size;
-        ptl_event_t   *events;
-        void          *addrkey;
-        unsigned long  flags;
+        lnet_event_t  *events;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
+
+        LNET_LOCK();
 
-        eq = ptl_handle2eq(eqh, nal);
+        eq = lnet_handle2eq(&eqh);
         if (eq == NULL) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_INVALID);
+                LNET_UNLOCK();
+                return (-ENOENT);
         }
 
         if (eq->eq_refcount != 0) {
-                LIB_UNLOCK(nal, flags);
-                return (PTL_EQ_IN_USE);
+                LNET_UNLOCK();
+                return (-EBUSY);
         }
 
         /* stash for free after lock dropped */
         events = eq->eq_events;
         size = eq->eq_size;
-        addrkey = eq->eq_addrkey;
 
-        lib_invalidate_handle (nal, &eq->eq_lh);
+        lnet_invalidate_handle (&eq->eq_lh);
         list_del (&eq->eq_list);
-        lib_eq_free (nal, eq);
-
-        LIB_UNLOCK(nal, flags);
+        lnet_eq_free (eq);
 
-        if (nal->libnal_unmap != NULL) {
-                struct iovec iov = {
-                        .iov_base = events,
-                        .iov_len = size * sizeof(ptl_event_t)};
-
-                nal->libnal_unmap(nal, 1, &iov, &addrkey);
-        }
+        LNET_UNLOCK();
 
-        PORTAL_FREE(events, size * sizeof (ptl_event_t));
+        LIBCFS_FREE(events, size * sizeof (lnet_event_t));
 
-        return (PTL_OK);
+        return 0;
 }
 
 int
-lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
+lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
 {
-        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
-        ptl_event_t  *new_event = &eq->eq_events[new_index];
-        int           rc;
+        int             new_index = eq->eq_deq_seq & (eq->eq_size - 1);
+        lnet_event_t   *new_event = &eq->eq_events[new_index];
+        int             rc;
         ENTRY;
 
         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
                new_event, eq->eq_deq_seq, eq->eq_size);
 
-        if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
-                RETURN(PTL_EQ_EMPTY);
+        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
+                RETURN(0);
         }
 
         /* We've got a new event... */
@@ -164,11 +141,13 @@ lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
 
         /* ...but did it overwrite an event we've not seen yet? */
         if (eq->eq_deq_seq == new_event->sequence) {
-                rc = PTL_OK;
+                rc = 1;
         } else {
-                CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
+                /* don't complain with CERROR: some EQs are sized small
+                 * anyway; if it's important, the caller should complain */
+                CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
                        eq->eq_deq_seq, new_event->sequence);
-                rc = PTL_EQ_DROPPED;
+                rc = -EOVERFLOW;
         }
 
         eq->eq_deq_seq = new_event->sequence + 1;
@@ -177,68 +156,141 @@
 
 int
-lib_api_eq_poll (nal_t *apinal,
-                 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
-                 ptl_event_t *event, int *which)
+LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, 0,
+                          event, &which);
+}
+
+int
+LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
+{
+        int which;
+
+        return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
+                          event, &which);
+}
+
+int
+LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
+            lnet_event_t *event, int *which)
 {
-        lib_nal_t       *nal = apinal->nal_data;
-        lib_ni_t        *ni = &nal->libnal_ni;
-        unsigned long    flags;
         int              i;
         int              rc;
 #ifdef __KERNEL__
-        wait_queue_t     wq;
-        unsigned long    now;
+        cfs_waitlink_t   wl;
+        cfs_time_t       now;
 #else
         struct timeval   then;
         struct timeval   now;
+# ifdef HAVE_LIBPTHREAD
         struct timespec  ts;
+# endif
+        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
 #endif
         ENTRY;
 
-        LIB_LOCK(nal, flags);
+        LASSERT (the_lnet.ln_init);
+        LASSERT (the_lnet.ln_refcount > 0);
+
+        if (neq < 1)
+                RETURN(-ENOENT);
+
+        LNET_LOCK();
 
         for (;;) {
                 for (i = 0; i < neq; i++) {
-                        lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
+                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
+
+                        if (eq == NULL) {
+                                LNET_UNLOCK();
+                                RETURN(-ENOENT);
+                        }
 
                         rc = lib_get_event (eq, event);
-                        if (rc != PTL_EQ_EMPTY) {
-                                LIB_UNLOCK(nal, flags);
+                        if (rc != 0) {
+                                LNET_UNLOCK();
                                 *which = i;
                                 RETURN(rc);
                         }
                 }
 
+#ifdef __KERNEL__
                 if (timeout_ms == 0) {
-                        LIB_UNLOCK (nal, flags);
-                        RETURN (PTL_EQ_EMPTY);
+                        LNET_UNLOCK ();
+                        RETURN (0);
                 }
 
-                /* Some architectures force us to do spin locking/unlocking
-                 * in the same stack frame, means we can abstract the
-                 * locking here */
-#ifdef __KERNEL__
-                init_waitqueue_entry(&wq, current);
+                cfs_waitlink_init(&wl);
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&ni->ni_waitq, &wq);
+                cfs_waitq_add(&the_lnet.ln_waitq, &wl);
 
-                LIB_UNLOCK(nal, flags);
+                LNET_UNLOCK();
 
                 if (timeout_ms < 0) {
-                        schedule ();
-                } else {
-                        now = jiffies;
-                        schedule_timeout((timeout_ms * HZ)/1000);
-                        timeout_ms -= ((jiffies - now) * 1000)/HZ;
+                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
+                } else {
+                        struct timeval tv;
+
+                        now = cfs_time_current();
+                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
+                                            cfs_time_seconds(timeout_ms)/1000);
+                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
+                                          &tv);
+                        timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
 
-                LIB_LOCK(nal, flags);
+                LNET_LOCK();
+                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
#else
+                if (eqwaitni != NULL) {
+                        /* I have a single NI that I have to call into, to get
+                         * events queued, or to block. */
+                        lnet_ni_addref_locked(eqwaitni);
+                        LNET_UNLOCK();
+
+                        if (timeout_ms <= 0) {
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+                        } else {
+                                gettimeofday(&then, NULL);
+
+                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
+
+                                gettimeofday(&now, NULL);
+                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
+                                              (now.tv_usec - then.tv_usec) / 1000;
+                                if (timeout_ms < 0)
+                                        timeout_ms = 0;
+                        }
+
+                        LNET_LOCK();
+                        lnet_ni_decref_locked(eqwaitni);
+
+                        /* don't call into eqwaitni again if timeout has
+                         * expired */
+                        if (timeout_ms == 0)
+                                eqwaitni = NULL;
+
+                        continue;               /* go back and check for events */
+                }
+
+                if (timeout_ms == 0) {
+                        LNET_UNLOCK();
+                        RETURN (0);
+                }
+
+# ifndef HAVE_LIBPTHREAD
+                /* If I'm single-threaded, LNET fails at startup if it can't
+                 * set the_lnet.ln_eqwaitni correctly. */
+                LBUG();
+# else
                 if (timeout_ms < 0) {
-                        pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
+                        pthread_cond_wait(&the_lnet.ln_cond,
+                                          &the_lnet.ln_lock);
                 } else {
                         gettimeofday(&then, NULL);
 
@@ -250,8 +302,8 @@ lib_api_eq_poll (nal_t *apinal,
                                 ts.tv_nsec -= 1000000000;
                         }
 
-                        pthread_cond_timedwait(&ni->ni_cond,
-                                               &ni->ni_mutex, &ts);
+                        pthread_cond_timedwait(&the_lnet.ln_cond,
+                                               &the_lnet.ln_lock, &ts);
 
                         gettimeofday(&now, NULL);
                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
@@ -260,6 +312,7 @@
                         if (timeout_ms < 0)
                                 timeout_ms = 0;
                 }
+# endif
 #endif
         }
 }
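
Beyond the renames above, this conversion changes the calling convention: PTL_* status codes become 0/-errno returns, LNetEQFree() now takes its handle by value rather than by pointer, and LNetEQGet()/LNetEQWait() are reduced to thin wrappers around LNetEQPoll(). The sketch below shows how a caller might drive the renamed API. It is illustrative only, not part of the commit: the NULL-callback idiom is an assumption (callback dispatch is not shown in this diff), and everything else uses only names visible in the hunks above.

#include <lnet/lib-lnet.h>      /* assumed; consumers may use an exported API header instead */

static int eq_demo(void)
{
        lnet_handle_eq_t eqh;
        lnet_event_t     ev;
        int              which;
        int              rc;

        /* count is rounded up to a power of 2 internally; a NULL callback
         * (assumption) leaves events to be consumed by polling only */
        rc = LNetEQAlloc(64, NULL, &eqh);
        if (rc != 0)                    /* now -EINVAL or -ENOMEM, not PTL_* */
                return rc;

        /* LNetEQPoll() returns 1 when an event is copied out, 0 on timeout
         * (1000ms here), -EOVERFLOW when an event is copied out but older
         * ones were overwritten, and -ENOENT for a bad handle */
        rc = LNetEQPoll(&eqh, 1, 1000, &ev, &which);
        if (rc == 1 || rc == -EOVERFLOW)
                CDEBUG(D_NET, "event sequence %lu from eq %d\n",
                       ev.sequence, which);

        /* fails with -EBUSY while the EQ is still referenced (eq_refcount != 0) */
        return LNetEQFree(eqh);
}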
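
The power-of-2 constraint that LNetEQAlloc() enforces is what lets lib_get_event() treat eq_events[] as a ring indexed by eq_deq_seq & (eq_size - 1), detecting both emptiness and overflow purely from the sequence number stamped into each slot. Here is a standalone toy model of that logic in plain userspace C; SEQ_GT is modelled on LNET_SEQ_GT, whose definition is not shown in this diff.

#include <stdio.h>

#define EQ_SIZE  4                              /* must be a power of 2 */
#define SEQ_GT(a, b)  ((long)((a) - (b)) > 0)   /* wrap-safe sequence compare */

static unsigned long slot[EQ_SIZE];             /* sequence stamped into each slot; 0 == long consumed */
static unsigned long enq_seq = 1;
static unsigned long deq_seq = 1;

static void enqueue(void)
{
        slot[enq_seq & (EQ_SIZE - 1)] = enq_seq; /* may silently overwrite an unseen event */
        enq_seq++;
}

static int dequeue(void)
{
        unsigned long seq = slot[deq_seq & (EQ_SIZE - 1)];
        int rc;

        if (SEQ_GT(deq_seq, seq))               /* slot holds an already-consumed event, */
                return 0;                       /* i.e. the queue is empty */

        /* seq beyond deq_seq means the producer lapped us: events were lost */
        rc = (seq == deq_seq) ? 1 : -1;
        deq_seq = seq + 1;                      /* resync past anything dropped */
        return rc;
}

int main(void)
{
        int i;

        for (i = 0; i < 6; i++)                 /* overrun the 4-slot ring */
                enqueue();

        printf("%d\n", dequeue());              /* -1: event returned, but some were dropped */
        printf("%d\n", dequeue());              /* 1: next event returned cleanly */
        printf("%d\n", dequeue());              /* 0: ring is now empty */
        return 0;
}

Because the mask arithmetic never skips a slot when the sequence counters wrap, the ring keeps the constant apparent capacity that the comment above the roundup loop in LNetEQAlloc() asks for.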