Whamcloud - gitweb
LU-13005 lnet: eq: discard struct lnet_handle_eq
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/lib-eq.c
33  *
34  * Library level Event queue management routines
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
39
40 /**
41  * Create an event queue that has room for \a count number of events.
42  *
43  * The event queue is circular and older events will be overwritten by
44  * new ones if they are not removed in time by the user using the
45  * function LNetEQPoll().  It is up to the user to determine the
46  * appropriate size of the event queue to prevent this loss of events.
47  * Note that when EQ handler is specified in \a callback, no event loss
48  * can happen, since the handler is run for each event deposited into
49  * the EQ.
50  *
51  * \param count The number of events to be stored in the event queue. It
52  * will be rounded up to the next power of two.
53  * \param callback A handler function that runs when an event is deposited
54  * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
55  * indicate that no event handler is desired.
56  *
57  * \retval eq      On successful return, the newly created EQ is returned.
58  *                 On failure, an error code encoded with ERR_PTR() is returned.
59  * \retval -EINVAL If an parameter is not valid.
60  * \retval -ENOMEM If memory for the EQ can't be allocated.
61  *
62  * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
63  */
64 struct lnet_eq *
65 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback)
66 {
67         struct lnet_eq *eq;
68
69         LASSERT(the_lnet.ln_refcount > 0);
70
71         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
72          * overflow, they don't skip entries, so the queue has the same
73          * apparent capacity at all times */
74
75         if (count)
76                 count = roundup_pow_of_two(count);
77
78         if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
79                 CWARN("EQ callback is guaranteed to get every event, "
80                       "do you still want to set eqcount %d for polling "
81                       "event which will have locking overhead? "
82                       "Please contact with developer to confirm\n", count);
83         }
84
85         /* count can be 0 if only need callback, we can eliminate
86          * overhead of enqueue event */
87         if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
88                 return ERR_PTR(-EINVAL);
89
90         eq = lnet_eq_alloc();
91         if (eq == NULL)
92                 return ERR_PTR(-ENOMEM);
93
94         if (count != 0) {
95                 LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
96                 if (eq->eq_events == NULL)
97                         goto failed;
98                 /* NB allocator has set all event sequence numbers to 0,
99                  * so all them should be earlier than eq_deq_seq */
100         }
101
102         eq->eq_deq_seq = 1;
103         eq->eq_enq_seq = 1;
104         eq->eq_size = count;
105         eq->eq_callback = callback;
106
107         eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
108                                        sizeof(*eq->eq_refs[0]));
109         if (eq->eq_refs == NULL)
110                 goto failed;
111
112         return eq;
113
114 failed:
115         if (eq->eq_events != NULL)
116                 LIBCFS_FREE(eq->eq_events, count * sizeof(*eq->eq_events));
117
118         if (eq->eq_refs != NULL)
119                 cfs_percpt_free(eq->eq_refs);
120
121         lnet_eq_free(eq);
122         return ERR_PTR(-ENOMEM);
123 }
124 EXPORT_SYMBOL(LNetEQAlloc);
125
126 /**
127  * Release the resources associated with an event queue if it's idle;
128  * otherwise do nothing and it's up to the user to try again.
129  *
130  * \param eq The event queue to be released.
131  *
132  * \retval 0 If the EQ is not in use and freed.
133  * \retval -EBUSY  If the EQ is still in use by some MDs.
134  */
135 int
136 LNetEQFree(struct lnet_eq *eq)
137 {
138         struct lnet_event       *events = NULL;
139         int             **refs = NULL;
140         int             *ref;
141         int             rc = 0;
142         int             size = 0;
143         int             i;
144
145         lnet_res_lock(LNET_LOCK_EX);
146         /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
147          * both EQ lookup and poll event with only lnet_eq_wait_lock */
148         lnet_eq_wait_lock();
149
150         cfs_percpt_for_each(ref, i, eq->eq_refs) {
151                 LASSERT(*ref >= 0);
152                 if (*ref == 0)
153                         continue;
154
155                 CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
156                        i, *ref);
157                 rc = -EBUSY;
158                 goto out;
159         }
160
161         /* stash for free after lock dropped */
162         events  = eq->eq_events;
163         size    = eq->eq_size;
164         refs    = eq->eq_refs;
165
166         lnet_eq_free(eq);
167  out:
168         lnet_eq_wait_unlock();
169         lnet_res_unlock(LNET_LOCK_EX);
170
171         if (events != NULL)
172                 LIBCFS_FREE(events, size * sizeof(*events));
173         if (refs != NULL)
174                 cfs_percpt_free(refs);
175
176         return rc;
177 }
178 EXPORT_SYMBOL(LNetEQFree);
179
180 void
181 lnet_eq_enqueue_event(struct lnet_eq *eq, struct lnet_event *ev)
182 {
183         /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */
184         int index;
185
186         if (eq->eq_size == 0) {
187                 LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
188                 eq->eq_callback(ev);
189                 return;
190         }
191
192         lnet_eq_wait_lock();
193         ev->sequence = eq->eq_enq_seq++;
194
195         LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
196         index = ev->sequence & (eq->eq_size - 1);
197
198         eq->eq_events[index] = *ev;
199
200         if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
201                 eq->eq_callback(ev);
202
203         /* Wake anyone waiting in LNetEQPoll() */
204         if (waitqueue_active(&the_lnet.ln_eq_waitq))
205                 wake_up_all(&the_lnet.ln_eq_waitq);
206         lnet_eq_wait_unlock();
207 }
208
209 static int
210 lnet_eq_dequeue_event(struct lnet_eq *eq, struct lnet_event *ev)
211 {
212         int             new_index = eq->eq_deq_seq & (eq->eq_size - 1);
213         struct lnet_event       *new_event = &eq->eq_events[new_index];
214         int             rc;
215         ENTRY;
216
217         /* must called with lnet_eq_wait_lock hold */
218         if (LNET_SEQ_GT(eq->eq_deq_seq, new_event->sequence))
219                 RETURN(0);
220
221         /* We've got a new event... */
222         *ev = *new_event;
223
224         CDEBUG(D_INFO, "event: %p, sequence: %lu, eq->size: %u\n",
225                new_event, eq->eq_deq_seq, eq->eq_size);
226
227         /* ...but did it overwrite an event we've not seen yet? */
228         if (eq->eq_deq_seq == new_event->sequence) {
229                 rc = 1;
230         } else {
231                 /* don't complain with CERROR: some EQs are sized small
232                  * anyway; if it's important, the caller should complain */
233                 CDEBUG(D_NET, "Event Queue Overflow: eq seq %lu ev seq %lu\n",
234                        eq->eq_deq_seq, new_event->sequence);
235                 rc = -EOVERFLOW;
236         }
237
238         eq->eq_deq_seq = new_event->sequence + 1;
239         RETURN(rc);
240 }
241
242 static int
243 lnet_eq_wait_locked(signed long *timeout)
244 __must_hold(&the_lnet.ln_eq_wait_lock)
245 {
246         signed long tms = *timeout;
247         wait_queue_entry_t wl;
248         int wait;
249
250         if (tms == 0)
251                 return -ENXIO; /* don't want to wait and no new event */
252
253         init_waitqueue_entry(&wl, current);
254         add_wait_queue(&the_lnet.ln_eq_waitq, &wl);
255
256         lnet_eq_wait_unlock();
257
258         tms = schedule_timeout_interruptible(tms);
259         wait = tms != 0; /* might need to call here again */
260         *timeout = tms;
261
262         lnet_eq_wait_lock();
263         remove_wait_queue(&the_lnet.ln_eq_waitq, &wl);
264
265         return wait;
266 }
267
268 /**
269  * Block the calling process until there's an event from a set of EQs or
270  * timeout happens.
271  *
272  * If an event handler is associated with the EQ, the handler will run before
273  * this function returns successfully, in which case the corresponding event
274  * is consumed.
275  *
276  * LNetEQPoll() provides a timeout to allow applications to poll, block for a
277  * fixed period, or block indefinitely.
278  *
279  * \param eventqs,neq An array of lnet_eq, and size of the array.
280  * \param timeout Time in jiffies to wait for an event to occur on
281  * one of the EQs. The constant MAX_SCHEDULE_TIMEOUT can be used to indicate an
282  * infinite timeout.
283  * \param event,which On successful return (1 or -EOVERFLOW), \a event will
284  * hold the next event in the EQs, and \a which will contain the index of the
285  * EQ from which the event was taken.
286  *
287  * \retval 0          No pending event in the EQs after timeout.
288  * \retval 1          Indicates success.
289  * \retval -EOVERFLOW Indicates success (i.e., an event is returned) and that
290  * at least one event between this event and the last event obtained from the
291  * EQ indicated by \a which has been dropped due to limited space in the EQ.
292  * \retval -ENOENT    If there's an invalid handle in \a eventqs.
293  */
294 int
295 LNetEQPoll(struct lnet_eq **eventqs, int neq, signed long timeout,
296            struct lnet_event *event, int *which)
297 {
298         int     wait = 1;
299         int     rc;
300         int     i;
301         ENTRY;
302
303         LASSERT(the_lnet.ln_refcount > 0);
304
305         if (neq < 1)
306                 RETURN(-ENOENT);
307
308         lnet_eq_wait_lock();
309
310         for (;;) {
311                 for (i = 0; i < neq; i++) {
312                         struct lnet_eq *eq = eventqs[i];
313
314                         if (eq == NULL) {
315                                 lnet_eq_wait_unlock();
316                                 RETURN(-ENOENT);
317                         }
318
319                         rc = lnet_eq_dequeue_event(eq, event);
320                         if (rc != 0) {
321                                 lnet_eq_wait_unlock();
322                                 *which = i;
323                                 RETURN(rc);
324                         }
325                 }
326
327                 if (wait == 0)
328                         break;
329
330                 /*
331                  * return value of lnet_eq_wait_locked:
332                  * -1 : did nothing and it's sure no new event
333                  *  1 : sleep inside and wait until new event
334                  *  0 : don't want to wait anymore, but might have new event
335                  *      so need to call dequeue again
336                  */
337                 wait = lnet_eq_wait_locked(&timeout);
338                 if (wait < 0) /* no new event */
339                         break;
340         }
341
342         lnet_eq_wait_unlock();
343         RETURN(0);
344 }