1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Library level Event queue management routines
41 #define DEBUG_SUBSYSTEM S_LNET
42 #include <lnet/lib-lnet.h>
/*
 * LNetEQAlloc: create an event queue and hand back a handle to it.
 *
 * count    - requested number of event slots. It is rounded up to the next
 *            power of two; a result of 0 (bad parameter, or overflow while
 *            rounding up) is caught before anything is allocated.
 * callback - stored in eq->eq_callback.
 * handle   - filled in from the new queue by lnet_eq2handle().
 *
 * The events array is obtained with LIBCFS_ALLOC and cleared with memset(),
 * the queue's handle is initialised with LNET_COOKIE_TYPE_EQ, and the queue
 * is linked onto the_lnet.ln_active_eqs.
 *
 * NOTE(review): the return type, the local declarations, the error returns
 * and any locking are on lines not visible in this excerpt - confirm
 * against the full file.
 */
45 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
46 lnet_handle_eq_t *handle)
50 LASSERT (the_lnet.ln_init);
51 LASSERT (the_lnet.ln_refcount > 0);
53 /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
54 * overflow, they don't skip entries, so the queue has the same
55 * apparent capacity at all times */
57 if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
58 do { /* knock off all but the top bit... */
59 count &= ~LOWEST_BIT_SET (count);
60 } while (count != LOWEST_BIT_SET(count));
62 count <<= 1; /* ...and round up */
65 if (count == 0) /* catch bad parameter / overflow on roundup */
/* One lnet_event_t per slot; the NULL check below guards the allocation. */
72 LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
73 if (eq->eq_events == NULL) {
81 /* NB this resets all event sequence numbers to 0, to be earlier
83 memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
89 eq->eq_callback = callback;
93 lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
94 list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
98 lnet_eq2handle(handle, eq);
/*
 * LNetEQFree: tear down the event queue named by 'eqh'.
 *
 * The handle is resolved to a queue with lnet_handle2eq(). A queue whose
 * eq_refcount is non-zero is reported as busy through CDEBUG (what happens
 * after the message is on lines not shown here). The events pointer is
 * stashed in a local, the queue's handle is invalidated, the queue is
 * removed from its list, and the stashed events array is released with
 * LIBCFS_FREE.
 *
 * NOTE(review): the return type, the handling of a failed lookup, the
 * locking, and whether the queue structure itself is released are not
 * visible in this excerpt - confirm against the full file.
 */
103 LNetEQFree(lnet_handle_eq_t eqh)
107 lnet_event_t *events;
109 LASSERT (the_lnet.ln_init);
110 LASSERT (the_lnet.ln_refcount > 0);
114 eq = lnet_handle2eq(&eqh);
120 if (eq->eq_refcount != 0) {
121 CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
127 /* stash for free after lock dropped */
128 events = eq->eq_events;
131 lnet_invalidate_handle (&eq->eq_lh);
132 list_del (&eq->eq_list);
/* 'size' is assigned on a line not shown here; presumably the queue's
 * slot count captured alongside 'events' - TODO confirm. */
137 LIBCFS_FREE(events, size * sizeof (lnet_event_t));
/*
 * lib_get_event: examine the slot of 'eq' that the dequeue sequence number
 * points at and consume the event found there, if any.
 *
 * The slot index is eq_deq_seq masked with (eq_size - 1), which is only a
 * valid modulo when eq_size is a power of two. The slot's own sequence
 * number is then compared with eq_deq_seq:
 *   - eq_deq_seq ahead of the slot (LNET_SEQ_GT): presumably nothing new
 *     to deliver; the body of that branch is not visible here.
 *   - equal: the expected next event.
 *   - otherwise: events were overwritten before being read; this is logged
 *     at D_NET as an overflow.
 * Finally eq_deq_seq is moved to one past the sequence number of the slot
 * just examined.
 *
 * NOTE(review): the return type, the return values for each case and the
 * copy of the event into 'ev' are on lines not visible in this excerpt.
 */
143 lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
145 int new_index = eq->eq_deq_seq & (eq->eq_size - 1);
146 lnet_event_t *new_event = &eq->eq_events[new_index];
150 CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
151 new_event, eq->eq_deq_seq, eq->eq_size);
153 if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
157 /* We've got a new event... */
160 /* ...but did it overwrite an event we've not seen yet? */
161 if (eq->eq_deq_seq == new_event->sequence) {
164 /* don't complain with CERROR: some EQs are sized small
165 * anyway; if it's important, the caller should complain */
166 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
167 eq->eq_deq_seq, new_event->sequence);
/* Resynchronise on the slot just read so skipped sequence numbers are
 * not waited for again. */
171 eq->eq_deq_seq = new_event->sequence + 1;
/*
 * LNetEQGet: poll the single queue 'eventq' through LNetEQPoll() with one
 * queue and a timeout_ms of 0, returning LNetEQPoll()'s result.
 *
 * NOTE(review): the remaining arguments of the call (where 'event' is
 * presumably forwarded) are on a line not visible in this excerpt.
 */
177 LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
181 return LNetEQPoll(&eventq, 1, 0,
/*
 * LNetEQWait: poll the single queue 'eventq' through LNetEQPoll() with one
 * queue and a timeout_ms of LNET_TIME_FOREVER, returning LNetEQPoll()'s
 * result.
 *
 * NOTE(review): LNET_TIME_FOREVER is defined elsewhere; presumably it is a
 * negative value, which LNetEQPoll() treats as "wait without a time
 * limit" - TODO confirm. The remaining arguments of the call are on a line
 * not visible in this excerpt.
 */
186 LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
190 return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
/*
 * LNetEQPoll: look for an event on any of 'neq' queues, optionally waiting.
 *
 * eventqs    - array of queue handles to examine.
 * neq        - number of entries in 'eventqs'.
 * timeout_ms - compared against 0 before each wait; a negative value takes
 *              the untimed wait call, a positive value takes the timed
 *              wait call and is then reduced by the time actually spent
 *              waiting.
 * event      - passed to lib_get_event() for each queue examined.
 * which      - not referenced on any line visible in this excerpt;
 *              presumably receives the index of the queue that produced
 *              the event - TODO confirm.
 *
 * Three ways of waiting are visible: a cfs_waitq wait on the_lnet.ln_waitq,
 * a wait delegated to the single NI in the_lnet.ln_eqwaitni through its
 * LND's lnd_wait method, and a pthread condition-variable wait on
 * the_lnet.ln_cond.
 *
 * NOTE(review): the return type, most local declarations, the return
 * statements, the lock handling, the enclosing retry loop and most of the
 * preprocessor conditionals that select a wait strategy are on lines not
 * visible here, and the excerpt ends before the function does.
 */
195 LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
196 lnet_event_t *event, int *which)
206 # ifdef HAVE_LIBPTHREAD
209 lnet_ni_t *eqwaitni = the_lnet.ln_eqwaitni;
213 LASSERT (the_lnet.ln_init);
214 LASSERT (the_lnet.ln_refcount > 0);
/* Try every queue handle in turn for an event that can be dequeued. */
222 for (i = 0; i < neq; i++) {
223 lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
230 rc = lib_get_event (eq, event);
239 if (timeout_ms == 0) {
/* cfs_waitq strategy: put this thread on the_lnet.ln_waitq before
 * waiting, and take it off again afterwards. */
244 cfs_waitlink_init(&wl);
245 set_current_state(TASK_INTERRUPTIBLE);
246 cfs_waitq_add(&the_lnet.ln_waitq, &wl);
250 if (timeout_ms < 0) {
251 cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
/* Timed wait: note the start time, wait, then charge the elapsed time
 * (converted to milliseconds) against timeout_ms. */
255 now = cfs_time_current();
256 cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
257 cfs_time_seconds(timeout_ms)/1000);
258 cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
260 timeout_ms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
266 cfs_waitq_del(&the_lnet.ln_waitq, &wl);
268 if (eqwaitni != NULL) {
269 /* I have a single NI that I have to call into, to get
270 * events queued, or to block. */
/* Hold a reference on the NI across the call into its LND. */
271 lnet_ni_addref_locked(eqwaitni);
274 if (timeout_ms <= 0) {
275 (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
/* Positive timeout: bracket lnd_wait with gettimeofday() and subtract
 * the elapsed milliseconds from timeout_ms. */
277 gettimeofday(&then, NULL);
279 (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
281 gettimeofday(&now, NULL);
282 timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
283 (now.tv_usec - then.tv_usec) / 1000;
289 lnet_ni_decref_locked(eqwaitni);
291 /* don't call into eqwaitni again if timeout has
296 continue; /* go back and check for events */
299 if (timeout_ms == 0) {
304 # ifndef HAVE_LIBPTHREAD
305 /* If I'm single-threaded, LNET fails at startup if it can't
306 * set the_lnet.ln_eqwaitni correctly. */
309 if (timeout_ms < 0) {
310 pthread_cond_wait(&the_lnet.ln_cond,
/* Timed wait: pthread_cond_timedwait() takes an absolute deadline, so
 * build one from the current time plus timeout_ms, keeping tv_nsec
 * below one second (the matching tv_sec adjustment is presumably on a
 * line not shown), then charge the elapsed time against timeout_ms. */
313 gettimeofday(&then, NULL);
315 ts.tv_sec = then.tv_sec + timeout_ms/1000;
316 ts.tv_nsec = then.tv_usec * 1000 +
317 (timeout_ms%1000) * 1000000;
318 if (ts.tv_nsec >= 1000000000) {
320 ts.tv_nsec -= 1000000000;
323 pthread_cond_timedwait(&the_lnet.ln_cond,
324 &the_lnet.ln_lock, &ts);
326 gettimeofday(&now, NULL);
327 timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
328 (now.tv_usec - then.tv_usec) / 1000;