Whamcloud - gitweb
1e5011c38e2ecc92b24856ece9f96352ead71d8a
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/lnet/lib-eq.c
35  *
36  * Library level Event queue management routines
37  */
38
39 #define DEBUG_SUBSYSTEM S_LNET
40 #include <lnet/lib-lnet.h>
41
42 /**
43  * Create an event queue that has room for \a count number of events.
44  *
45  * The event queue is circular and older events will be overwritten by new
46  * ones if they are not removed in time by the user using the functions
47  * LNetEQGet(), LNetEQWait(), or LNetEQPoll(). It is up to the user to
48  * determine the appropriate size of the event queue to prevent this loss
49  * of events. Note that when EQ handler is specified in \a callback, no
50  * event loss can happen, since the handler is run for each event deposited
51  * into the EQ.
52  *
53  * \param count The number of events to be stored in the event queue. It
54  * will be rounded up to the next power of two.
55  * \param callback A handler function that runs when an event is deposited
56  * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
57  * indicate that no event handler is desired.
58  * \param handle On successful return, this location will hold a handle for
59  * the newly created EQ.
60  *
61  * \retval 0       On success.
62  * \retval -EINVAL If an parameter is not valid.
63  * \retval -ENOMEM If memory for the EQ can't be allocated.
64  *
65  * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
66  */
67 int
68 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
69             lnet_handle_eq_t *handle)
70 {
71         lnet_eq_t     *eq;
72
73         LASSERT (the_lnet.ln_init);
74         LASSERT (the_lnet.ln_refcount > 0);
75
76         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
77          * overflow, they don't skip entries, so the queue has the same
78          * apparent capacity at all times */
79
80         count = cfs_power2_roundup(count);
81
82         if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
83                 CWARN("EQ callback is guaranteed to get every event, "
84                       "do you still want to set eqcount %d for polling "
85                       "event which will have locking overhead? "
86                       "Please contact with developer to confirm\n", count);
87         }
88
89         /* count can be 0 if only need callback, we can eliminate
90          * overhead of enqueue event */
91         if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
92                 return -EINVAL;
93
94         eq = lnet_eq_alloc();
95         if (eq == NULL)
96                 return -ENOMEM;
97
98         if (count != 0) {
99                 LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
100                 if (eq->eq_events == NULL)
101                         goto failed;
102                 /* NB allocator has set all event sequence numbers to 0,
103                  * so all them should be earlier than eq_deq_seq */
104         }
105
106         eq->eq_deq_seq = 1;
107         eq->eq_enq_seq = 1;
108         eq->eq_size = count;
109         eq->eq_callback = callback;
110
111         eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
112                                        sizeof(*eq->eq_refs[0]));
113         if (eq->eq_refs == NULL)
114                 goto failed;
115
116         /* MUST hold both exclusive lnet_res_lock */
117         lnet_res_lock(LNET_LOCK_EX);
118         /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
119          * both EQ lookup and poll event with only lnet_eq_wait_lock */
120         lnet_eq_wait_lock();
121
122         lnet_res_lh_initialize(&the_lnet.ln_eq_container, &eq->eq_lh);
123         cfs_list_add(&eq->eq_list, &the_lnet.ln_eq_container.rec_active);
124
125         lnet_eq_wait_unlock();
126         lnet_res_unlock(LNET_LOCK_EX);
127
128         lnet_eq2handle(handle, eq);
129         return 0;
130
131 failed:
132         if (eq->eq_events != NULL)
133                 LIBCFS_FREE(eq->eq_events, count * sizeof(lnet_event_t));
134
135         if (eq->eq_refs != NULL)
136                 cfs_percpt_free(eq->eq_refs);
137
138         lnet_eq_free(eq);
139         return -ENOMEM;
140 }
141
142 /**
143  * Release the resources associated with an event queue if it's idle;
144  * otherwise do nothing and it's up to the user to try again.
145  *
146  * \param eqh A handle for the event queue to be released.
147  *
148  * \retval 0 If the EQ is not in use and freed.
149  * \retval -ENOENT If \a eqh does not point to a valid EQ.
150  * \retval -EBUSY  If the EQ is still in use by some MDs.
151  */
152 int
153 LNetEQFree(lnet_handle_eq_t eqh)
154 {
155         struct lnet_eq  *eq;
156         lnet_event_t    *events = NULL;
157         int             **refs = NULL;
158         int             *ref;
159         int             rc = 0;
160         int             size = 0;
161         int             i;
162
163         LASSERT(the_lnet.ln_init);
164         LASSERT(the_lnet.ln_refcount > 0);
165
166         lnet_res_lock(LNET_LOCK_EX);
167         /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
168          * both EQ lookup and poll event with only lnet_eq_wait_lock */
169         lnet_eq_wait_lock();
170
171         eq = lnet_handle2eq(&eqh);
172         if (eq == NULL) {
173                 rc = -ENOENT;
174                 goto out;
175         }
176
177         cfs_percpt_for_each(ref, i, eq->eq_refs) {
178                 LASSERT(*ref >= 0);
179                 if (*ref == 0)
180                         continue;
181
182                 CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
183                        i, *ref);
184                 rc = -EBUSY;
185                 goto out;
186         }
187
188         /* stash for free after lock dropped */
189         events  = eq->eq_events;
190         size    = eq->eq_size;
191         refs    = eq->eq_refs;
192
193         lnet_res_lh_invalidate(&eq->eq_lh);
194         cfs_list_del(&eq->eq_list);
195         lnet_eq_free_locked(eq);
196  out:
197         lnet_eq_wait_unlock();
198         lnet_res_unlock(LNET_LOCK_EX);
199
200         if (events != NULL)
201                 LIBCFS_FREE(events, size * sizeof(lnet_event_t));
202         if (refs != NULL)
203                 cfs_percpt_free(refs);
204
205         return rc;
206 }
207
208 void
209 lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
210 {
211         /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */
212         int index;
213
214         if (eq->eq_size == 0) {
215                 LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
216                 eq->eq_callback(ev);
217                 return;
218         }
219
220         lnet_eq_wait_lock();
221         ev->sequence = eq->eq_enq_seq++;
222
223         LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
224         index = ev->sequence & (eq->eq_size - 1);
225
226         eq->eq_events[index] = *ev;
227
228         if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
229                 eq->eq_callback(ev);
230
231 #ifdef __KERNEL__
232         /* Wake anyone waiting in LNetEQPoll() */
233         if (cfs_waitq_active(&the_lnet.ln_eq_waitq))
234                 cfs_waitq_broadcast(&the_lnet.ln_eq_waitq);
235 #else
236 # ifndef HAVE_LIBPTHREAD
237         /* LNetEQPoll() calls into _the_ LND to wait for action */
238 # else
239         /* Wake anyone waiting in LNetEQPoll() */
240         pthread_cond_broadcast(&the_lnet.ln_eq_cond);
241 # endif
242 #endif
243         lnet_eq_wait_unlock();
244 }
245
246 int
247 lnet_eq_dequeue_event(lnet_eq_t *eq, lnet_event_t *ev)
248 {
249         int             new_index = eq->eq_deq_seq & (eq->eq_size - 1);
250         lnet_event_t    *new_event = &eq->eq_events[new_index];
251         int             rc;
252         ENTRY;
253
254         /* must called with lnet_eq_wait_lock hold */
255         if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence))
256                 RETURN(0);
257
258         /* We've got a new event... */
259         *ev = *new_event;
260
261         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
262                new_event, eq->eq_deq_seq, eq->eq_size);
263
264         /* ...but did it overwrite an event we've not seen yet? */
265         if (eq->eq_deq_seq == new_event->sequence) {
266                 rc = 1;
267         } else {
268                 /* don't complain with CERROR: some EQs are sized small
269                  * anyway; if it's important, the caller should complain */
270                 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
271                        eq->eq_deq_seq, new_event->sequence);
272                 rc = -EOVERFLOW;
273         }
274
275         eq->eq_deq_seq = new_event->sequence + 1;
276         RETURN(rc);
277 }
278
279 /**
280  * A nonblocking function that can be used to get the next event in an EQ.
281  * If an event handler is associated with the EQ, the handler will run before
282  * this function returns successfully. The event is removed from the queue.
283  *
284  * \param eventq A handle for the event queue.
285  * \param event On successful return (1 or -EOVERFLOW), this location will
286  * hold the next event in the EQ.
287  *
288  * \retval 0          No pending event in the EQ.
289  * \retval 1          Indicates success.
290  * \retval -ENOENT    If \a eventq does not point to a valid EQ.
291  * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that
292  * at least one event between this event and the last event obtained from the
293  * EQ has been dropped due to limited space in the EQ.
294  */
295 int
296 LNetEQGet (lnet_handle_eq_t eventq, lnet_event_t *event)
297 {
298         int which;
299
300         return LNetEQPoll(&eventq, 1, 0,
301                          event, &which);
302 }
303
304 /**
305  * Block the calling process until there is an event in the EQ.
306  * If an event handler is associated with the EQ, the handler will run before
307  * this function returns successfully. This function returns the next event
308  * in the EQ and removes it from the EQ.
309  *
310  * \param eventq A handle for the event queue.
311  * \param event On successful return (1 or -EOVERFLOW), this location will
312  * hold the next event in the EQ.
313  *
314  * \retval 1          Indicates success.
315  * \retval -ENOENT    If \a eventq does not point to a valid EQ.
316  * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that
317  * at least one event between this event and the last event obtained from the
318  * EQ has been dropped due to limited space in the EQ.
319  */
320 int
321 LNetEQWait (lnet_handle_eq_t eventq, lnet_event_t *event)
322 {
323         int which;
324
325         return LNetEQPoll(&eventq, 1, LNET_TIME_FOREVER,
326                          event, &which);
327 }
328
329 #ifdef __KERNEL__
330
331 static int
332 lnet_eq_wait_locked(int *timeout_ms)
333 {
334         int             tms = *timeout_ms;
335         int             wait;
336         cfs_waitlink_t  wl;
337         cfs_time_t      now;
338
339         if (tms == 0)
340                 return -1; /* don't want to wait and no new event */
341
342         cfs_waitlink_init(&wl);
343         cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
344         cfs_waitq_add(&the_lnet.ln_eq_waitq, &wl);
345
346         lnet_eq_wait_unlock();
347
348         if (tms < 0) {
349                 cfs_waitq_wait(&wl, CFS_TASK_INTERRUPTIBLE);
350
351         } else {
352                 struct timeval tv;
353
354                 now = cfs_time_current();
355                 cfs_waitq_timedwait(&wl, CFS_TASK_INTERRUPTIBLE,
356                                     cfs_time_seconds(tms) / 1000);
357                 cfs_duration_usec(cfs_time_sub(cfs_time_current(), now), &tv);
358                 tms -= (int)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
359                 if (tms < 0) /* no more wait but may have new event */
360                         tms = 0;
361         }
362
363         wait = tms != 0; /* might need to call here again */
364         *timeout_ms = tms;
365
366         lnet_eq_wait_lock();
367         cfs_waitq_del(&the_lnet.ln_eq_waitq, &wl);
368
369         return wait;
370 }
371
372 #else /* !__KERNEL__ */
373
374 # ifdef HAVE_LIBPTHREAD
375 static void
376 lnet_eq_cond_wait(struct timespec *ts)
377 {
378         if (ts == NULL) {
379                 pthread_cond_wait(&the_lnet.ln_eq_cond,
380                                   &the_lnet.ln_eq_wait_lock);
381         } else {
382                 pthread_cond_timedwait(&the_lnet.ln_eq_cond,
383                                        &the_lnet.ln_eq_wait_lock, ts);
384         }
385 }
386 # endif
387
388 static int
389 lnet_eq_wait_locked(int *timeout_ms)
390 {
391         lnet_ni_t       *eq_waitni = NULL;
392         int             tms = *timeout_ms;
393         int             wait;
394         struct timeval  then;
395         struct timeval  now;
396
397         if (the_lnet.ln_eq_waitni != NULL) {
398                 /* I have a single NI that I have to call into, to get
399                  * events queued, or to block. */
400                 lnet_eq_wait_unlock();
401
402                 lnet_net_lock(0);
403                 eq_waitni = the_lnet.ln_eq_waitni;
404                 if (unlikely(eq_waitni == NULL)) {
405                         lnet_net_unlock(0);
406
407                         lnet_eq_wait_lock();
408                         return -1;
409                 }
410
411                 lnet_ni_addref_locked(eq_waitni, 0);
412                 lnet_net_unlock(0);
413
414                 if (tms <= 0) { /* even for tms == 0 */
415                         (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms);
416
417                 } else {
418                         gettimeofday(&then, NULL);
419
420                         (eq_waitni->ni_lnd->lnd_wait)(eq_waitni, tms);
421
422                         gettimeofday(&now, NULL);
423                         tms -= (now.tv_sec - then.tv_sec) * 1000 +
424                                (now.tv_usec - then.tv_usec) / 1000;
425                         if (tms < 0)
426                                 tms = 0;
427                 }
428
429                 lnet_ni_decref(eq_waitni);
430                 lnet_eq_wait_lock();
431         } else { /* w/o eq_waitni */
432 # ifndef HAVE_LIBPTHREAD
433                 /* If I'm single-threaded, LNET fails at startup if it can't
434                  * set the_lnet.ln_eqwaitni correctly.  */
435                 LBUG();
436 # else /* HAVE_LIBPTHREAD */
437                 struct timespec  ts;
438
439                 if (tms == 0) /* don't want to wait and new event */
440                         return -1;
441
442                 if (tms < 0) {
443                         lnet_eq_cond_wait(NULL);
444
445                 } else {
446
447                         gettimeofday(&then, NULL);
448
449                         ts.tv_sec = then.tv_sec + tms / 1000;
450                         ts.tv_nsec = then.tv_usec * 1000 +
451                                      (tms % 1000) * 1000000;
452                         if (ts.tv_nsec >= 1000000000) {
453                                 ts.tv_sec++;
454                                 ts.tv_nsec -= 1000000000;
455                         }
456
457                         lnet_eq_cond_wait(&ts);
458
459                         gettimeofday(&now, NULL);
460                         tms -= (now.tv_sec - then.tv_sec) * 1000 +
461                                (now.tv_usec - then.tv_usec) / 1000;
462                         if (tms < 0)
463                                 tms = 0;
464                 }
465 # endif /* HAVE_LIBPTHREAD */
466         }
467
468         wait = tms != 0;
469         *timeout_ms = tms;
470
471         return wait;
472 }
473
474 #endif /* __KERNEL__ */
475
476
477 /**
478  * Block the calling process until there's an event from a set of EQs or
479  * timeout happens.
480  *
481  * If an event handler is associated with the EQ, the handler will run before
482  * this function returns successfully, in which case the corresponding event
483  * is consumed.
484  *
485  * LNetEQPoll() provides a timeout to allow applications to poll, block for a
486  * fixed period, or block indefinitely.
487  *
488  * \param eventqs,neq An array of EQ handles, and size of the array.
489  * \param timeout_ms Time in milliseconds to wait for an event to occur on
490  * one of the EQs. The constant LNET_TIME_FOREVER can be used to indicate an
491  * infinite timeout.
492  * \param event,which On successful return (1 or -EOVERFLOW), \a event will
493  * hold the next event in the EQs, and \a which will contain the index of the
494  * EQ from which the event was taken.
495  *
496  * \retval 0          No pending event in the EQs after timeout.
497  * \retval 1          Indicates success.
498  * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that
499  * at least one event between this event and the last event obtained from the
500  * EQ indicated by \a which has been dropped due to limited space in the EQ.
501  * \retval -ENOENT    If there's an invalid handle in \a eventqs.
502  */
503 int
504 LNetEQPoll(lnet_handle_eq_t *eventqs, int neq, int timeout_ms,
505            lnet_event_t *event, int *which)
506 {
507         int     wait = 1;
508         int     rc;
509         int     i;
510         ENTRY;
511
512         LASSERT (the_lnet.ln_init);
513         LASSERT (the_lnet.ln_refcount > 0);
514
515         if (neq < 1)
516                 RETURN(-ENOENT);
517
518         lnet_eq_wait_lock();
519
520         for (;;) {
521 #ifndef __KERNEL__
522                 lnet_eq_wait_unlock();
523
524                 /* Recursion breaker */
525                 if (the_lnet.ln_rc_state == LNET_RC_STATE_RUNNING &&
526                     !LNetHandleIsEqual(eventqs[0], the_lnet.ln_rc_eqh))
527                         lnet_router_checker();
528
529                 lnet_eq_wait_lock();
530 #endif
531                 for (i = 0; i < neq; i++) {
532                         lnet_eq_t *eq = lnet_handle2eq(&eventqs[i]);
533
534                         if (eq == NULL) {
535                                 lnet_eq_wait_unlock();
536                                 RETURN(-ENOENT);
537                         }
538
539                         rc = lnet_eq_dequeue_event(eq, event);
540                         if (rc != 0) {
541                                 lnet_eq_wait_unlock();
542                                 *which = i;
543                                 RETURN(rc);
544                         }
545                 }
546
547                 if (wait == 0)
548                         break;
549
550                 /*
551                  * return value of lnet_eq_wait_locked:
552                  * -1 : did nothing and it's sure no new event
553                  *  1 : sleep inside and wait until new event
554                  *  0 : don't want to wait anymore, but might have new event
555                  *      so need to call dequeue again
556                  */
557                 wait = lnet_eq_wait_locked(&timeout_ms);
558                 if (wait < 0) /* no new event */
559                         break;
560         }
561
562         lnet_eq_wait_unlock();
563         RETURN(0);
564 }