From 42f377db4a24cefa7a041fcd3106dd58771eb319 Mon Sep 17 00:00:00 2001
From: Andriy Skulysh
Date: Fri, 5 Nov 2021 12:55:08 +0200
Subject: [PATCH] LU-15472 ldlm: optimize flock reprocess

Resource reprocess on flock unlock can be done once, after all
pending unlock requests have been handled. This reduces spinlock
contention.

Change-Id: I2809070f27fe3af7e1fc34e2b4b22603931f3dff
HPE-bug-id: LUS-10471, LUS-10909
Signed-off-by: Andriy Skulysh
Reviewed-by: Alexander Boyko
Reviewed-by: Vitaly Fertman
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/46257
Tested-by: Maloo
Tested-by: jenkins
Reviewed-by: Vitaly Fertman
Reviewed-by: Alexander Boyko
Reviewed-by: Oleg Drokin
---
 lustre/include/lustre_dlm.h |  6 ++++++
 lustre/ldlm/ldlm_flock.c    | 15 +++++++++------
 lustre/ldlm/ldlm_lock.c     | 21 ++++++++++++++++++++-
 lustre/ldlm/ldlm_resource.c | 11 +++++++++++
 4 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 0b0fd66..ff211a5 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -714,6 +714,11 @@ struct ldlm_ibits_node {
 	struct ldlm_lock *lock;
 };
 
+struct ldlm_flock_node {
+	atomic_t lfn_unlock_pending;
+	bool lfn_needs_reprocess;
+};
+
 /** Whether to track references to exports by LDLM locks. */
 #define LUSTRE_TRACKS_LOCK_EXP_REFS (0)
 
@@ -1090,6 +1095,7 @@ struct ldlm_resource {
 		 */
 		struct ldlm_interval_tree *lr_itree;
 		struct ldlm_ibits_queues *lr_ibits_queues;
+		struct ldlm_flock_node lr_flock_node;
 	};
 
 	union {
diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index 745c1ea..84e56af 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -295,7 +295,6 @@ ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
 	enum ldlm_mode mode = req->l_req_mode;
 	int local = ns_is_client(ns);
 	int added = (mode == LCK_NL);
-	int overlaps = 0;
 	int splitted = 0;
 	const struct ldlm_callback_suite null_cbs = { NULL };
 #ifdef HAVE_SERVER_SUPPORT
@@ -497,7 +496,7 @@ reprocess:
 			    lock->l_policy_data.l_flock.start)
 				break;
 
-			++overlaps;
+			res->lr_flock_node.lfn_needs_reprocess = true;
 
 			if (new->l_policy_data.l_flock.start <=
 			    lock->l_policy_data.l_flock.start) {
@@ -606,21 +605,25 @@ reprocess:
 		 * but only once because 'intention' won't be
 		 * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
		 */
-		if ((mode == LCK_NL) && overlaps) {
+		struct ldlm_flock_node *fn = &res->lr_flock_node;
+restart:
+		if (mode == LCK_NL && fn->lfn_needs_reprocess &&
+		    atomic_read(&fn->lfn_unlock_pending) == 0) {
 			LIST_HEAD(rpc_list);
 			int rc;
 
-restart:
 			ldlm_reprocess_queue(res, &res->lr_waiting,
 					     &rpc_list,
 					     LDLM_PROCESS_RESCAN, 0);
-
+			fn->lfn_needs_reprocess = false;
 			unlock_res_and_lock(req);
 			rc = ldlm_run_ast_work(ns, &rpc_list,
 					       LDLM_WORK_CP_AST);
 			lock_res_and_lock(req);
-			if (rc == -ERESTART)
+			if (rc == -ERESTART) {
+				fn->lfn_needs_reprocess = true;
 				GOTO(restart, rc);
+			}
 		}
 	} else {
 		LASSERT(req->l_completion_ast);
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 2830412..b1b00d3 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -1839,8 +1839,27 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
 			RETURN(rc);
 		}
 	}
+
+	if (!local && lock->l_resource->lr_type == LDLM_FLOCK) {
+		struct ldlm_flock_node *fn = &lock->l_resource->lr_flock_node;
+		if (lock->l_req_mode == LCK_NL) {
+			atomic_inc(&fn->lfn_unlock_pending);
+			res = lock_res_and_lock(lock);
+			atomic_dec(&fn->lfn_unlock_pending);
+		} else {
+			res = lock_res_and_lock(lock);
+
+			while (atomic_read(&fn->lfn_unlock_pending)) {
+				unlock_res_and_lock(lock);
+				cond_resched();
+				lock_res_and_lock(lock);
+			}
+		}
+	} else
 #endif
-	res = lock_res_and_lock(lock);
+	{
+		res = lock_res_and_lock(lock);
+	}
 	if (local && ldlm_is_granted(lock)) {
 		/* The server returned a blocked lock, but it was granted
 		 * before we got a chance to actually enqueue it. We don't
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index d73ef51..762f104 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -1412,6 +1412,14 @@ static bool ldlm_resource_inodebits_new(struct ldlm_resource *res)
 	return true;
 }
 
+static bool ldlm_resource_flock_new(struct ldlm_resource *res)
+{
+	res->lr_flock_node.lfn_needs_reprocess = false;
+	atomic_set(&res->lr_flock_node.lfn_unlock_pending, 0);
+
+	return true;
+}
+
 /** Create and initialize new resource. */
 static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
 {
@@ -1429,6 +1437,9 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
 	case LDLM_IBITS:
 		rc = ldlm_resource_inodebits_new(res);
 		break;
+	case LDLM_FLOCK:
+		rc = ldlm_resource_flock_new(res);
+		break;
 	default:
 		rc = true;
 		break;
--
1.8.3.1
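
A minimal user-space sketch of the batching idea in this patch, for
illustration only: it is not Lustre code, and every name in it (struct
flock_node, struct resource, reprocess_waiting_queue(), flock_unlock())
is invented. Each unlocker advertises itself in an atomic counter before
contending on the resource lock, and the expensive waiting-queue rescan
is deferred while further unlocks are known to be pending, so a burst of
N unlocks pays for roughly one rescan instead of N:

/* Build with: cc -pthread sketch.c */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct flock_node {
	atomic_int unlock_pending;	/* unlockers queued behind the lock */
	bool needs_reprocess;		/* an unlock removed an overlap */
};

struct resource {
	pthread_mutex_t lock;		/* stands in for the resource spinlock */
	struct flock_node fn;
	int granted;			/* stands in for the granted queue */
};

/* Invented stand-in for ldlm_reprocess_queue(): rescan the waiters. */
static void reprocess_waiting_queue(struct resource *res)
{
	printf("rescanning waiting queue (granted=%d)\n", res->granted);
}

static void flock_unlock(struct resource *res)
{
	/* Announce this unlock before contending on the lock, mirroring
	 * the lfn_unlock_pending inc/dec around lock_res_and_lock(). */
	atomic_fetch_add(&res->fn.unlock_pending, 1);
	pthread_mutex_lock(&res->lock);
	atomic_fetch_sub(&res->fn.unlock_pending, 1);

	res->granted--;
	res->fn.needs_reprocess = true;

	/* Skip the rescan while more unlocks are pending; the last
	 * unlocker of the burst (pending == 0) does it once.  If the
	 * race goes the other way the rescan merely runs twice, so
	 * correctness is preserved. */
	if (res->fn.needs_reprocess &&
	    atomic_load(&res->fn.unlock_pending) == 0) {
		reprocess_waiting_queue(res);
		res->fn.needs_reprocess = false;
	}
	pthread_mutex_unlock(&res->lock);
}

static void *unlocker(void *arg)
{
	flock_unlock(arg);
	return NULL;
}

int main(void)
{
	struct resource res = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.fn = { .unlock_pending = 0, .needs_reprocess = false },
		.granted = 4,
	};
	pthread_t t[4];

	for (int i = 0; i < 4; i++)
		pthread_create(&t[i], NULL, unlocker, &res);
	for (int i = 0; i < 4; i++)
		pthread_join(t[i], NULL);
	return 0;
}

The ldlm_lock_enqueue() hunk is the other half of the scheme: non-unlock
flock enqueues spin (with cond_resched()) until lfn_unlock_pending drains
before taking the resource lock, evidently so that a new request is not
processed in the middle of an unlock burst whose reprocess has been
deferred.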