* overflow, they don't skip entries, so the queue has the same
* apparent capacity at all times */
- if (count != LOWEST_BIT_SET(count)) { /* not a power of 2 already */
- do { /* knock off all but the top bit... */
- count &= ~LOWEST_BIT_SET (count);
- } while (count != LOWEST_BIT_SET(count));
+ count = cfs_power2_roundup(count);
- count <<= 1; /* ...and round up */
- }
-
- if (count == 0) /* catch bad parameter / overflow on roundup */
- return (-EINVAL);
-
- eq = lnet_eq_alloc();
- if (eq == NULL)
- return (-ENOMEM);
+ if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
+ CWARN("EQ callback is guaranteed to get every event, "
+ "do you still want to set eqcount %d for polling "
+ "event which will have locking overhead? "
+ "Please contact with developer to confirm\n", count);
+ }
- LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
- if (eq->eq_events == NULL) {
- lnet_eq_free(eq);
+ /* count can be 0 if only need callback, we can eliminate
+ * overhead of enqueue event */
+ if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
+ return -EINVAL;
- return -ENOMEM;
- }
+ eq = lnet_eq_alloc();
+ if (eq == NULL)
+ return -ENOMEM;
- /* NB this resets all event sequence numbers to 0, to be earlier
- * than eq_deq_seq */
- memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
+ if (count != 0) {
+ LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
+ if (eq->eq_events == NULL) {
+ lnet_eq_free(eq);
+ return -ENOMEM;
+ }
+ /* NB allocator has set all event sequence numbers to 0,
+ * so all them should be earlier than eq_deq_seq */
+ }
eq->eq_deq_seq = 1;
eq->eq_enq_seq = 1;
LNET_UNLOCK();
- LIBCFS_FREE(events, size * sizeof (lnet_event_t));
+ if (events != NULL)
+ LIBCFS_FREE(events, size * sizeof(lnet_event_t));
- return 0;
+ return 0;
}
void
lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
{
- lnet_event_t *eq_slot;
+ /* MUST called with resource lock hold */
+ int index;
+
+ if (eq->eq_size == 0) {
+ LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
+ eq->eq_callback(ev);
+ return;
+ }
- /* Allocate the next queue slot */
ev->sequence = eq->eq_enq_seq++;
- /* size must be a power of 2 to handle sequence # overflow */
- LASSERT(eq->eq_size != 0 &&
- eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
- eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+ LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
+ index = ev->sequence & (eq->eq_size - 1);
- /* There is no race since both event consumers and event producers
- * take the LNET_LOCK, so we don't screw around with memory
- * barriers, setting the sequence number last or weird structure
- * layout assertions. */
- *eq_slot = *ev;
+ eq->eq_events[index] = *ev;
- /* Call the callback handler (if any) */
- if (eq->eq_callback != NULL)
- eq->eq_callback(eq_slot);
+ if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
+ eq->eq_callback(ev);
#ifdef __KERNEL__
/* Wake anyone waiting in LNetEQPoll() */
/* CAVEAT EMPTOR: how we process portals events is _radically_
* different depending on... */
#ifdef __KERNEL__
- /* kernel portals calls our master callback when events are added to
- * the event queue. In fact lustre never pulls events off this queue,
- * so it's only sized for some debug history. */
- rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
+ /* kernel LNet calls our master callback when there are new event,
+ * because we are guaranteed to get every event via callback,
+ * so we just set EQ size to 0 to avoid overhread of serializing
+ * enqueue/dequeue operations in LNet. */
+ rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
#else
/* liblustre calls the master callback when it removes events from the
* event queue. The event queue has to be big enough not to drop