From dd5aa640781d8c6dcbbba46135e8b28b532a3840 Mon Sep 17 00:00:00 2001 From: Alexey Lyashkov Date: Wed, 6 Apr 2022 08:13:46 +0300 Subject: [PATCH] LU-15718 lnet: improve lnet_selftest speed lets replace a global spinlock with atomic variables, to avoid cpu power limit in testing. Test-Parameters: trivial testlist=lnet-selftest HPE-bug-id: LUS-10812 Signed-off-by: Alexey Lyashkov Change-Id: Ib12a3b3fe2fe300354e5a7c502bc6f5165e7d05c Reviewed-on: https://review.whamcloud.com/47002 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Cyril Bordage Reviewed-by: Chris Horn Reviewed-by: Oleg Drokin --- lnet/selftest/rpc.c | 98 +++++++++++++++++++++++++++--------------------- lnet/selftest/selftest.h | 1 - 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/lnet/selftest/rpc.c b/lnet/selftest/rpc.c index d5a2153..b85e045 100644 --- a/lnet/selftest/rpc.c +++ b/lnet/selftest/rpc.c @@ -49,15 +49,41 @@ enum srpc_state { SRPC_STATE_STOPPING, }; +enum rpc_counter_32 { + SRPC_ERROR, + SRPC_RPC_SENT, + SRPC_RPC_RCVD, + SRPC_RPC_DROP, + SRPC_RPC_EXPIRED, + SRPC_COUNTER32_MAX, +}; + +enum rpc_counter_64 { + SRPC_BULK_GET, + SRPC_BULK_PUT, + SRPC_COUNTER64_MAX, +}; + static struct smoketest_rpc { spinlock_t rpc_glock; /* global lock */ struct srpc_service *rpc_services[SRPC_SERVICE_MAX_ID + 1]; lnet_handler_t rpc_lnet_handler;/* _the_ LNet event handler */ enum srpc_state rpc_state; - struct srpc_counters rpc_counters; - __u64 rpc_matchbits; /* matchbits counter */ + atomic_t rpc_counters32[SRPC_COUNTER32_MAX]; + atomic64_t rpc_counters64[SRPC_COUNTER64_MAX]; + atomic64_t rpc_matchbits; /* matchbits counter */ + } srpc_data; +#define RPC_STAT32(a) \ + srpc_data.rpc_counters32[(a)] + +#define GET_RPC_STAT32(a) \ + atomic_read(&srpc_data.rpc_counters32[(a)]) + +#define GET_RPC_STAT64(a) \ + atomic64_read(&srpc_data.rpc_counters64[(a)]) + static inline int srpc_serv_portal(int svc_id) { @@ -68,18 +94,17 @@ srpc_serv_portal(int svc_id) /* forward ref's */ static int srpc_handle_rpc(struct swi_workitem *wi); -void srpc_get_counters(struct srpc_counters *cnt) -{ - spin_lock(&srpc_data.rpc_glock); - *cnt = srpc_data.rpc_counters; - spin_unlock(&srpc_data.rpc_glock); -} -void srpc_set_counters(const struct srpc_counters *cnt) +void srpc_get_counters(struct srpc_counters *cnt) { - spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_counters = *cnt; - spin_unlock(&srpc_data.rpc_glock); + cnt->errors = GET_RPC_STAT32(SRPC_ERROR); + cnt->rpcs_sent = GET_RPC_STAT32(SRPC_RPC_SENT); + cnt->rpcs_rcvd = GET_RPC_STAT32(SRPC_RPC_RCVD); + cnt->rpcs_dropped = GET_RPC_STAT32(SRPC_RPC_DROP); + cnt->rpcs_expired = GET_RPC_STAT32(SRPC_RPC_EXPIRED); + + cnt->bulk_get = GET_RPC_STAT64(SRPC_BULK_GET); + cnt->bulk_put = GET_RPC_STAT64(SRPC_BULK_PUT); } static int @@ -160,12 +185,7 @@ srpc_alloc_bulk(int cpt, unsigned bulk_off, unsigned bulk_npg, static inline __u64 srpc_next_id (void) { - __u64 id; - - spin_lock(&srpc_data.rpc_glock); - id = srpc_data.rpc_matchbits++; - spin_unlock(&srpc_data.rpc_glock); - return id; + return atomic64_inc_return(&srpc_data.rpc_matchbits); } static void @@ -915,11 +935,8 @@ srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status) rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer), swi_state2str(rpc->srpc_wi.swi_state), status); - if (status != 0) { - spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_counters.rpcs_dropped++; - spin_unlock(&srpc_data.rpc_glock); - } + if (status != 0) + atomic_inc(&RPC_STAT32(SRPC_RPC_DROP)); if (rpc->srpc_done != NULL) (*rpc->srpc_done) (rpc); @@ -1092,9 +1109,7 @@ srpc_client_rpc_expired (void *data) spin_unlock(&rpc->crpc_lock); - spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_counters.rpcs_expired++; - spin_unlock(&srpc_data.rpc_glock); + atomic_inc(&RPC_STAT32(SRPC_RPC_EXPIRED)); } static void @@ -1427,11 +1442,11 @@ srpc_lnet_ev_handler(struct lnet_event *ev) if (ev->status != 0) { __u32 errors; - spin_lock(&srpc_data.rpc_glock); + if (ev->status != -ECANCELED) /* cancellation is not error */ - srpc_data.rpc_counters.errors++; - errors = srpc_data.rpc_counters.errors; - spin_unlock(&srpc_data.rpc_glock); + errors = atomic_inc_return(&RPC_STAT32(SRPC_ERROR)); + else + errors = atomic_read(&RPC_STAT32(SRPC_ERROR)); CNETERR("LNet event status %d type %d, RPC errors %u\n", ev->status, ev->type, errors); @@ -1446,11 +1461,9 @@ srpc_lnet_ev_handler(struct lnet_event *ev) LBUG(); fallthrough; case SRPC_REQUEST_SENT: - if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { - spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_counters.rpcs_sent++; - spin_unlock(&srpc_data.rpc_glock); - } + if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) + atomic_inc(&RPC_STAT32(SRPC_RPC_SENT)); + fallthrough; case SRPC_REPLY_RCVD: case SRPC_BULK_REQ_RCVD: @@ -1562,9 +1575,7 @@ srpc_lnet_ev_handler(struct lnet_event *ev) spin_unlock(&scd->scd_lock); - spin_lock(&srpc_data.rpc_glock); - srpc_data.rpc_counters.rpcs_rcvd++; - spin_unlock(&srpc_data.rpc_glock); + atomic_inc(&RPC_STAT32(SRPC_RPC_RCVD)); break; case SRPC_BULK_GET_RPLD: @@ -1577,14 +1588,14 @@ srpc_lnet_ev_handler(struct lnet_event *ev) fallthrough; case SRPC_BULK_PUT_SENT: if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) { - spin_lock(&srpc_data.rpc_glock); + atomic64_t *data; if (rpcev->ev_type == SRPC_BULK_GET_RPLD) - srpc_data.rpc_counters.bulk_get += ev->mlength; + data = &srpc_data.rpc_counters64[SRPC_BULK_GET]; else - srpc_data.rpc_counters.bulk_put += ev->mlength; + data = &srpc_data.rpc_counters64[SRPC_BULK_PUT]; - spin_unlock(&srpc_data.rpc_glock); + atomic64_add(ev->mlength, data); } fallthrough; case SRPC_REPLY_SENT: @@ -1616,7 +1627,8 @@ srpc_startup (void) /* 1 second pause to avoid timestamp reuse */ schedule_timeout_uninterruptible(cfs_time_seconds(1)); - srpc_data.rpc_matchbits = ((__u64) ktime_get_real_seconds()) << 48; + atomic64_set(&srpc_data.rpc_matchbits, + ((__u64)ktime_get_real_seconds() << 48)); srpc_data.rpc_state = SRPC_STATE_NONE; diff --git a/lnet/selftest/selftest.h b/lnet/selftest/selftest.h index 27126be..f19685d 100644 --- a/lnet/selftest/selftest.h +++ b/lnet/selftest/selftest.h @@ -445,7 +445,6 @@ int srpc_finish_service(struct srpc_service *sv); int srpc_service_add_buffers(struct srpc_service *sv, int nbuffer); void srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer); void srpc_get_counters(struct srpc_counters *cnt); -void srpc_set_counters(const struct srpc_counters *cnt); extern struct cfs_wi_sched *lst_sched_serial; extern struct cfs_wi_sched **lst_sched_test; -- 1.8.3.1