/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * lnet/lnet/lib-eq.c
 * Library level Event queue management routines
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LNET
#include <lnet/lib-lnet.h>

int
LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
            lnet_handle_eq_t *handle)
{
        lnet_eq_t     *eq;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
         * overflow, they don't skip entries, so the queue has the same
         * apparent capacity at all times */
        if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
                do {                    /* knock off all but the top bit... */
                        count &= ~LOWEST_BIT_SET (count);
                } while (count != LOWEST_BIT_SET(count));

                count <<= 1;                             /* ...and round up */
        }
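        /* For example, count = 5 (binary 101): the loop clears the low bit,
         * leaving only the top bit 4 (binary 100), and the shift then rounds
         * up to 8, so eq_enq_seq/eq_deq_seq wrap cleanly modulo eq_size. */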

        if (count == 0)        /* catch bad parameter / overflow on roundup */
                return (-EINVAL);

        eq = lnet_eq_alloc();
        if (eq == NULL)
                return (-ENOMEM);

        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
        if (eq->eq_events == NULL) {
                LNET_LOCK();
                lnet_eq_free (eq);
                LNET_UNLOCK();

                return -ENOMEM;
        }

        /* NB this resets all event sequence numbers to 0, to be earlier
         * than eq_deq_seq */
        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));

        eq->eq_deq_seq = 1;
        eq->eq_enq_seq = 1;
        eq->eq_size = count;
        eq->eq_refcount = 0;
        eq->eq_callback = callback;

        LNET_LOCK();

        lnet_initialise_handle (&eq->eq_lh, LNET_COOKIE_TYPE_EQ);
        list_add (&eq->eq_list, &the_lnet.ln_active_eqs);

        LNET_UNLOCK();

        lnet_eq2handle(handle, eq);
        return (0);
}

int
LNetEQFree(lnet_handle_eq_t eqh)
{
        lnet_eq_t     *eq;
        int            size;
        lnet_event_t  *events;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        LNET_LOCK();

        eq = lnet_handle2eq(&eqh);
        if (eq == NULL) {
                LNET_UNLOCK();
                return (-ENOENT);
        }

        if (eq->eq_refcount != 0) {
                LNET_UNLOCK();
                return (-EBUSY);
        }

        /* stash for free after lock dropped */
        events  = eq->eq_events;
        size    = eq->eq_size;

        lnet_invalidate_handle (&eq->eq_lh);
        list_del (&eq->eq_list);
        lnet_eq_free (eq);

        LNET_UNLOCK();

        LIBCFS_FREE(events, size * sizeof (lnet_event_t));

        return 0;
}
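
/*
 * Illustrative usage sketch (not part of the original file): a caller
 * typically allocates an EQ, polls it for events, and frees it once nothing
 * still references it.  "my_eq_callback" and "consume_event" are
 * hypothetical names; the LNet calls are the ones defined in this file.
 *
 *      lnet_handle_eq_t eqh;
 *      lnet_event_t     ev;
 *      int              which;
 *      int              rc;
 *
 *      rc = LNetEQAlloc(64, my_eq_callback, &eqh);  // 64 is already 2^6
 *      if (rc != 0)
 *              return rc;
 *
 *      rc = LNetEQPoll(&eqh, 1, 100, &ev, &which);  // wait up to 100ms
 *      if (rc == 1)
 *              consume_event(&ev);
 *
 *      rc = LNetEQFree(eqh);   // returns -EBUSY while eq_refcount != 0
 */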

int
lib_get_event (lnet_eq_t *eq, lnet_event_t *ev)
{
        int           new_index = eq->eq_deq_seq & (eq->eq_size - 1);
        lnet_event_t *new_event = &eq->eq_events[new_index];
        int           rc;
        ENTRY;

        CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
               new_event, eq->eq_deq_seq, eq->eq_size);

        if (LNET_SEQ_GT (eq->eq_deq_seq, new_event->sequence)) {
                RETURN(0);
        }

        /* We've got a new event... */
        *ev = *new_event;

        /* ...but did it overwrite an event we've not seen yet? */
        if (eq->eq_deq_seq == new_event->sequence) {
                rc = 1;
        } else {
                /* don't complain with CERROR: some EQs are sized small
                 * anyway; if it's important, the caller should complain */
                CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
                       eq->eq_deq_seq, new_event->sequence);
                rc = -EOVERFLOW;
        }

        eq->eq_deq_seq = new_event->sequence + 1;
        RETURN(rc);
}
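
/*
 * Worked example of the dequeue logic above (editorial illustration, not in
 * the original source): with eq_size = 8 and eq_deq_seq = 10, the consumer
 * looks at slot 10 & 7 = 2.  If the event stored there still carries an
 * older sequence (2 from the previous wrap, or 0 from the initial memset),
 * nothing new has arrived and lib_get_event() returns 0.  If it carries
 * sequence 10, that is exactly the next expected event and the return is 1.
 * If it carries sequence 18, the producer has lapped the consumer and
 * overwritten an unseen event: the event is still copied out, but the
 * return is -EOVERFLOW and eq_deq_seq jumps to 19.
 */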


int
LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
{
        int which;

        return LNetEQPoll(&eventq, 1, 0,
                          event, &which);
}

int
LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
{
        int which;

        return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
                          event, &which);
}

int
LNetEQPoll (lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
            lnet_event_t *event, int *which)
{
        int              i;
        int              rc;
#ifdef __KERNEL__
        cfs_waitlink_t   wl;
        cfs_time_t       now;
#else
        struct timeval   then;
        struct timeval   now;
# if HAVE_LIBPTHREAD
        struct timespec  ts;
# endif
        lnet_ni_t       *eqwaitni = the_lnet.ln_eqwaitni;
#endif
        ENTRY;

        LASSERT (the_lnet.ln_init);
        LASSERT (the_lnet.ln_refcount > 0);

        if (neq < 1)
                RETURN(-ENOENT);

        LNET_LOCK();

        for (;;) {
                for (i = 0; i < neq; i++) {
                        lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);

                        if (eq == NULL) {
                                LNET_UNLOCK();
                                RETURN(-ENOENT);
                        }

                        rc = lib_get_event (eq, event);
                        if (rc != 0) {
                                LNET_UNLOCK();
                                *which = i;
                                RETURN(rc);
                        }
                }

#ifdef __KERNEL__
                if (timeout_ms == 0) {
                        LNET_UNLOCK ();
                        RETURN (0);
                }

                cfs_waitlink_init(&wl);
                set_current_state(TASK_INTERRUPTIBLE);
                cfs_waitq_add(&the_lnet.ln_waitq, &wl);

                LNET_UNLOCK();

                if (timeout_ms < 0) {
                        cfs_waitq_wait (&wl, CFS_TASK_INTERRUPTIBLE);
                } else {
                        struct timeval tv;

                        now = cfs_time_current();
                        cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
                                            cfs_time_seconds(timeout_ms)/1000);
                        cfs_duration_usec(cfs_time_sub(cfs_time_current(), now),
                                          &tv);
                        timeout_ms -= tv.tv_sec * 1000 + tv.tv_usec / 1000;
                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }

                LNET_LOCK();
                cfs_waitq_del(&the_lnet.ln_waitq, &wl);
#else
                if (eqwaitni != NULL) {
                        /* I have a single NI that I have to call into, to get
                         * events queued, or to block. */
                        lnet_ni_addref_locked(eqwaitni);
                        LNET_UNLOCK();

                        if (timeout_ms <= 0) {
                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);
                        } else {
                                gettimeofday(&then, NULL);

                                (eqwaitni->ni_lnd->lnd_wait)(eqwaitni, timeout_ms);

                                gettimeofday(&now, NULL);
                                timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                              (now.tv_usec - then.tv_usec) / 1000;
                                if (timeout_ms < 0)
                                        timeout_ms = 0;
                        }

                        LNET_LOCK();
                        lnet_ni_decref_locked(eqwaitni);

                        /* don't call into eqwaitni again if timeout has
                         * expired */
                        if (timeout_ms == 0)
                                eqwaitni = NULL;

                        continue;               /* go back and check for events */
                }

                if (timeout_ms == 0) {
                        LNET_UNLOCK();
                        RETURN (0);
                }

# if !HAVE_LIBPTHREAD
                /* If I'm single-threaded, LNET fails at startup if it can't
                 * set the_lnet.ln_eqwaitni correctly.  */
                LBUG();
# else
                if (timeout_ms < 0) {
                        pthread_cond_wait(&the_lnet.ln_cond,
                                          &the_lnet.ln_lock);
                } else {
                        gettimeofday(&then, NULL);

                        ts.tv_sec = then.tv_sec + timeout_ms/1000;
                        ts.tv_nsec = then.tv_usec * 1000 +
                                     (timeout_ms%1000) * 1000000;
                        if (ts.tv_nsec >= 1000000000) {
                                ts.tv_sec++;
                                ts.tv_nsec -= 1000000000;
                        }

                        pthread_cond_timedwait(&the_lnet.ln_cond,
                                               &the_lnet.ln_lock, &ts);

                        gettimeofday(&now, NULL);
                        timeout_ms -= (now.tv_sec - then.tv_sec) * 1000 +
                                      (now.tv_usec - then.tv_usec) / 1000;

                        if (timeout_ms < 0)
                                timeout_ms = 0;
                }
# endif
#endif
        }
}
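
/*
 * Note on the timeout accounting above (editorial illustration, not in the
 * original source): after every wakeup LNetEQPoll() subtracts the elapsed
 * wall-clock time from timeout_ms before looping back to rescan the queues.
 * For example, a caller asking for 100 ms that is woken after roughly 30 ms
 * with no event pending waits again with about 70 ms remaining; once
 * timeout_ms reaches 0, the next pass over empty queues returns 0.
 */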