From d277f2ae95d0d0580059561706face3accfe3618 Mon Sep 17 00:00:00 2001 From: Mr NeilBrown Date: Wed, 20 Nov 2019 12:11:57 +1100 Subject: [PATCH] LU-13005 lnet: remove the 'queue' from LNetEQ All calls to LNetEQAlloc pass a size of 0, so no queue is ever allocated. So remove the 'size' arg, and all code that depends on it being non-zero. Similarly remove eq_size, eq_deq_seq eq_enq_seq and eq_events as they are always 0/NULL. Signed-off-by: Mr NeilBrown Change-Id: Icb7bb352fa61cd6ea46847676e583e738eeeda8c Reviewed-on: https://review.whamcloud.com/36844 Tested-by: jenkins Reviewed-by: Chris Horn Tested-by: Maloo Reviewed-by: James Simmons Reviewed-by: Oleg Drokin --- lnet/include/lnet/api.h | 9 +++--- lnet/include/lnet/lib-types.h | 4 --- lnet/lnet/api-ni.c | 8 ++--- lnet/lnet/lib-eq.c | 74 ++++--------------------------------------- lnet/lnet/peer.c | 2 +- lnet/selftest/rpc.c | 2 +- lustre/ptlrpc/events.c | 2 +- 7 files changed, 17 insertions(+), 84 deletions(-) diff --git a/lnet/include/lnet/api.h b/lnet/include/lnet/api.h index 9ec8948..a36510b 100644 --- a/lnet/include/lnet/api.h +++ b/lnet/include/lnet/api.h @@ -136,9 +136,9 @@ int LNetMDUnlink(struct lnet_handle_md md_in); * local MDs. In particular, they signal the completion of a data transmission * into or out of a MD. They can also be used to hold acknowledgments for * completed PUT operations and indicate when a MD has been unlinked. Multiple - * MDs can share a single EQ. An EQ may have an optional event handler - * associated with it. If an event handler exists, it will be run for each - * event that is deposited into the EQ. + * MDs can share a single EQ. An EQ must have an event handler + * associated with it. It will be run for each event that is deposited into + * the EQ. * * In addition to the struct lnet_eq, the LNet API defines two types * associated with events: The ::lnet_event_kind defines the kinds of events @@ -150,8 +150,7 @@ int LNetMDUnlink(struct lnet_handle_md md_in); * releases these resources and frees the EQ. * @{ */ struct lnet_eq * -LNetEQAlloc(unsigned int count_in, - lnet_eq_handler_t handler); +LNetEQAlloc(lnet_eq_handler_t handler); int LNetEQFree(struct lnet_eq *eventq_in); diff --git a/lnet/include/lnet/lib-types.h b/lnet/include/lnet/lib-types.h index 17f0bf5..eee4b14 100644 --- a/lnet/include/lnet/lib-types.h +++ b/lnet/include/lnet/lib-types.h @@ -183,11 +183,7 @@ struct lnet_libhandle { ((type *)((char *)(ptr)-(char *)(&((type *)0)->member))) struct lnet_eq { - unsigned long eq_enq_seq; - unsigned long eq_deq_seq; - unsigned int eq_size; lnet_eq_handler_t eq_callback; - struct lnet_event *eq_events; int **eq_refs; /* percpt refcount for EQ */ }; diff --git a/lnet/lnet/api-ni.c b/lnet/lnet/api-ni.c index 7235dc7..9105eb1 100644 --- a/lnet/lnet/api-ni.c +++ b/lnet/lnet/api-ni.c @@ -1678,7 +1678,7 @@ lnet_ping_target_setup(struct lnet_ping_buffer **ppbuf, if (set_eq) { the_lnet.ln_ping_target_eq = - LNetEQAlloc(0, lnet_ping_target_event_handler); + LNetEQAlloc(lnet_ping_target_event_handler); if (IS_ERR(the_lnet.ln_ping_target_eq)) { rc = PTR_ERR(the_lnet.ln_ping_target_eq); CERROR("Can't allocate ping buffer EQ: %d\n", rc); @@ -1944,7 +1944,7 @@ static int lnet_push_target_init(void) return -EALREADY; the_lnet.ln_push_target_eq = - LNetEQAlloc(0, lnet_push_target_event_handler); + LNetEQAlloc(lnet_push_target_event_handler); if (IS_ERR(the_lnet.ln_push_target_eq)) { rc = PTR_ERR(the_lnet.ln_push_target_eq); CERROR("Can't allocated push target EQ: %d\n", rc); @@ -2645,7 +2645,7 @@ LNetNIInit(lnet_pid_t requested_pid) lnet_ping_target_update(pbuf, ping_mdh); - the_lnet.ln_mt_eq = LNetEQAlloc(0, lnet_mt_event_handler); + the_lnet.ln_mt_eq = LNetEQAlloc(lnet_mt_event_handler); if (IS_ERR(the_lnet.ln_mt_eq)) { rc = PTR_ERR(the_lnet.ln_mt_eq); CERROR("Can't allocate monitor thread EQ: %d\n", rc); @@ -4134,7 +4134,7 @@ static int lnet_ping(struct lnet_process_id id, signed long timeout, if (!pbuf) return -ENOMEM; - eq = LNetEQAlloc(0, lnet_ping_event_handler); + eq = LNetEQAlloc(lnet_ping_event_handler); if (IS_ERR(eq)) { rc = PTR_ERR(eq); CERROR("Can't allocate EQ: %d\n", rc); diff --git a/lnet/lnet/lib-eq.c b/lnet/lnet/lib-eq.c index b479a1f..261eba5 100644 --- a/lnet/lnet/lib-eq.c +++ b/lnet/lnet/lib-eq.c @@ -38,17 +38,10 @@ #include /** - * Create an event queue that has room for \a count number of events. + * Create an event queue that calls a @callback on each event. * - * Note that when EQ handler is specified in \a callback, no event loss - * can happen, since the handler is run for each event deposited into - * the EQ. - * - * \param count The number of events to be stored in the event queue. It - * will be rounded up to the next power of two. * \param callback A handler function that runs when an event is deposited - * into the EQ. The constant value LNET_EQ_HANDLER_NONE can be used to - * indicate that no event handler is desired. + * into the EQ. * * \retval eq On successful return, the newly created EQ is returned. * On failure, an error code encoded with ERR_PTR() is returned. @@ -58,46 +51,19 @@ * \see lnet_eq_handler_t for the discussion on EQ handler semantics. */ struct lnet_eq * -LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback) +LNetEQAlloc(lnet_eq_handler_t callback) { struct lnet_eq *eq; LASSERT(the_lnet.ln_refcount > 0); - /* We need count to be a power of 2 so that when eq_{enq,deq}_seq - * overflow, they don't skip entries, so the queue has the same - * apparent capacity at all times */ - - if (count) - count = roundup_pow_of_two(count); - - if (callback != LNET_EQ_HANDLER_NONE && count != 0) { - CWARN("EQ callback is guaranteed to get every event, " - "do you still want to set eqcount %d for polling " - "event which will have locking overhead? " - "Please contact with developer to confirm\n", count); - } - - /* count can be 0 if only need callback, we can eliminate - * overhead of enqueue event */ - if (count == 0 && callback == LNET_EQ_HANDLER_NONE) + if (callback == LNET_EQ_HANDLER_NONE) return ERR_PTR(-EINVAL); eq = lnet_eq_alloc(); if (eq == NULL) return ERR_PTR(-ENOMEM); - if (count != 0) { - LIBCFS_ALLOC(eq->eq_events, count * sizeof(*eq->eq_events)); - if (eq->eq_events == NULL) - goto failed; - /* NB allocator has set all event sequence numbers to 0, - * so all them should be earlier than eq_deq_seq */ - } - - eq->eq_deq_seq = 1; - eq->eq_enq_seq = 1; - eq->eq_size = count; eq->eq_callback = callback; eq->eq_refs = cfs_percpt_alloc(lnet_cpt_table(), @@ -108,9 +74,6 @@ LNetEQAlloc(unsigned int count, lnet_eq_handler_t callback) return eq; failed: - if (eq->eq_events != NULL) - LIBCFS_FREE(eq->eq_events, count * sizeof(*eq->eq_events)); - if (eq->eq_refs != NULL) cfs_percpt_free(eq->eq_refs); @@ -131,11 +94,9 @@ EXPORT_SYMBOL(LNetEQAlloc); int LNetEQFree(struct lnet_eq *eq) { - struct lnet_event *events = NULL; int **refs = NULL; int *ref; int rc = 0; - int size = 0; int i; lnet_res_lock(LNET_LOCK_EX); @@ -155,8 +116,6 @@ LNetEQFree(struct lnet_eq *eq) } /* stash for free after lock dropped */ - events = eq->eq_events; - size = eq->eq_size; refs = eq->eq_refs; lnet_eq_free(eq); @@ -164,8 +123,6 @@ LNetEQFree(struct lnet_eq *eq) lnet_eq_wait_unlock(); lnet_res_unlock(LNET_LOCK_EX); - if (events != NULL) - LIBCFS_FREE(events, size * sizeof(*events)); if (refs != NULL) cfs_percpt_free(refs); @@ -176,25 +133,6 @@ EXPORT_SYMBOL(LNetEQFree); void lnet_eq_enqueue_event(struct lnet_eq *eq, struct lnet_event *ev) { - /* MUST called with resource lock hold but w/o lnet_eq_wait_lock */ - int index; - - if (eq->eq_size == 0) { - LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE); - eq->eq_callback(ev); - return; - } - - lnet_eq_wait_lock(); - ev->sequence = eq->eq_enq_seq++; - - LASSERT(eq->eq_size == LOWEST_BIT_SET(eq->eq_size)); - index = ev->sequence & (eq->eq_size - 1); - - eq->eq_events[index] = *ev; - - if (eq->eq_callback != LNET_EQ_HANDLER_NONE) - eq->eq_callback(ev); - - lnet_eq_wait_unlock(); + LASSERT(eq->eq_callback != LNET_EQ_HANDLER_NONE); + eq->eq_callback(ev); } diff --git a/lnet/lnet/peer.c b/lnet/lnet/peer.c index e4a6d49..5ea9206 100644 --- a/lnet/lnet/peer.c +++ b/lnet/lnet/peer.c @@ -3419,7 +3419,7 @@ int lnet_peer_discovery_start(void) if (the_lnet.ln_dc_state != LNET_DC_STATE_SHUTDOWN) return -EALREADY; - the_lnet.ln_dc_eq = LNetEQAlloc(0, lnet_discovery_event_handler); + the_lnet.ln_dc_eq = LNetEQAlloc(lnet_discovery_event_handler); if (IS_ERR(the_lnet.ln_dc_eq)) { rc = PTR_ERR(the_lnet.ln_dc_eq); CERROR("Can't allocate discovery EQ: %d\n", rc); diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index 3c7490a..5e7e539 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -1621,7 +1621,7 @@ srpc_startup (void) srpc_data.rpc_state = SRPC_STATE_NI_INIT; - srpc_data.rpc_lnet_eq = LNetEQAlloc(0, srpc_lnet_ev_handler); + srpc_data.rpc_lnet_eq = LNetEQAlloc(srpc_lnet_ev_handler); if (IS_ERR(srpc_data.rpc_lnet_eq)) { rc = PTR_ERR(srpc_data.rpc_lnet_eq); CERROR("LNetEQAlloc() has failed: %d\n", rc); diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index f7a4c88..fb4dfc4 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -603,7 +603,7 @@ int ptlrpc_ni_init(void) * because we are guaranteed to get every event via callback, * so we just set EQ size to 0 to avoid overhread of serializing * enqueue/dequeue operations in LNet. */ - ptlrpc_eq = LNetEQAlloc(0, ptlrpc_master_callback); + ptlrpc_eq = LNetEQAlloc(ptlrpc_master_callback); if (!IS_ERR(ptlrpc_eq)) return 0; -- 1.8.3.1