Whamcloud - gitweb
f24758dcbbb258e9081ab183834bf243549175b6
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-eq.c
37  *
38  * Library level Event queue management routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42 #include <lnet/lib-lnet.h>
43
44 int
45 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
46             lnet_handle_eq_t *handle)
47 {
48         lnet_eq_t     *eq;
49
50         LASSERT (the_lnet.ln_init);
51         LASSERT (the_lnet.ln_refcount > 0);
52
53         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
54          * overflow, they don't skip entries, so the queue has the same
55          * apparant capacity at all times */
56
57         if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
58                 do {                    /* knock off all but the top bit... */
59                         count &= ~LOWEST_BIT_SET (count);
60                 } while (count != LOWEST_BIT_SET(count));
61
62                 count <<= 1;                             /* ...and round up */
63         }
64
65         if (count == 0)        /* catch bad parameter / overflow on roundup */
66                 return (-EINVAL);
67
68         eq = lnet_eq_alloc();
69         if (eq == NULL)
70                 return (-ENOMEM);
71
72         LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
73         if (eq->eq_events == NULL) {
74                 LNET_LOCK();
75                 lnet_eq_free (eq);
76                 LNET_UNLOCK();
77
78                 return -ENOMEM;
79         }
80
81         /* NB this resets all event sequence numbers to 0, to be earlier
82          * than eq_deq_seq */
83         memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
84
85         eq->eq_deq_seq = 1;
86         eq->eq_enq_seq = 1;
87         eq->eq_size = count;
88         eq->eq_refcount = 0;
89         eq->eq_callback = callback;
90
91         LNET_LOCK();
92
93         lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
94         list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
95
96         LNET_UNLOCK();
97
98         lnet_eq2handle(handle, eq);
99         return (0);
100 }
101
102 int
103 LNetEQFree(lnet_handle_eq_t eqh)
104 {
105         lnet_eq_t     *eq;
106         int            size;
107         lnet_event_t  *events;
108
109         LASSERT (the_lnet.ln_init);
110         LASSERT (the_lnet.ln_refcount > 0);
111
112         LNET_LOCK();
113
114         eq = lnet_handle2eq(&eqh);
115         if (eq == NULL) {
116                 LNET_UNLOCK();
117                 return (-ENOENT);
118         }
119
120         if (eq->eq_refcount != 0) {
121                 CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
122                        eq->eq_refcount);
123                 LNET_UNLOCK();
124                 return (-EBUSY);
125         }
126
127         /* stash for free after lock dropped */
128         events  = eq->eq_events;
129         size    = eq->eq_size;
130
131         lnet_invalidate_handle (&eq->eq_lh);
132         list_del (&eq->eq_list);
133         lnet_eq_free (eq);
134
135         LNET_UNLOCK();
136
137         LIBCFS_FREE(events, size * sizeof (lnet_event_t));
138
139         return 0;
140 }
141
142 int
143 lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
144 {
145         int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
146         lnet_event_t *new_event = &eq->eq_events[new_index];
147         int           rc;
148         ENTRY;
149
150         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
151                new_event, eq->eq_deq_seq, eq->eq_size);
152
153         if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
154                 RETURN(0);
155         }
156
157         /* We've got a new event... */
158         *ev = *new_event;
159
160         /* ...but did it overwrite an event we've not seen yet? */
161         if (eq->eq_deq_seq == new_event->sequence) {
162                 rc = 1;
163         } else {
164                 /* don't complain with CERROR: some EQs are sized small
165                  * anyway; if it's important, the caller should complain */
166                 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
167                        eq->eq_deq_seq, new_event->sequence);
168                 rc = -EOVERFLOW;
169         }
170
171         eq->eq_deq_seq = new_event->sequence + 1;
172         RETURN(rc);
173 }
174
175
176 int
177 LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
178 {
179         int which;
180
181         return LNetEQPoll(&eventq, 1, 0,
182                          event, &which);
183 }
184
185 int
186 LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
187 {
188         int which;
189
190         return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
191                          event, &which);
192 }
193
194 int
195 LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
196             lnet_event_t *event, int *which)
197 {
198         int              i;
199         int              rc;
200 #ifdef __KERNEL__
201         cfs_waitlink_t   wl;
202         cfs_time_t       now;
203 #else
204         struct timeval   then;
205         struct timeval   now;
206 # ifdef HAVE_LIBPTHREAD
207         struct timespec  ts;
208 # endif
209         lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
210 #endif
211         ENTRY;
212
213         LASSERT (the_lnet.ln_init);
214         LASSERT (the_lnet.ln_refcount > 0);
215
216         if (neq < 1)
217                 RETURN(-ENOENT);
218
219         LNET_LOCK();
220
221         for (;;) {
222                 for (i = 0; i < neq; i++) {
223                         lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
224
225                         if (eq == NULL) {
226                                 LNET_UNLOCK();
227                                 RETURN(-ENOENT);
228                         }
229
230                         rc = lib_get_event (eq, event);
231                         if (rc != 0) {
232                                 LNET_UNLOCK();
233                                 *which = i;
234                                 RETURN(rc);
235                         }
236                 }
237
238 #ifdef __KERNEL__
239                 if (timeout_ms == 0) {
240                         LNET_UNLOCK();
241                         RETURN (0);
242                 }
243
244                 cfs_waitlink_init(&wl);
245                 set_current_state(TASK_INTERRUPTIBLE);
246                 cfs_waitq_add(&the_lnet.ln_waitq, &wl);
247
248                 LNET_UNLOCK();
249
250                 if (timeout_ms < 0) {
251                         cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
252                 } else {
253                         struct timeval tv;
254
255                         now = cfs_time_current();
256                         cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
257                                             cfs_time_seconds(timeout_ms)/1000);
258                         cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
259                                             &tv);
260                         timeout_ms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
261                         if (timeout_ms < 0)
262                                 timeout_ms = 0;
263                 }
264
265                 LNET_LOCK();
266                 cfs_waitq_del(&the_lnet.ln_waitq, &wl);
267 #else
268                 if (eqwaitni != NULL) {
269                         /* I have a single NI that I have to call into, to get
270                          * events queued, or to block. */
271                         lnet_ni_addref_locked(eqwaitni);
272                         LNET_UNLOCK();
273
274                         if (timeout_ms <= 0) {
275                                 (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
276                         } else {
277                                 gettimeofday(&then, NULL);
278
279                                 (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
280
281                                 gettimeofday(&now, NULL);
282                                 timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
283                                               (now.tv_usec - then.tv_usec) / 1000;
284                                 if (timeout_ms < 0)
285                                         timeout_ms = 0;
286                         }
287
288                         LNET_LOCK();
289                         lnet_ni_decref_locked(eqwaitni);
290
291                         /* don't call into eqwaitni again if timeout has
292                          * expired */
293                         if (timeout_ms == 0)
294                                 eqwaitni = NULL;
295
296                         continue;               /* go back and check for events */
297                 }
298
299                 if (timeout_ms == 0) {
300                         LNET_UNLOCK();
301                         RETURN (0);
302                 }
303
304 # ifndef HAVE_LIBPTHREAD
305                 /* If I'm single-threaded, LNET fails at startup if it can't
306                  * set the_lnet.ln_eqwaitni correctly.  */
307                 LBUG();
308 # else
309                 if (timeout_ms < 0) {
310                         pthread_cond_wait(&the_lnet.ln_cond,
311                                           &the_lnet.ln_lock);
312                 } else {
313                         gettimeofday(&then, NULL);
314
315                         ts.tv_sec = then.tv_sec + timeout_ms/1000;
316                         ts.tv_nsec = then.tv_usec * 1000 +
317                                      (timeout_ms%1000) * 1000000;
318                         if (ts.tv_nsec >= 1000000000) {
319                                 ts.tv_sec++;
320                                 ts.tv_nsec -= 1000000000;
321                         }
322
323                         pthread_cond_timedwait(&the_lnet.ln_cond,
324                                                &the_lnet.ln_lock, &ts);
325
326                         gettimeofday(&now, NULL);
327                         timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
328                                       (now.tv_usec - then.tv_usec) / 1000;
329
330                         if (timeout_ms < 0)
331                                 timeout_ms = 0;
332                 }
333 # endif
334 #endif
335         }
336 }