From bb6edb7b8eeec65f46f8eaeb135e5dde13bf7ad8 Mon Sep 17 00:00:00 2001
From: NeilBrown
Date: Tue, 27 Aug 2019 12:13:50 -0400
Subject: [PATCH] LU-4801 ldlm: discard l_lock from struct ldlm_lock.

This spinlock (l_lock) is only used to stabilise the l_resource
pointer while taking a spinlock on the resource.

This is not necessary - it is sufficient to take the resource
spinlock, and then check whether l_resource has changed.  If it
hasn't, then it cannot change until the resource spinlock is dropped.

We must ensure this is safe even if the resource is freed before
lock_res_and_lock() manages to take the lock.  To do this we use
call_rcu() to free the resource.  This means that if we find a
resource after taking the RCU read lock, it is always safe to take
and then drop its spinlock.  After taking the spinlock, we can check
whether it is more generally safe to use.

Discarding l_lock shrinks 'struct ldlm_lock', which helps save memory.

Note that it would be cleaner to use SLAB_TYPESAFE_BY_RCU instead of
call_rcu(), but that is not correctly supported on all platforms that
lustre supports.  To simplify a transition to SLAB_TYPESAFE_BY_RCU
once it is usable, an 'init_once' function is used to initialize the
spinlock, as would be needed in that case.

Signed-off-by: Mr NeilBrown
Change-Id: Ief5af0b3bc9bed0cd32673e4d62ce51c83846418
Reviewed-on: https://review.whamcloud.com/39811
Tested-by: jenkins
Tested-by: Maloo
Reviewed-by: James Simmons
Reviewed-by: Andreas Dilger
Reviewed-by: Oleg Drokin
---

(An illustrative sketch of the lock-and-recheck pattern is appended
after the patch.)

 lustre/include/lustre_dlm.h | 17 +++++++++--------
 lustre/ldlm/l_lock.c        | 24 +++++++++++++-----------
 lustre/ldlm/ldlm_lock.c     | 46 +++++++++++++++++++++-------------------------
 lustre/ldlm/ldlm_lockd.c    |  1 +
 lustre/ldlm/ldlm_resource.c | 10 +++++++++-
 5 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h
index 63f9bd6..ea5efc9 100644
--- a/lustre/include/lustre_dlm.h
+++ b/lustre/include/lustre_dlm.h
@@ -782,13 +782,10 @@ struct ldlm_lock {
 	 */
 	struct portals_handle	l_handle;
 	/**
-	 * Internal spinlock protects l_resource. We should hold this lock
-	 * first before taking res_lock.
-	 */
-	spinlock_t		l_lock;
-	/**
 	 * Pointer to actual resource this lock is in.
-	 * ldlm_lock_change_resource() can change this.
+	 * ldlm_lock_change_resource() can change this on the client.
+	 * When this is possible, rcu must be used to stabilise
+	 * the resource while we lock and check it hasn't been changed.
 	 */
 	struct ldlm_resource	*l_resource;
 	/**
@@ -1060,9 +1057,13 @@ struct ldlm_resource {
 
 	/**
 	 * List item for list in namespace hash.
-	 * protected by ns_lock
+	 * protected by ns_lock.
+	 * Shared with linkage for RCU-delayed free.
 	 */
-	struct hlist_node	lr_hash;
+	union {
+		struct hlist_node	lr_hash;
+		struct rcu_head		lr_rcu;
+	};
 
 	/** Reference count for this resource */
 	atomic_t		lr_refcount;
diff --git a/lustre/ldlm/l_lock.c b/lustre/ldlm/l_lock.c
index a4f7c85..6d37af1 100644
--- a/lustre/ldlm/l_lock.c
+++ b/lustre/ldlm/l_lock.c
@@ -41,19 +41,24 @@
  *
  * LDLM locking uses resource to serialize access to locks
  * but there is a case when we change resource of lock upon
- * enqueue reply. We rely on lock->l_resource = new_res
+ * enqueue reply. We rely on rcu_assign_pointer(lock->l_resource, new_res)
  * being an atomic operation.
  */
 struct ldlm_resource *lock_res_and_lock(struct ldlm_lock *lock)
 {
-	/* on server-side resource of lock doesn't change */
-	if (!ldlm_is_ns_srv(lock))
-		spin_lock(&lock->l_lock);
+	struct ldlm_resource *res;
 
-	lock_res(lock->l_resource);
-
-	ldlm_set_res_locked(lock);
-	return lock->l_resource;
+	rcu_read_lock();
+	while (1) {
+		res = rcu_dereference(lock->l_resource);
+		lock_res(res);
+		if (res == lock->l_resource) {
+			ldlm_set_res_locked(lock);
+			rcu_read_unlock();
+			return res;
+		}
+		unlock_res(res);
+	}
 }
 EXPORT_SYMBOL(lock_res_and_lock);
 
@@ -62,11 +67,8 @@ EXPORT_SYMBOL(lock_res_and_lock);
  */
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
-	/* on server-side resource of lock doesn't change */
 	ldlm_clear_res_locked(lock);
 
 	unlock_res(lock->l_resource);
-	if (!ldlm_is_ns_srv(lock))
-		spin_unlock(&lock->l_lock);
 }
 EXPORT_SYMBOL(unlock_res_and_lock);
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index 8fbd76f..48906c5 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -469,8 +469,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 	if (lock == NULL)
 		RETURN(NULL);
 
-	spin_lock_init(&lock->l_lock);
-	lock->l_resource = resource;
+	RCU_INIT_POINTER(lock->l_resource, resource);
 	lu_ref_add(&resource->lr_reference, "lock", lock);
 
 	refcount_set(&lock->l_handle.h_ref, 2);
@@ -487,24 +486,24 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 	INIT_HLIST_NODE(&lock->l_exp_hash);
 	INIT_HLIST_NODE(&lock->l_exp_flock_hash);
 
-        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
-                             LDLM_NSS_LOCKS);
+	lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+			     LDLM_NSS_LOCKS);
 	INIT_HLIST_NODE(&lock->l_handle.h_link);
 	class_handle_hash(&lock->l_handle, lock_handle_owner);
 
-        lu_ref_init(&lock->l_reference);
-        lu_ref_add(&lock->l_reference, "hash", lock);
+	lu_ref_init(&lock->l_reference);
+	lu_ref_add(&lock->l_reference, "hash", lock);
 	lock->l_callback_timestamp = 0;
 	lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
 	INIT_LIST_HEAD(&lock->l_exp_refs_link);
-        lock->l_exp_refs_nr = 0;
-        lock->l_exp_refs_target = NULL;
+	lock->l_exp_refs_nr = 0;
+	lock->l_exp_refs_target = NULL;
 #endif
 	INIT_LIST_HEAD(&lock->l_exp_list);
 
-        RETURN(lock);
+	RETURN(lock);
 }
 
 /**
@@ -544,12 +543,13 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 	lu_ref_add(&newres->lr_reference, "lock", lock);
 
 	/*
-	 * To flip the lock from the old to the new resource, lock, oldres and
-	 * newres have to be locked. Resource spin-locks are nested within
-	 * lock->l_lock, and are taken in the memory address order to avoid
-	 * dead-locks.
+	 * To flip the lock from the old to the new resource, oldres
+	 * and newres have to be locked. Resource spin-locks are taken
+	 * in the memory address order to avoid dead-locks.
+	 * As this is the only circumstance where ->l_resource
+	 * can change, and this cannot race with itself, it is safe
+	 * to access lock->l_resource without being careful about locking.
 	 */
-	spin_lock(&lock->l_lock);
 	oldres = lock->l_resource;
 	if (oldres < newres) {
 		lock_res(oldres);
@@ -560,9 +560,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 	}
 	LASSERT(memcmp(new_resid, &oldres->lr_name,
 		       sizeof oldres->lr_name) != 0);
-	lock->l_resource = newres;
+	rcu_assign_pointer(lock->l_resource, newres);
 	unlock_res(oldres);
-	unlock_res_and_lock(lock);
+	unlock_res(newres);
 	/* ...and the flowers are still standing!
 	 */
 	lu_ref_del(&oldres->lr_reference, "lock", lock);
@@ -2764,15 +2764,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 	struct va_format vaf;
 	char *nid = "local";
 
-	/* on server-side resource of lock doesn't change */
-	if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
-		if (lock->l_resource != NULL)
-			resource = ldlm_resource_getref(lock->l_resource);
-	} else if (spin_trylock(&lock->l_lock)) {
-		if (lock->l_resource != NULL)
-			resource = ldlm_resource_getref(lock->l_resource);
-		spin_unlock(&lock->l_lock);
-	}
+	rcu_read_lock();
+	resource = rcu_dereference(lock->l_resource);
+	if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+		resource = NULL;
+	rcu_read_unlock();
 
 	va_start(args, fmt);
 	vaf.fmt = fmt;
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 63d345d..4378d1c 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -3463,6 +3463,7 @@ void ldlm_exit(void)
 {
 	if (ldlm_refcount)
 		CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+	synchronize_rcu();
 	kmem_cache_destroy(ldlm_resource_slab);
 	/*
 	 * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c
index 21cc6c4..19c50d5 100644
--- a/lustre/ldlm/ldlm_resource.c
+++ b/lustre/ldlm/ldlm_resource.c
@@ -1453,6 +1453,14 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
 	return res;
 }
 
+static void __ldlm_resource_free(struct rcu_head *head)
+{
+	struct ldlm_resource *res = container_of(head, struct ldlm_resource,
+						 lr_rcu);
+
+	OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
+}
+
 static void ldlm_resource_free(struct ldlm_resource *res)
 {
 	if (res->lr_type == LDLM_EXTENT) {
@@ -1464,7 +1472,7 @@ static void ldlm_resource_free(struct ldlm_resource *res)
 		OBD_FREE_PTR(res->lr_ibits_queues);
 	}
 
-	OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+	call_rcu(&res->lr_rcu, __ldlm_resource_free);
 }
 
 /**
-- 
1.8.3.1
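
For illustration, here is a minimal, kernel-style sketch of the
lock-and-recheck pattern that lock_res_and_lock() uses above:
dereference the back-pointer only under rcu_read_lock(), take the
object's own spinlock, then re-check that the pointer still refers to
the locked object; call_rcu() guarantees the memory stays valid while
a reader may still be spinning on it.  The names below (struct obj,
struct res, obj_lock_res(), ...) are invented for the example and are
not part of the Lustre API, and the writer side is simplified (no
refcounting, and only the old resource is locked while switching).

#include <linux/kernel.h>
#include <linux/spinlock.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>

struct res {
	spinlock_t	lock;
	struct rcu_head	rcu;
};

struct obj {
	struct res __rcu *res;	/* may be re-pointed while we look */
};

/* Reader: return the object's current resource with res->lock held. */
static struct res *obj_lock_res(struct obj *o)
{
	struct res *r;

	rcu_read_lock();
	while (1) {
		r = rcu_dereference(o->res);
		spin_lock(&r->lock);
		/* If o->res still points at r, it cannot change until
		 * r->lock is dropped, so r is safe to use.
		 */
		if (r == rcu_access_pointer(o->res)) {
			rcu_read_unlock();
			return r;
		}
		/* Lost a race with obj_switch_res(): r may already be
		 * queued for freeing, but call_rcu() keeps the memory
		 * valid until we leave this RCU read-side section.
		 */
		spin_unlock(&r->lock);
	}
}

static void res_free_cb(struct rcu_head *head)
{
	kfree(container_of(head, struct res, rcu));
}

/* Writer: re-point the object, then free the old resource only after
 * all RCU readers that might still see it have finished.
 */
static void obj_switch_res(struct obj *o, struct res *newres)
{
	/* "1" stands in for a real lockdep condition. */
	struct res *oldres = rcu_dereference_protected(o->res, 1);

	spin_lock(&oldres->lock);
	rcu_assign_pointer(o->res, newres);
	spin_unlock(&oldres->lock);
	call_rcu(&oldres->rcu, res_free_cb);
}

As the commit message notes, SLAB_TYPESAFE_BY_RCU would let the slab
object be reused immediately instead of being held until a grace
period, but call_rcu() is used here because type-safe slabs are not
reliably available on all platforms that lustre supports.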