Whamcloud - gitweb
b21619 hash ME on RDMA portal i=isaac i=maxim
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-eq.c
37  *
38  * Library level Event queue management routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42 #include <lnet/lib-lnet.h>
43
44 int
45 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
46             lnet_handle_eq_t *handle)
47 {
48         lnet_eq_t     *eq;
49
50         LASSERT (the_lnet.ln_init);
51         LASSERT (the_lnet.ln_refcount > 0);
52
53         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
54          * overflow, they don't skip entries, so the queue has the same
55          * apparant capacity at all times */
56
57         if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
58                 do {                    /* knock off all but the top bit... */
59                         count &= ~LOWEST_BIT_SET (count);
60                 } while (count != LOWEST_BIT_SET(count));
61
62                 count <<= 1;                             /* ...and round up */
63         }
64
65         if (count == 0)        /* catch bad parameter / overflow on roundup */
66                 return (-EINVAL);
67
68         eq = lnet_eq_alloc();
69         if (eq == NULL)
70                 return (-ENOMEM);
71
72         LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
73         if (eq->eq_events == NULL) {
74                 LNET_LOCK();
75                 lnet_eq_free (eq);
76                 LNET_UNLOCK();
77
78                 return -ENOMEM;
79         }
80
81         /* NB this resets all event sequence numbers to 0, to be earlier
82          * than eq_deq_seq */
83         memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
84
85         eq->eq_deq_seq = 1;
86         eq->eq_enq_seq = 1;
87         eq->eq_size = count;
88         eq->eq_refcount = 0;
89         eq->eq_callback = callback;
90
91         LNET_LOCK();
92
93         lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
94         cfs_list_add (&eq->eq_list, &the_lnet.ln_active_eqs);
95
96         LNET_UNLOCK();
97
98         lnet_eq2handle(handle, eq);
99         return (0);
100 }
101
102 int
103 LNetEQFree(lnet_handle_eq_t eqh)
104 {
105         lnet_eq_t     *eq;
106         int            size;
107         lnet_event_t  *events;
108
109         LASSERT (the_lnet.ln_init);
110         LASSERT (the_lnet.ln_refcount > 0);
111
112         LNET_LOCK();
113
114         eq = lnet_handle2eq(&eqh);
115         if (eq == NULL) {
116                 LNET_UNLOCK();
117                 return (-ENOENT);
118         }
119
120         if (eq->eq_refcount != 0) {
121                 CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
122                        eq->eq_refcount);
123                 LNET_UNLOCK();
124                 return (-EBUSY);
125         }
126
127         /* stash for free after lock dropped */
128         events  = eq->eq_events;
129         size    = eq->eq_size;
130
131         lnet_invalidate_handle (&eq->eq_lh);
132         cfs_list_del (&eq->eq_list);
133         lnet_eq_free (eq);
134
135         LNET_UNLOCK();
136
137         LIBCFS_FREE(events, size * sizeof (lnet_event_t));
138
139         return 0;
140 }
141
142 int
143 lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
144 {
145         int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
146         lnet_event_t *new_event = &eq->eq_events[new_index];
147         int           rc;
148         ENTRY;
149
150         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
151                new_event, eq->eq_deq_seq, eq->eq_size);
152
153         if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
154                 RETURN(0);
155         }
156
157         /* We've got a new event... */
158         *ev = *new_event;
159
160         /* ...but did it overwrite an event we've not seen yet? */
161         if (eq->eq_deq_seq == new_event->sequence) {
162                 rc = 1;
163         } else {
164                 /* don't complain with CERROR: some EQs are sized small
165                  * anyway; if it's important, the caller should complain */
166                 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
167                        eq->eq_deq_seq, new_event->sequence);
168                 rc = -EOVERFLOW;
169         }
170
171         eq->eq_deq_seq = new_event->sequence + 1;
172         RETURN(rc);
173 }
174
175
176 int
177 LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
178 {
179         int which;
180
181         return LNetEQPoll(&eventq, 1, 0,
182                          event, &which);
183 }
184
185 int
186 LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
187 {
188         int which;
189
190         return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
191                          event, &which);
192 }
193
/* Poll the 'neq' event queues in 'eventqs' for up to 'timeout_ms'
 * milliseconds (0 = don't block, negative = wait forever).  On success
 * the event is copied into 'event', '*which' is set to the index of the
 * EQ it came from, and the lib_get_event() result is returned (1, or
 * -EOVERFLOW if earlier events were overwritten before being seen).
 * Returns 0 on timeout and -ENOENT for neq < 1 or a bad EQ handle. */
int
LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
            lnet_event_t *event, int *which)
{
        int              i;
        int              rc;
#ifdef __KERNEL__
        cfs_waitlink_t   wl;
        cfs_time_t       now;
#else
        struct timeval   then;
        struct timeval   now;
# ifdef HAVE_LIBPTHREAD
        struct timespec  ts;
# endif
        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
#endif
        ENTRY;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        if (neq < 1)
                RETURN(-ENOENT);

        LNET_LOCK();

        for (;;) {
#ifndef __KERNEL__
                LNET_UNLOCK();

                /* Recursion breaker: run the router checker ourselves in
                 * userspace, but not when we're the router checker polling
                 * its own EQ */
                if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
                    !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
                        lnet_router_checker();

                LNET_LOCK();
#endif
                /* poll pass: hand back the first pending event found */
                for (i = 0; i < neq; i++) {
                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);

                        if (eq == NULL) {
                                LNET_UNLOCK();
                                RETURN(-ENOENT);
                        }

                        rc = lib_get_event (eq, event);
                        if (rc != 0) {
                                /* got one (rc == 1, or -EOVERFLOW if events
                                 * were dropped before it) */
                                LNET_UNLOCK();
                                *which = i;
                                RETURN(rc);
                        }
                }

#ifdef __KERNEL__
                if (timeout_ms == 0) {
                        /* non-blocking poll found nothing */
                        LNET_UNLOCK();
                        RETURN (0);
                }

                /* Queue on the LNet waitq BEFORE dropping the lock so an
                 * enqueue between unlock and sleep can't be missed */
                cfs_waitlink_init(&wl);
                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add(&the_lnet.ln_waitq, &wl);

                LNET_UNLOCK();

                if (timeout_ms < 0) {
                        /* negative timeout: wait indefinitely */
                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
                } else {
                        struct timeval tv;

                        now = cfs_time_current();
                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
                                            cfs_time_seconds(timeout_ms)/1000);
                        /* charge the time actually slept against the budget */
                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
                                            &tv);
                        timeout_ms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }

                LNET_LOCK();
                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
#else
                if (eqwaitni != NULL) {
                        /* I have a single NI that I have to call into, to get
                         * events queued, or to block. */
                        lnet_ni_addref_locked(eqwaitni);
                        LNET_UNLOCK();

                        if (timeout_ms <= 0) {
                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
                        } else {
                                gettimeofday(&then, NULL);

                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);

                                /* subtract the wall-clock time spent waiting */
                                gettimeofday(&now, NULL);
                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                              (now.tv_usec - then.tv_usec) / 1000;
                                if (timeout_ms < 0)
                                        timeout_ms = 0;
                        }

                        LNET_LOCK();
                        lnet_ni_decref_locked(eqwaitni);

                        /* don't call into eqwaitni again if timeout has
                         * expired */
                        if (timeout_ms == 0)
                                eqwaitni = NULL;

                        continue;               /* go back and check for events */
                }

                if (timeout_ms == 0) {
                        LNET_UNLOCK();
                        RETURN (0);
                }

# ifndef HAVE_LIBPTHREAD
                /* If I'm single-threaded, LNET fails at startup if it can't
                 * set the_lnet.ln_eqwaitni correctly.  */
                LBUG();
# else
                if (timeout_ms < 0) {
                        /* ln_lock is dropped atomically while waiting and
                         * re-held on wakeup, per pthread_cond_wait semantics */
                        pthread_cond_wait(&the_lnet.ln_cond,
                                          &the_lnet.ln_lock);
                } else {
                        gettimeofday(&then, NULL);

                        /* build the ABSOLUTE deadline required by
                         * pthread_cond_timedwait, normalizing tv_nsec */
                        ts.tv_sec = then.tv_sec + timeout_ms/1000;
                        ts.tv_nsec = then.tv_usec * 1000 +
                                     (timeout_ms%1000) * 1000000;
                        if (ts.tv_nsec >= 1000000000) {
                                ts.tv_sec++;
                                ts.tv_nsec -= 1000000000;
                        }

                        pthread_cond_timedwait(&the_lnet.ln_cond,
                                               &the_lnet.ln_lock, &ts);

                        /* charge elapsed time against the remaining budget */
                        gettimeofday(&now, NULL);
                        timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                      (now.tv_usec - then.tv_usec) / 1000;

                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }
# endif
#endif
        }
}
346 }