Whamcloud - gitweb
b479a1f513a0caa7e76034935afad6c5278661d1
[fs/lustre-release.git] / lnet / lnet / lib-eq.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/lib-eq.c
33  *
34  * Library level Event queue management routines
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
39
40 /**
41  * Create an event queue that has room for \a count number of events.
42  *
43  * Note that when EQ handler is specified in \a callback, no event loss
44  * can happen, since the handler is run for each event deposited into
45  * the EQ.
46  *
47  * \param count The number of events to be stored in the event queue. It
48  * will be rounded up to the next power of two.
49  * \param callback A handler function that runs when an event is deposited
50  * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
51  * indicate that no event handler is desired.
52  *
53  * \retval eq      On successful return, the newly created EQ is returned.
54  *                 On failure, an error code encoded with ERR_PTR() is returned.
55  * \retval -EINVAL If an parameter is not valid.
56  * \retval -ENOMEM If memory for the EQ can't be allocated.
57  *
58  * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
59  */
60 struct lnet_eq *
61 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback)
62 {
63         struct lnet_eq *eq;
64
65         LASSERT(the_lnet.ln_refcount > 0);
66
67         /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
68          * overflow, they don't skip entries, so the queue has the same
69          * apparent capacity at all times */
70
71         if (count)
72                 count = roundup_pow_of_two(count);
73
74         if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
75                 CWARN("EQ callback is guaranteed to get every event, "
76                       "do you still want to set eqcount %d for polling "
77                       "event which will have locking overhead? "
78                       "Please contact with developer to confirm\n", count);
79         }
80
81         /* count can be 0 if only need callback, we can eliminate
82          * overhead of enqueue event */
83         if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
84                 return ERR_PTR(-EINVAL);
85
86         eq = lnet_eq_alloc();
87         if (eq == NULL)
88                 return ERR_PTR(-ENOMEM);
89
90         if (count != 0) {
91                 LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
92                 if (eq->eq_events == NULL)
93                         goto failed;
94                 /* NB allocator has set all event sequence numbers to 0,
95                  * so all them should be earlier than eq_deq_seq */
96         }
97
98         eq->eq_deq_seq = 1;
99         eq->eq_enq_seq = 1;
100         eq->eq_size = count;
101         eq->eq_callback = callback;
102
103         eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
104                                        sizeof(*eq->eq_refs[0]));
105         if (eq->eq_refs == NULL)
106                 goto failed;
107
108         return eq;
109
110 failed:
111         if (eq->eq_events != NULL)
112                 LIBCFS_FREE(eq->eq_events, count * sizeof(*eq->eq_events));
113
114         if (eq->eq_refs != NULL)
115                 cfs_percpt_free(eq->eq_refs);
116
117         lnet_eq_free(eq);
118         return ERR_PTR(-ENOMEM);
119 }
120 EXPORT_SYMBOL(LNetEQAlloc);
121
122 /**
123  * Release the resources associated with an event queue if it's idle;
124  * otherwise do nothing and it's up to the user to try again.
125  *
126  * \param eq The event queue to be released.
127  *
128  * \retval 0 If the EQ is not in use and freed.
129  * \retval -EBUSY  If the EQ is still in use by some MDs.
130  */
131 int
132 LNetEQFree(struct lnet_eq *eq)
133 {
134         struct lnet_event       *events = NULL;
135         int             **refs = NULL;
136         int             *ref;
137         int             rc = 0;
138         int             size = 0;
139         int             i;
140
141         lnet_res_lock(LNET_LOCK_EX);
142         /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so we can do
143          * both EQ lookup and poll event with only lnet_eq_wait_lock */
144         lnet_eq_wait_lock();
145
146         cfs_percpt_for_each(ref, i, eq->eq_refs) {
147                 LASSERT(*ref >= 0);
148                 if (*ref == 0)
149                         continue;
150
151                 CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
152                        i, *ref);
153                 rc = -EBUSY;
154                 goto out;
155         }
156
157         /* stash for free after lock dropped */
158         events  = eq->eq_events;
159         size    = eq->eq_size;
160         refs    = eq->eq_refs;
161
162         lnet_eq_free(eq);
163  out:
164         lnet_eq_wait_unlock();
165         lnet_res_unlock(LNET_LOCK_EX);
166
167         if (events != NULL)
168                 LIBCFS_FREE(events, size * sizeof(*events));
169         if (refs != NULL)
170                 cfs_percpt_free(refs);
171
172         return rc;
173 }
174 EXPORT_SYMBOL(LNetEQFree);
175
176 void
177 lnet_eq_enqueue_event(struct lnet_eq *eq, struct lnet_event *ev)
178 {
179         /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */
180         int index;
181
182         if (eq->eq_size == 0) {
183                 LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
184                 eq->eq_callback(ev);
185                 return;
186         }
187
188         lnet_eq_wait_lock();
189         ev->sequence = eq->eq_enq_seq++;
190
191         LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
192         index = ev->sequence & (eq->eq_size - 1);
193
194         eq->eq_events[index] = *ev;
195
196         if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
197                 eq->eq_callback(ev);
198
199         lnet_eq_wait_unlock();
200 }