1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Library level Event queue management routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
9 * This file is part of Lustre, http://www.lustre.org
11 * Lustre is free software; you can redistribute it and/or
12 * modify it under the terms of version 2 of the GNU General Public
13 * License as published by the Free Software Foundation.
15 * Lustre is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
20 * You should have received a copy of the GNU General Public License
21 * along with Lustre; if not, write to the Free Software
22 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #define DEBUG_SUBSYSTEM S_PORTALS
26 #include <portals/lib-p30.h>
/* Allocate an event queue with capacity for at least 'count' events and
 * return it to the caller through *handle.  The queue size is rounded up
 * to a power of 2 so sequence-number arithmetic can use masking.
 * NOTE(review): this listing is fragmentary — local declarations
 * (eq, flags, rc, iov), LIB_LOCK calls, several 'if' headers and the
 * closing braces/return are missing from view; comments below describe
 * only what the visible lines establish. */
29 lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
30 ptl_eq_handler_t callback,
31 ptl_handle_eq_t *handle)
/* Recover the NAL-private state from the public API handle. */
33 lib_nal_t *nal = apinal->nal_data;
38 /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
39 * overflow, they don't skip entries, so the queue has the same
40 * apparent capacity at all times */
42 if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
43 do { /* knock off all but the top bit... */
44 count &= ~LOWEST_BIT_SET (count);
45 } while (count != LOWEST_BIT_SET(count));
47 count <<= 1; /* ...and round up */
/* A left shift of the top bit overflows to 0; count == 0 also covers a
 * caller passing 0 in the first place. */
50 if (count == 0) /* catch bad parameter / overflow on roundup */
51 return (PTL_VAL_FAILED);
/* Grab an EQ descriptor from the NAL's freelist/allocator.
 * NOTE(review): the NULL-check 'if' header for eq is missing from this
 * listing; the bare return below is its body. */
53 eq = lib_eq_alloc (nal);
55 return (PTL_NO_SPACE);
/* Allocate the circular buffer of events itself. */
57 PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
58 if (eq->eq_events == NULL) {
/* Allocation failed: put the descriptor back and drop the lock.
 * NOTE(review): the LIB_LOCK acquisition is not visible here —
 * presumably taken just before lib_eq_alloc; confirm against source. */
60 lib_eq_free (nal, eq);
61 LIB_UNLOCK(nal, flags);
/* If the NAL needs the event buffer mapped (e.g. for DMA), do it now
 * and remember the address key so it can be unmapped on free. */
64 if (nal->libnal_map != NULL) {
66 .iov_base = eq->eq_events,
67 .iov_len = count * sizeof(ptl_event_t)};
69 rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
/* Mapping failed (rc check header missing from listing): unwind. */
72 lib_eq_free (nal, eq);
73 LIB_UNLOCK(nal, flags);
78 /* NB this resets all event sequence numbers to 0, to be earlier
80 memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
/* Record the caller's completion callback (may be a no-op handler). */
86 eq->eq_callback = callback;
/* Publish the EQ: give it a cookie-validated handle and link it onto
 * the NI's list of active event queues. */
90 lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
91 list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
93 LIB_UNLOCK(nal, flags);
/* Hand the opaque handle back to the caller. */
95 ptl_eq2handle(handle, nal, eq);
/* Tear down the event queue identified by *eqh.  Fails with
 * PTL_EQ_INVALID for a stale/bad handle and PTL_EQ_IN_USE while MDs
 * still reference the queue.  The event buffer is unmapped and freed
 * only after the lock is dropped.
 * NOTE(review): fragmentary listing — local declarations (eq, flags,
 * events, size, addrkey), the eq == NULL 'if' header, the 'size'
 * capture line, iov initializer head and closing braces are missing. */
100 lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
102 lib_nal_t *nal = apinal->nal_data;
109 LIB_LOCK(nal, flags);
/* Translate the handle; NULL means invalid/stale cookie. */
111 eq = ptl_handle2eq(eqh, nal);
113 LIB_UNLOCK(nal, flags);
114 return (PTL_EQ_INVALID);
/* Refuse to free a queue that MDs are still attached to. */
117 if (eq->eq_refcount != 0) {
118 LIB_UNLOCK(nal, flags);
119 return (PTL_EQ_IN_USE);
122 /* stash for free after lock dropped */
123 events = eq->eq_events;
125 addrkey = eq->eq_addrkey;
/* Invalidate the cookie, unhook from the NI's active-EQ list and
 * return the descriptor — all under the lock. */
127 lib_invalidate_handle (nal, &eq->eq_lh);
128 list_del (&eq->eq_list);
129 lib_eq_free (nal, eq);
131 LIB_UNLOCK(nal, flags);
/* Mirror of the map done at alloc time: unmap before freeing.
 * NOTE(review): 'size' is presumably the stashed eq_size — the line
 * capturing it is missing from this listing; verify against source. */
133 if (nal->libnal_unmap != NULL) {
136 .iov_len = size * sizeof(ptl_event_t)};
138 nal->libnal_unmap(nal, 1, &iov, &addrkey);
/* Finally release the event buffer itself. */
141 PORTAL_FREE(events, size * sizeof (ptl_event_t));
/* Dequeue the next event from 'eq' into *ev.  Uses the consumer
 * sequence number (eq_deq_seq) masked by the power-of-2 queue size to
 * locate the slot, and compares sequence numbers to distinguish
 * "empty" from "overflowed".
 * NOTE(review): fragmentary — the copy of *new_event into *ev, the
 * overflow return value and the final RETURN are not visible here. */
147 lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
/* eq_size is a power of 2 (enforced at alloc), so masking gives the
 * ring-buffer index for the next event to consume. */
149 int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
150 ptl_event_t *new_event = &eq->eq_events[new_index];
154 CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
155 new_event, eq->eq_deq_seq, eq->eq_size);
/* Consumer is ahead of the slot's sequence => nothing new to read. */
157 if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
158 RETURN(PTL_EQ_EMPTY);
161 /* We've got a new event... */
164 /* ...but did it overwrite an event we've not seen yet? */
/* Slot sequence equal to eq_deq_seq means the producer lapped us:
 * the event we expected here was overwritten before we read it. */
165 if (eq->eq_deq_seq == new_event->sequence) {
168 CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
169 eq->eq_deq_seq, new_event->sequence);
/* Resynchronise the consumer just past the event we did read. */
173 eq->eq_deq_seq = new_event->sequence + 1;
/* Poll 'neq' event queues for the next available event, blocking up to
 * timeout_ms milliseconds (0 = don't block, < 0 = block forever).  On
 * success *event holds the event and *which the index of the queue it
 * came from.
 * NOTE(review): fragmentary AND truncated — local declarations, the
 * invalid-handle check, *which assignment, the kernel/userspace #ifdef
 * markers and the retry loop's tail are not visible in this listing. */
179 lib_api_eq_poll (nal_t *apinal,
180 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
181 ptl_event_t *event, int *which)
183 lib_nal_t *nal = apinal->nal_data;
184 lib_ni_t *ni = &nal->libnal_ni;
198 LIB_LOCK(nal, flags);
/* First pass: try each queue once without sleeping. */
201 for (i = 0; i < neq; i++) {
202 lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
204 rc = lib_get_event (eq, event);
205 if (rc != PTL_EQ_EMPTY) {
/* Got an event (or overflow) — report it immediately. */
206 LIB_UNLOCK(nal, flags);
/* Non-blocking poll: nothing found, give up right away. */
212 if (timeout_ms == 0) {
213 LIB_UNLOCK (nal, flags);
214 RETURN (PTL_EQ_EMPTY);
217 /* Some architectures force us to do spin locking/unlocking
218 * in the same stack frame, means we can abstract the
/* Kernel path (presumably inside #ifdef __KERNEL__ — the guard is not
 * visible here): register on the NI waitqueue BEFORE dropping the lock
 * so a producer's wakeup between unlock and sleep is not lost. */
221 cfs_waitlink_init(&wl);
222 set_current_state(TASK_INTERRUPTIBLE);
223 cfs_waitq_add(&ni->ni_waitq, &wl);
225 LIB_UNLOCK(nal, flags);
/* Negative timeout => wait indefinitely for a wakeup. */
227 if (timeout_ms < 0) {
228 cfs_waitq_wait (&wl);
/* Bounded wait: sleep, then subtract the elapsed time so the retry
 * loop's remaining budget shrinks toward 0. */
232 now = cfs_time_current();
233 cfs_waitq_timedwait(&wl, cfs_time_seconds(timeout_ms)/1000);
234 cfs_duration_usec(cfs_time_sub(cfs_time_current(), now), &tv);
235 timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
/* Reacquire the lock and deregister before rescanning the queues. */
240 LIB_LOCK(nal, flags);
241 cfs_waitq_del(&ni->ni_waitq, &wl);
/* Userspace path (presumably the #else branch): the "lock" is
 * ni_mutex, so condvar waits atomically release/reacquire it. */
243 if (timeout_ms < 0) {
244 pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
/* Convert the relative ms budget into the absolute deadline that
 * pthread_cond_timedwait requires, normalising tv_nsec overflow. */
246 gettimeofday(&then, NULL);
248 ts.tv_sec = then.tv_sec + timeout_ms/1000;
249 ts.tv_nsec = then.tv_usec * 1000 +
250 (timeout_ms%1000) * 1000000;
251 if (ts.tv_nsec >= 1000000000) {
253 ts.tv_nsec -= 1000000000;
256 pthread_cond_timedwait(&ni->ni_cond,
/* Deduct elapsed wall time from the remaining budget (listing is
 * truncated here; the retry loop continues past this view). */
259 gettimeofday(&now, NULL);
260 timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
261 (now.tv_usec - then.tv_usec) / 1000;