/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lib/lib-eq.c
 * Library level Event queue management routines
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LNET
#include <lnet/lib-lnet.h>

int
LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
            lnet_handle_eq_t *handle)
{
        lnet_eq_t     *eq;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
         * overflow, they don't skip entries, so the queue has the same
         * apparent capacity at all times */
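        /* e.g. a requested count of 10 is knocked down to 8 and then
         * rounded up to 16 by the loop below */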

        if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
                do {                    /* knock off all but the top bit... */
                        count &= ~LOWEST_BIT_SET (count);
                } while (count != LOWEST_BIT_SET(count));

                count <<= 1;                             /* ...and round up */
        }

        if (count == 0)        /* catch bad parameter / overflow on roundup */
                return (-EINVAL);

        eq = lnet_eq_alloc();
        if (eq == NULL)
                return (-ENOMEM);

        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
        if (eq->eq_events == NULL) {
                LNET_LOCK();
                lnet_eq_free (eq);
                LNET_UNLOCK();

                return -ENOMEM;
        }

        /* NB this resets all event sequence numbers to 0, to be earlier
         * than eq_deq_seq */
        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));

        eq->eq_deq_seq = 1;
        eq->eq_enq_seq = 1;
        eq->eq_size = count;
        eq->eq_refcount = 0;
        eq->eq_callback = callback;

        LNET_LOCK();

        lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
        list_add (&eq->eq_list, &the_lnet.ln_active_eqs);

        LNET_UNLOCK();

        lnet_eq2handle(handle, eq);
        return (0);
}

int
LNetEQFree(lnet_handle_eq_t eqh)
{
        lnet_eq_t     *eq;
        int            size;
        lnet_event_t  *events;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        LNET_LOCK();

        eq = lnet_handle2eq(&eqh);
        if (eq == NULL) {
                LNET_UNLOCK();
                return (-ENOENT);
        }

        if (eq->eq_refcount != 0) {
                CDEBUG(D_NET, "Event queue (%d) busy on destroy.\n",
                       eq->eq_refcount);
                LNET_UNLOCK();
                return (-EBUSY);
        }

        /* stash for free after lock dropped */
        events  = eq->eq_events;
        size    = eq->eq_size;

        lnet_invalidate_handle (&eq->eq_lh);
        list_del (&eq->eq_list);
        lnet_eq_free (eq);

        LNET_UNLOCK();

        LIBCFS_FREE(events, size * sizeof (lnet_event_t));

        return 0;
}
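
/*
 * Illustrative sketch of the minimal EQ lifecycle using the calls above.
 * The example function name and the count of 32 are arbitrary, and a NULL
 * callback is assumed here to mean "no completion handler; the caller
 * polls for events instead".
 */
#if 0
static int
lnet_eq_lifecycle_example(void)
{
        lnet_handle_eq_t eqh;
        int              rc;

        /* the requested count is rounded up to a power of 2 internally */
        rc = LNetEQAlloc(32, NULL, &eqh);
        if (rc != 0)
                return rc;

        /* ... attach MDs to eqh and consume events with LNetEQPoll() ... */

        /* returns -EBUSY (and CDEBUGs eq_refcount) while anything still
         * holds a reference on the EQ */
        rc = LNetEQFree(eqh);
        return rc;
}
#endif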

int
lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
{
        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
        lnet_event_t *new_event = &eq->eq_events[new_index];
        int           rc;
        ENTRY;

        CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
               new_event, eq->eq_deq_seq, eq->eq_size);

        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
                RETURN(0);
        }

        /* We've got a new event... */
        *ev = *new_event;

        /* ...but did it overwrite an event we've not seen yet? */
        if (eq->eq_deq_seq == new_event->sequence) {
                rc = 1;
        } else {
                /* don't complain with CERROR: some EQs are sized small
                 * anyway; if it's important, the caller should complain */
                CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
                       eq->eq_deq_seq, new_event->sequence);
                rc = -EOVERFLOW;
        }

        eq->eq_deq_seq = new_event->sequence + 1;
        RETURN(rc);
}
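
/*
 * Worked example of the sequence checks above (illustrative numbers):
 * with eq_size == 4 and eq_deq_seq == 5, the consumer looks at slot
 * 5 & 3 == 1.  If the event there carries sequence 5, it is the next
 * undelivered event (rc == 1).  A smaller sequence means nothing new has
 * been enqueued yet (rc == 0).  A larger one, say 9, means the producer
 * has lapped the consumer: event 5 was overwritten, so the newer event is
 * returned with rc == -EOVERFLOW and eq_deq_seq jumps to 10 (events 6..8
 * are dropped).
 */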


int
LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
{
        int which;

        return LNetEQPoll(&eventq, 1, 0,
                          event, &which);
}

int
LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
{
        int which;

        return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
                          event, &which);
}

int
LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
            lnet_event_t *event, int *which)
{
        int              i;
        int              rc;
#ifdef __KERNEL__
        cfs_waitlink_t   wl;
        cfs_time_t       now;
#else
        struct timeval   then;
        struct timeval   now;
# ifdef HAVE_LIBPTHREAD
        struct timespec  ts;
# endif
        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
#endif
        ENTRY;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        if (neq < 1)
                RETURN(-ENOENT);

        LNET_LOCK();

        for (;;) {
                for (i = 0; i < neq; i++) {
                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);

                        if (eq == NULL) {
                                LNET_UNLOCK();
                                RETURN(-ENOENT);
                        }

                        rc = lib_get_event (eq, event);
                        if (rc != 0) {
                                LNET_UNLOCK();
                                *which = i;
                                RETURN(rc);
                        }
                }

#ifdef __KERNEL__
                if (timeout_ms == 0) {
                        LNET_UNLOCK();
                        RETURN (0);
                }

                cfs_waitlink_init(&wl);
                set_current_state(TASK_INTERRUPTIBLE);
                cfs_waitq_add(&the_lnet.ln_waitq, &wl);

                LNET_UNLOCK();

                if (timeout_ms < 0) {
                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
                } else {
                        struct timeval tv;

                        now = cfs_time_current();
                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
                                            cfs_time_seconds(timeout_ms)/1000);
                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
                                          &tv);
                        timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }

                LNET_LOCK();
                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
#else
                if (eqwaitni != NULL) {
                        /* I have a single NI that I have to call into, to get
                         * events queued, or to block. */
                        lnet_ni_addref_locked(eqwaitni);
                        LNET_UNLOCK();

                        if (timeout_ms <= 0) {
                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
                        } else {
                                gettimeofday(&then, NULL);

                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);

                                gettimeofday(&now, NULL);
                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                              (now.tv_usec - then.tv_usec) / 1000;
                                if (timeout_ms < 0)
                                        timeout_ms = 0;
                        }

                        LNET_LOCK();
                        lnet_ni_decref_locked(eqwaitni);

                        /* don't call into eqwaitni again if timeout has
                         * expired */
                        if (timeout_ms == 0)
                                eqwaitni = NULL;

                        continue;               /* go back and check for events */
                }

                if (timeout_ms == 0) {
                        LNET_UNLOCK();
                        RETURN (0);
                }

# ifndef HAVE_LIBPTHREAD
                /* If I'm single-threaded, LNET fails at startup if it can't
                 * set the_lnet.ln_eqwaitni correctly.  */
                LBUG();
# else
                if (timeout_ms < 0) {
                        pthread_cond_wait(&the_lnet.ln_cond,
                                          &the_lnet.ln_lock);
                } else {
                        gettimeofday(&then, NULL);

                        ts.tv_sec = then.tv_sec + timeout_ms/1000;
                        ts.tv_nsec = then.tv_usec * 1000 +
                                     (timeout_ms%1000) * 1000000;
                        if (ts.tv_nsec >= 1000000000) {
                                ts.tv_sec++;
                                ts.tv_nsec -= 1000000000;
                        }

                        pthread_cond_timedwait(&the_lnet.ln_cond,
                                               &the_lnet.ln_lock, &ts);

                        gettimeofday(&now, NULL);
                        timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                      (now.tv_usec - then.tv_usec) / 1000;

                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }
# endif
#endif
        }
}
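
/*
 * Illustrative polling sketch: wait up to one second for an event on either
 * of two queues.  The example function is hypothetical; the handles are
 * assumed to come from earlier LNetEQAlloc() calls, and "which" reports the
 * index of the queue that produced the event.
 */
#if 0
static int
lnet_eq_poll_example(lnet_handle_eq_t eqh[2])
{
        lnet_event_t ev;
        int          which;
        int          rc;

        /* rc == 0: timed out; rc == 1: an event was copied to ev;
         * rc == -EOVERFLOW: an event was copied but older ones were
         * dropped; rc == -ENOENT: a handle was stale or invalid */
        rc = LNetEQPoll(eqh, 2, 1000, &ev, &which);
        if (rc == 1 || rc == -EOVERFLOW)
                CDEBUG(D_NET, "event type %d from EQ %d\n", ev.type, which);
        return rc;
}
#endif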