Whamcloud - gitweb
LU-56 lnet: allow to create EQ with zero eq_size
authorLiang Zhen <liang@whamcloud.com>
Mon, 28 May 2012 03:16:49 +0000 (11:16 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Jun 2012 13:12:11 +0000 (09:12 -0400)
Current LNet doesn't allow to create EQ with zero eq_size, it's
an unnecessary limit if the EQ has callback, because EQ callback
is guaranteed to get every event. EQ with no-zero eq_size is only
useful for LNetEQPoll, there is no Lustre use-case that is using
both EQ callback and LNetEQPoll on a same EQ.

Also, even with upcoming LNet SMP improvements, there has to be
a lock to protect enqueue/dequeue operations of event, which might
hurt performance because of lock contention.

This patch will allow user to create EQ with eq_size being zero,
which means events will only be delivered by callback and user
will get nothing by calling LNetEQPoll.

In multiple thread environment, we would suggest user to create EQ
only with callback (eq_size=0) if the EQ might get high rate events,
LNetEQPoll can still work fine if eq_size is non-zero and nothing
will be changed at all. User is still able to create EQ with both
callback and non-zero eq_size, and call LNetEQPoll to get event,
although it's deprecated.

Summary: still can support all current use-cases, although set
eq_size to zero and only rely on callback will be the better way.

Signed-off-by: Liang Zhen <liang@whamcloud.com>
Change-Id: Ia984f2f65eb5fd064a36dfd7b399e46378013c57
Reviewed-on: http://review.whamcloud.com/2925
Reviewed-by: Doug Oucharek <doug@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Bobi Jam <bobijam@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lnet/lnet/lib-eq.c
lnet/lnet/router.c
lnet/selftest/rpc.c
lustre/ptlrpc/events.c

index 87317ac..09fbdef 100644 (file)
@@ -77,31 +77,33 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback,
          * overflow, they don't skip entries, so the queue has the same
          * apparent capacity at all times */
 
-        if (count != LOWEST_BIT_SET(count)) {   /* not a power of 2 already */
-                do {                    /* knock off all but the top bit... */
-                        count &= ~LOWEST_BIT_SET (count);
-                } while (count != LOWEST_BIT_SET(count));
+       count = cfs_power2_roundup(count);
 
-                count <<= 1;                             /* ...and round up */
-        }
-
-        if (count == 0)        /* catch bad parameter / overflow on roundup */
-                return (-EINVAL);
-
-        eq = lnet_eq_alloc();
-        if (eq == NULL)
-                return (-ENOMEM);
+       if (callback != LNET_EQ_HANDLER_NONE && count != 0) {
+               CWARN("EQ callback is guaranteed to get every event, "
+                     "do you still want to set eqcount %d for polling "
+                     "event which will have locking overhead? "
+                     "Please contact with developer to confirm\n", count);
+       }
 
-        LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
-        if (eq->eq_events == NULL) {
-               lnet_eq_free(eq);
+       /* count can be 0 if only need callback, we can eliminate
+        * overhead of enqueue event */
+       if (count == 0 && callback == LNET_EQ_HANDLER_NONE)
+               return -EINVAL;
 
-                return -ENOMEM;
-        }
+       eq = lnet_eq_alloc();
+       if (eq == NULL)
+               return -ENOMEM;
 
-        /* NB this resets all event sequence numbers to 0, to be earlier
-         * than eq_deq_seq */
-        memset(eq->eq_events, 0, count * sizeof(lnet_event_t));
+       if (count != 0) {
+               LIBCFS_ALLOC(eq->eq_events, count * sizeof(lnet_event_t));
+               if (eq->eq_events == NULL) {
+                       lnet_eq_free(eq);
+                       return -ENOMEM;
+               }
+               /* NB allocator has set all event sequence numbers to 0,
+                * so all them should be earlier than eq_deq_seq */
+       }
 
         eq->eq_deq_seq = 1;
         eq->eq_enq_seq = 1;
@@ -165,33 +167,33 @@ LNetEQFree(lnet_handle_eq_t eqh)
 
         LNET_UNLOCK();
 
-        LIBCFS_FREE(events, size * sizeof (lnet_event_t));
+       if (events != NULL)
+               LIBCFS_FREE(events, size * sizeof(lnet_event_t));
 
-        return 0;
+       return 0;
 }
 
 void
 lnet_eq_enqueue_event(lnet_eq_t *eq, lnet_event_t *ev)
 {
-       lnet_event_t    *eq_slot;
+       /* MUST called with resource lock hold */
+       int index;
+
+       if (eq->eq_size == 0) {
+               LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE);
+               eq->eq_callback(ev);
+               return;
+       }
 
-       /* Allocate the next queue slot */
        ev->sequence = eq->eq_enq_seq++;
 
-       /* size must be a power of 2 to handle sequence # overflow */
-       LASSERT(eq->eq_size != 0 &&
-               eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
-       eq_slot = eq->eq_events + (ev->sequence & (eq->eq_size - 1));
+       LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size));
+       index = ev->sequence & (eq->eq_size - 1);
 
-       /* There is no race since both event consumers and event producers
-        * take the LNET_LOCK, so we don't screw around with memory
-        * barriers, setting the sequence number last or weird structure
-        * layout assertions. */
-       *eq_slot = *ev;
+       eq->eq_events[index] = *ev;
 
-       /* Call the callback handler (if any) */
-       if (eq->eq_callback != NULL)
-               eq->eq_callback(eq_slot);
+       if (eq->eq_callback != LNET_EQ_HANDLER_NONE)
+               eq->eq_callback(ev);
 
 #ifdef __KERNEL__
        /* Wake anyone waiting in LNetEQPoll() */
index a4c9ba4..c08ae39 100644 (file)
@@ -1001,7 +1001,7 @@ lnet_router_checker_start(void)
         cfs_sema_init(&the_lnet.ln_rc_signal, 0);
         /* EQ size doesn't matter; the callback is guaranteed to get every
          * event */
-        eqsz = 1;
+       eqsz = 0;
         rc = LNetEQAlloc(eqsz, lnet_router_checker_event,
                          &the_lnet.ln_rc_eqh);
 #else
index 71b1e6d..8413f48 100644 (file)
@@ -1468,7 +1468,7 @@ srpc_startup (void)
 
         LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
 #ifdef __KERNEL__
-        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
+       rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
 #else
         rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
 #endif
index c060665..ab6da13 100644 (file)
@@ -549,10 +549,11 @@ int ptlrpc_ni_init(void)
         /* CAVEAT EMPTOR: how we process portals events is _radically_
          * different depending on... */
 #ifdef __KERNEL__
-        /* kernel portals calls our master callback when events are added to
-         * the event queue.  In fact lustre never pulls events off this queue,
-         * so it's only sized for some debug history. */
-        rc = LNetEQAlloc(1024, ptlrpc_master_callback, &ptlrpc_eq_h);
+       /* kernel LNet calls our master callback when there are new event,
+        * because we are guaranteed to get every event via callback,
+        * so we just set EQ size to 0 to avoid overhread of serializing
+        * enqueue/dequeue operations in LNet. */
+       rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
 #else
         /* liblustre calls the master callback when it removes events from the
          * event queue.  The event queue has to be big enough not to drop