4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
34 * Library level Event queue management routines
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
41 * Create an event queue that has room for \a count number of events.
43 * Note that when EQ handler is specified in \a callback, no event loss
44 * can happen, since the handler is run for each event deposited into the EQ.
47 * \param count The number of events to be stored in the event queue. It
48 * will be rounded up to the next power of two.
49 * \param callback A handler function that runs when an event is deposited
50 * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to
51 * indicate that no event handler is desired.
53 * \retval eq On successful return, the newly created EQ is returned.
54 * On failure, an error code encoded with ERR_PTR() is returned.
55 * \retval -EINVAL If a parameter is not valid.
56 * \retval -ENOMEM If memory for the EQ can't be allocated.
58 * \see lnet_eq_handler_t for the discussion on EQ handler semantics.
61 LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback)
65 LASSERT(the_lnet.ln_refcount > 0);
67 /* We need count to be a power of 2 so that when eq_{enq,deq}_seq
68 * overflow, they don't skip entries, so the queue has the same
69 * apparent capacity at all times */
72 count = roundup_pow_of_two(count);
74 if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
75 CWARN("EQ callback is guaranteed to get every event, "
76 "do you still want to set eqcount %d for polling "
77 "event which will have locking overhead? "
78 "Please contact with developer to confirm\n", count);
81 /* count can be 0 if only the callback is needed; in that case we can
82 * eliminate the overhead of enqueuing events */
83 if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
84 return ERR_PTR(-EINVAL);
88 return ERR_PTR(-ENOMEM);
91 LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events));
92 if (eq->eq_events == NULL)
94 /* NB allocator has set all event sequence numbers to 0,
95 * so all of them should be earlier than eq_deq_seq */
101 eq->eq_callback = callback;
103 eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(),
104 sizeof(*eq->eq_refs[0]));
105 if (eq->eq_refs == NULL)
111 if (eq->eq_events != NULL)
112 LIBCFS_FREE(eq->eq_events, count * sizeof(*eq->eq_events));
114 if (eq->eq_refs != NULL)
115 cfs_percpt_free(eq->eq_refs);
118 return ERR_PTR(-ENOMEM);
120 EXPORT_SYMBOL(LNetEQAlloc);
123 * Release the resources associated with an event queue if it's idle;
124 * otherwise do nothing and it's up to the user to try again.
126 * \param eq The event queue to be released.
128 * \retval 0 If the EQ is not in use and freed.
129 * \retval -EBUSY If the EQ is still in use by some MDs.
132 LNetEQFree(struct lnet_eq *eq)
134 struct lnet_event *events = NULL;
141 lnet_res_lock(LNET_LOCK_EX);
142 /* NB: hold lnet_eq_wait_lock for EQ link/unlink, so that both EQ lookup
143 * and event polling can be done with only lnet_eq_wait_lock held */
146 cfs_percpt_for_each(ref, i, eq->eq_refs) {
151 CDEBUG(D_NET, "Event equeue (%d: %d) busy on destroy.\n",
157 /* stash for free after lock dropped */
158 events = eq->eq_events;
164 lnet_eq_wait_unlock();
165 lnet_res_unlock(LNET_LOCK_EX);
168 LIBCFS_FREE(events, size * sizeof(*events));
170 cfs_percpt_free(refs);
174 EXPORT_SYMBOL(LNetEQFree);
177 lnet_eq_enqueue_event(struct lnet_eq *eq, struct lnet_event *ev)
179 /* MUST be called with the resource lock held but w/o lnet_eq_wait_lock */
182 if (eq->eq_size == 0) {
183 LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
189 ev->sequence = eq->eq_enq_seq++;
191 LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
192 index = ev->sequence & (eq->eq_size - 1);
194 eq->eq_events[index] = *ev;
196 if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
199 lnet_eq_wait_unlock();