1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Library level Event queue management routines
7 * Copyright (c) 2001-2003 Cluster File Systems, Inc.
8 * Copyright (c) 2001-2002 Sandia National Laboratories
10 * This file is part of Lustre, http://www.sf.net/projects/lustre/
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26 #define DEBUG_SUBSYSTEM S_PORTALS
27 #include <portals/lib-p30.h>
/*
 * lib_api_eq_alloc: create an event queue (EQ) on the library NAL found in
 * apinal->nal_data and hand it back to the caller through 'handle'.
 *
 * Steps visible in this listing:
 *  - 'count' is rounded up to a power of two; a result of 0 (bad parameter,
 *    or overflow while rounding up) returns PTL_VAL_FAILED.
 *  - an EQ descriptor comes from lib_eq_alloc() and its event array of
 *    count * sizeof(ptl_event_t) bytes from PORTAL_ALLOC().
 *  - if the NAL supplies libnal_map, the event array is passed to it as a
 *    single iovec, with eq->eq_addrkey as the output key.
 *  - the event array is zeroed, 'callback' is stored in eq->eq_callback,
 *    the handle is initialised with PTL_COOKIE_TYPE_EQ, the EQ is added to
 *    libnal_ni.ni_active_eqs, and 'handle' is filled in by ptl_eq2handle().
 *
 * NOTE(review): this listing has gaps (the embedded line numbers skip).
 * The declarations of eq/flags/rc, the test guarding the PTL_NO_SPACE
 * return, the LIB_LOCK calls that pair with the LIB_UNLOCK calls, the
 * remaining eq field initialisation and the final return are not visible
 * here; confirm them against the complete file.
 *
 * NOTE(review): the multiplication count * sizeof(ptl_event_t) is not
 * range-checked in the visible lines; only count == 0 is rejected.
 */
30 lib_api_eq_alloc (nal_t *apinal, ptl_size_t count,
31 ptl_eq_handler_t callback,
32 ptl_handle_eq_t *handle)
34 lib_nal_t *nal = apinal->nal_data;
39 /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
40 * overflow, they don't skip entries, so the queue has the same
41 * apparent capacity at all times */
43 if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
44 do { /* knock off all but the top bit... */
45 count &= ~LOWEST_BIT_SET (count);
46 } while (count != LOWEST_BIT_SET(count));
48 count <<= 1; /* ...and round up */
51 if (count == 0) /* catch bad parameter / overflow on roundup */
52 return (PTL_VAL_FAILED);
/* Descriptor first, then the event array that hangs off it. */
54 eq = lib_eq_alloc (nal);
56 return (PTL_NO_SPACE);
58 PORTAL_ALLOC(eq->eq_events, count * sizeof(ptl_event_t));
/* NOTE(review): only the cleanup of this failure branch is visible (the
 * descriptor is given back with lib_eq_free). No return is visible before
 * eq->eq_events is used again below; confirm the branch leaves the
 * function. */
59 if (eq->eq_events == NULL) {
61 lib_eq_free (nal, eq);
62 LIB_UNLOCK(nal, flags);
/* Optional NAL hook: describe the event array as one iovec and let the NAL
 * map it; on the (not fully visible) failure path the descriptor is freed. */
65 if (nal->libnal_map != NULL) {
67 .iov_base = eq->eq_events,
68 .iov_len = count * sizeof(ptl_event_t)};
70 rc = nal->libnal_map(nal, 1, &iov, &eq->eq_addrkey);
73 lib_eq_free (nal, eq);
74 LIB_UNLOCK(nal, flags);
79 /* NB this resets all event sequence numbers to 0, to be earlier
81 memset(eq->eq_events, 0, count * sizeof(ptl_event_t));
87 eq->eq_callback = callback;
91 lib_initialise_handle (nal, &eq->eq_lh, PTL_COOKIE_TYPE_EQ);
92 list_add (&eq->eq_list, &nal->libnal_ni.ni_active_eqs);
94 LIB_UNLOCK(nal, flags);
96 ptl_eq2handle(handle, nal, eq);
/*
 * lib_api_eq_free: destroy the event queue named by 'eqh' on the library
 * NAL found in apinal->nal_data.
 *
 * Under LIB_LOCK the handle is resolved with ptl_handle2eq(); the function
 * returns PTL_EQ_INVALID on the (not fully visible) lookup-failure path and
 * PTL_EQ_IN_USE while eq->eq_refcount is non-zero.  Otherwise the handle is
 * invalidated, the EQ is unlinked from its list and the descriptor is
 * released with lib_eq_free().  The event array is unmapped (if the NAL
 * supplies libnal_unmap) and freed only after the lock has been dropped.
 *
 * NOTE(review): this listing has gaps (the embedded line numbers skip).
 * The local declarations, the test guarding the PTL_EQ_INVALID return, the
 * assignment of 'size' used below, and the final return are not visible
 * here; confirm them against the complete file.
 */
101 lib_api_eq_free(nal_t *apinal, ptl_handle_eq_t *eqh)
103 lib_nal_t *nal = apinal->nal_data;
110 LIB_LOCK(nal, flags);
112 eq = ptl_handle2eq(eqh, nal);
114 LIB_UNLOCK(nal, flags);
115 return (PTL_EQ_INVALID);
/* Refuse to free an EQ that still has references. */
118 if (eq->eq_refcount != 0) {
119 LIB_UNLOCK(nal, flags);
120 return (PTL_EQ_IN_USE);
123 /* stash for free after lock dropped */
124 events = eq->eq_events;
126 addrkey = eq->eq_addrkey;
/* After lib_eq_free() the descriptor must not be touched again; everything
 * needed below was copied into locals above. */
128 lib_invalidate_handle (nal, &eq->eq_lh);
129 list_del (&eq->eq_list);
130 lib_eq_free (nal, eq);
132 LIB_UNLOCK(nal, flags);
/* Optional NAL hook, called without the lock held: undo the mapping of the
 * event array using the stashed address key. */
134 if (nal->libnal_unmap != NULL) {
137 .iov_len = size * sizeof(ptl_event_t)};
139 nal->libnal_unmap(nal, 1, &iov, &addrkey);
/* NOTE(review): 'size' is presumably the stashed eq->eq_size; its
 * assignment is not visible in this listing. */
142 PORTAL_FREE(events, size * sizeof (ptl_event_t));
/*
 * lib_get_event: look at the slot of 'eq' that the dequeue sequence number
 * eq->eq_deq_seq points at and consume it if it holds a new event.
 *
 * The slot index is eq_deq_seq masked with (eq_size - 1), which only works
 * as a modulo when eq_size is a power of two (presumably the rounded-up
 * count from lib_api_eq_alloc; the assignment is not visible here).
 *
 * Returns PTL_EQ_EMPTY when eq_deq_seq is later than the sequence number
 * stored in the slot, i.e. the slot has not been written with the event we
 * are waiting for.  Otherwise eq_deq_seq is advanced to one past the
 * sequence number found in the slot.
 *
 * NOTE(review): this listing has gaps (the embedded line numbers skip).
 * 'ev' is never referenced in the visible lines, and the statements that
 * set and return the result code are not visible; confirm them against the
 * complete file.
 */
148 lib_get_event (lib_eq_t *eq, ptl_event_t *ev)
150 int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
151 ptl_event_t *new_event = &eq->eq_events[new_index];
155 CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
156 new_event, eq->eq_deq_seq, eq->eq_size);
/* Slot still carries an older sequence number: nothing new to deliver. */
158 if (PTL_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
159 RETURN(PTL_EQ_EMPTY);
162 /* We've got a new event... */
165 /* ...but did it overwrite an event we've not seen yet? */
/* NOTE(review): the numbering jumps from 166 to 169, so the lines between
 * this comparison and the CERROR are missing.  The message text suggests
 * the error is reported when the sequence numbers do NOT match; confirm. */
166 if (eq->eq_deq_seq == new_event->sequence) {
169 CERROR("Event Queue Overflow: eq seq %lu ev seq %lu\n",
170 eq->eq_deq_seq, new_event->sequence);
/* Resynchronise on the event actually found, skipping any that were lost. */
174 eq->eq_deq_seq = new_event->sequence + 1;
180 lib_api_eq_poll (nal_t *apinal,
181 ptl_handle_eq_t *eventqs, int neq, int timeout_ms,
182 ptl_event_t *event, int *which)
184 lib_nal_t *nal = apinal->nal_data;
185 lib_ni_t *ni = &nal->libnal_ni;
199 LIB_LOCK(nal, flags);
202 for (i = 0; i < neq; i++) {
203 lib_eq_t *eq = ptl_handle2eq(&eventqs[i], nal);
205 rc = lib_get_event (eq, event);
206 if (rc != PTL_EQ_EMPTY) {
207 LIB_UNLOCK(nal, flags);
213 if (timeout_ms == 0) {
214 LIB_UNLOCK (nal, flags);
215 RETURN (PTL_EQ_EMPTY);
218 /* Some architectures force us to do spin locking/unlocking
219 * in the same stack frame, means we can abstract the
222 init_waitqueue_entry(&wq, current);
223 set_current_state(TASK_INTERRUPTIBLE);
224 add_wait_queue(&ni->ni_waitq, &wq);
226 LIB_UNLOCK(nal, flags);
228 if (timeout_ms < 0) {
232 schedule_timeout((timeout_ms * HZ)/1000);
233 timeout_ms -= ((jiffies - now) * 1000)/HZ;
238 LIB_LOCK(nal, flags);
240 if (timeout_ms < 0) {
241 pthread_cond_wait(&ni->ni_cond, &ni->ni_mutex);
243 gettimeofday(&then, NULL);
245 ts.tv_sec = then.tv_sec + timeout_ms/1000;
246 ts.tv_nsec = then.tv_usec * 1000 +
247 (timeout_ms%1000) * 1000000;
248 if (ts.tv_nsec >= 1000000000) {
250 ts.tv_nsec -= 1000000000;
253 pthread_cond_timedwait(&ni->ni_cond,
256 gettimeofday(&now, NULL);
257 timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
258 (now.tv_usec - then.tv_usec) / 1000;