Whamcloud - gitweb
LU-4801 ldlm: discard l_lock from struct ldlm_lock. 83/35483/19
authorNeilBrown <neilb@suse.com>
Tue, 27 Aug 2019 16:13:50 +0000 (12:13 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 14 Apr 2020 08:11:26 +0000 (08:11 +0000)
This spinlock (l_lock) is only used to stablise the l_resource
pointer while taking a spinlock on the resource.

This is not necessary - it is sufficient to take the resource
spinlock, and then check if l_resource has changed or not.  If it
hasn't then it cannot change until the resource spinlock is dropped.

We must ensure this is safe even if the resource is freed before
lock_res_and_lock() managed to get the lock.  To do this we mark the
slab as SLAB_TYPESAFE_BY_RCU and initialise the lock in an
init_once() function, but not on every allocate (and specifically
don't zero the whole struct on each allocation).
This means that if we find a resource after taking the RCU read lock,
then it is always safe to take and then drop the spinlock.
After taking the spinlock, we can check if it is more generally safe
to use.

Discarding l_lock shrinks 'struct ldlm_lock' which helps save memory.

Change-Id: I2646f198ca60bdbd2e94922bf7679fab31f45c41
Signed-off-by: NeilBrown <neilb@suse.com>
Reviewed-on: https://review.whamcloud.com/35483
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Neil Brown <neilb@suse.de>
Reviewed-by: Petros Koutoupis <petros.koutoupis@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/include/libcfs/linux/linux-mem.h
lustre/include/lustre_dlm.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c

index b822173..e8550f0 100644 (file)
 # include <linux/mm_inline.h>
 #endif
 
+#ifndef SLAB_TYPESAFE_BY_RCU
+#define SLAB_TYPESAFE_BY_RCU SLAB_DESTROY_BY_RCU
+#endif
+
 /*
  * Shrinker
  */
index 4f69be7..876caf9 100644 (file)
@@ -736,15 +736,12 @@ struct ldlm_lock {
         */
        struct portals_handle   l_handle;
        /**
-        * Internal spinlock protects l_resource.  We should hold this lock
-        * first before taking res_lock.
-        */
-       spinlock_t              l_lock;
-       /**
         * Pointer to actual resource this lock is in.
-        * ldlm_lock_change_resource() can change this.
+        * ldlm_lock_change_resource() can change this on the client.
+        * When this is possible, rcu must be used to stablise
+        * the resource while we lock and check it hasn't been changed.
         */
-       struct ldlm_resource    *l_resource;
+       struct ldlm_resource __rcu *l_resource;
        /**
         * List item for client side LRU list.
         * Protected by ns_lock in struct ldlm_namespace.
index a4f7c85..6d37af1 100644 (file)
  *
  * LDLM locking uses resource to serialize access to locks
  * but there is a case when we change resource of lock upon
- * enqueue reply. We rely on lock->l_resource = new_res
+ * enqueue reply. We rely on rcu_assign_pointer(lock->l_resource, new_res)
  * being an atomic operation.
  */
 struct ldlm_resource *lock_res_and_lock(struct ldlm_lock *lock)
 {
-       /* on server-side resource of lock doesn't change */
-       if (!ldlm_is_ns_srv(lock))
-               spin_lock(&lock->l_lock);
+       struct ldlm_resource *res;
 
-       lock_res(lock->l_resource);
-
-       ldlm_set_res_locked(lock);
-       return lock->l_resource;
+       rcu_read_lock();
+       while (1) {
+               res = rcu_dereference(lock->l_resource);
+               lock_res(res);
+               if (res == lock->l_resource) {
+                       ldlm_set_res_locked(lock);
+                       rcu_read_unlock();
+                       return res;
+               }
+               unlock_res(res);
+       }
 }
 EXPORT_SYMBOL(lock_res_and_lock);
 
@@ -62,11 +67,8 @@ EXPORT_SYMBOL(lock_res_and_lock);
  */
 void unlock_res_and_lock(struct ldlm_lock *lock)
 {
-       /* on server-side resource of lock doesn't change */
        ldlm_clear_res_locked(lock);
 
        unlock_res(lock->l_resource);
-       if (!ldlm_is_ns_srv(lock))
-               spin_unlock(&lock->l_lock);
 }
 EXPORT_SYMBOL(unlock_res_and_lock);
index 1496808..7e424fe 100644 (file)
@@ -469,8 +469,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        if (lock == NULL)
                RETURN(NULL);
 
-       spin_lock_init(&lock->l_lock);
-       lock->l_resource = resource;
+       RCU_INIT_POINTER(lock->l_resource, resource);
        lu_ref_add(&resource->lr_reference, "lock", lock);
 
        refcount_set(&lock->l_handle.h_ref, 2);
@@ -487,24 +486,24 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);
 
-        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
-                             LDLM_NSS_LOCKS);
+       lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+                            LDLM_NSS_LOCKS);
        INIT_HLIST_NODE(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_owner);
 
-        lu_ref_init(&lock->l_reference);
-        lu_ref_add(&lock->l_reference, "hash", lock);
-        lock->l_callback_timeout = 0;
+       lu_ref_init(&lock->l_reference);
+       lu_ref_add(&lock->l_reference, "hash", lock);
+       lock->l_callback_timeout = 0;
        lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
-        lock->l_exp_refs_nr = 0;
-        lock->l_exp_refs_target = NULL;
+       lock->l_exp_refs_nr = 0;
+       lock->l_exp_refs_target = NULL;
 #endif
        INIT_LIST_HEAD(&lock->l_exp_list);
 
-        RETURN(lock);
+       RETURN(lock);
 }
 
 /**
@@ -544,12 +543,13 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
         lu_ref_add(&newres->lr_reference, "lock", lock);
         /*
-         * To flip the lock from the old to the new resource, lock, oldres and
-         * newres have to be locked. Resource spin-locks are nested within
-         * lock->l_lock, and are taken in the memory address order to avoid
-         * dead-locks.
+        * To flip the lock from the old to the new resource, oldres
+        * and newres have to be locked. Resource spin-locks are taken
+        * in the memory address order to avoid dead-locks.
+        * As this is the only circumstance where ->l_resource
+        * can change, and this cannot race with itself, it is safe
+        * to access lock->l_resource without being careful about locking.
          */
-       spin_lock(&lock->l_lock);
         oldres = lock->l_resource;
         if (oldres < newres) {
                 lock_res(oldres);
@@ -560,9 +560,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         }
         LASSERT(memcmp(new_resid, &oldres->lr_name,
                        sizeof oldres->lr_name) != 0);
-        lock->l_resource = newres;
+       rcu_assign_pointer(lock->l_resource, newres);
         unlock_res(oldres);
-        unlock_res_and_lock(lock);
+       unlock_res(newres);
 
         /* ...and the flowers are still standing! */
         lu_ref_del(&oldres->lr_reference, "lock", lock);
@@ -2760,15 +2760,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        struct va_format vaf;
         char *nid = "local";
 
-       /* on server-side resource of lock doesn't change */
-       if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-       } else if (spin_trylock(&lock->l_lock)) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-               spin_unlock(&lock->l_lock);
-       }
+       rcu_read_lock();
+       resource = rcu_dereference(lock->l_resource);
+       if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+               resource = NULL;
+       rcu_read_unlock();
 
         va_start(args, fmt);
        vaf.fmt = fmt;
index 5f331cc..f7a320f 100644 (file)
@@ -40,6 +40,7 @@
 #include <linux/kthread.h>
 #include <linux/list.h>
 #include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-mem.h>
 #include <lustre_errno.h>
 #include <lustre_dlm.h>
 #include <obd_class.h>
@@ -3372,11 +3373,30 @@ static int ldlm_cleanup(void)
        RETURN(0);
 }
 
+void ldlm_resource_init_once(void *p)
+{
+       /*
+        * It is import to initialise the spinlock only once,
+        * as ldlm_lock_change_resource() could try to lock
+        * the resource *after* it has been freed and possibly
+        * reused. SLAB_TYPESAFE_BY_RCU ensures the memory won't
+        * be freed while the lock is being taken, but we need to
+        * ensure that it doesn't get reinitialized either.
+        */
+       struct ldlm_resource *res = p;
+
+       memset(res, 0, sizeof(*res));
+       mutex_init(&res->lr_lvb_mutex);
+       spin_lock_init(&res->lr_lock);
+}
+
 int ldlm_init(void)
 {
        ldlm_resource_slab = kmem_cache_create("ldlm_resources",
                                               sizeof(struct ldlm_resource), 0,
-                                              SLAB_HWCACHE_ALIGN, NULL);
+                                              SLAB_TYPESAFE_BY_RCU |
+                                              SLAB_HWCACHE_ALIGN,
+                                              ldlm_resource_init_once);
        if (ldlm_resource_slab == NULL)
                return -ENOMEM;
 
@@ -3436,6 +3456,13 @@ void ldlm_exit(void)
 {
        if (ldlm_refcount)
                CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+       /* These two lines should not be needed, but appear to fix
+        * a crash on RHEL7. The slab_cache sometimes gets freed before the
+        * last slab is rcu_freed, and that can cause kmem_freepages()
+        * to free too many pages and trip a BUG
+        */
+       kmem_cache_shrink(ldlm_resource_slab);
+       synchronize_rcu();
        kmem_cache_destroy(ldlm_resource_slab);
        /*
         * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
index 2976e4b..6691acd 100644 (file)
@@ -1367,7 +1367,7 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
        struct ldlm_resource *res;
        bool rc;
 
-       OBD_SLAB_ALLOC_PTR_GFP(res, ldlm_resource_slab, GFP_NOFS);
+       res = kmem_cache_alloc(ldlm_resource_slab, GFP_NOFS);
        if (res == NULL)
                return NULL;
 
@@ -1383,20 +1383,21 @@ static struct ldlm_resource *ldlm_resource_new(enum ldlm_type ldlm_type)
                break;
        }
        if (!rc) {
-               OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
+               kmem_cache_free(ldlm_resource_slab, res);
                return NULL;
        }
 
        INIT_LIST_HEAD(&res->lr_granted);
        INIT_LIST_HEAD(&res->lr_waiting);
+       res->lr_lvb_data = NULL;
+       res->lr_lvb_inode = NULL;
+       res->lr_lvb_len = 0;
 
        atomic_set(&res->lr_refcount, 1);
-       spin_lock_init(&res->lr_lock);
        lu_ref_init(&res->lr_reference);
 
        /* Since LVB init can be delayed now, there is no longer need to
         * immediatelly acquire mutex here. */
-       mutex_init(&res->lr_lvb_mutex);
        res->lr_lvb_initialized = false;
 
        return res;
@@ -1413,7 +1414,7 @@ static void ldlm_resource_free(struct ldlm_resource *res)
                        OBD_FREE_PTR(res->lr_ibits_queues);
        }
 
-       OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+       kmem_cache_free(ldlm_resource_slab, res);
 }
 
 /**