# include <linux/mm_inline.h>
#endif
+#ifndef SLAB_TYPESAFE_BY_RCU
+#define SLAB_TYPESAFE_BY_RCU SLAB_DESTROY_BY_RCU
+#endif
+
/*
* Shrinker
*/
*/
struct portals_handle l_handle;
/**
- * Internal spinlock protects l_resource. We should hold this lock
- * first before taking res_lock.
- */
- spinlock_t l_lock;
- /**
* Pointer to actual resource this lock is in.
- * ldlm_lock_change_resource() can change this.
+ * ldlm_lock_change_resource() can change this on the client.
+ * When this is possible, RCU must be used to stabilise
+ * the resource while we lock and check it hasn't been changed.
*/
- struct ldlm_resource *l_resource;
+ struct ldlm_resource __rcu *l_resource;
/**
* List item for client side LRU list.
* Protected by ns_lock in struct ldlm_namespace.
*
* LDLM locking uses resource to serialize access to locks
* but there is a case when we change resource of lock upon
- * enqueue reply. We rely on lock->l_resource = new_res
+ * enqueue reply. We rely on rcu_assign_pointer(lock->l_resource, new_res)
* being an atomic operation.
*/
struct ldlm_resource *lock_res_and_lock(struct ldlm_lock *lock)
{
- /* on server-side resource of lock doesn't change */
- if (!ldlm_is_ns_srv(lock))
- spin_lock(&lock->l_lock);
+ struct ldlm_resource *res;
- lock_res(lock->l_resource);
-
- ldlm_set_res_locked(lock);
- return lock->l_resource;
+ /* SLAB_TYPESAFE_BY_RCU on the resource slab guarantees that *res
+ * remains valid memory (though possibly recycled for a different
+ * resource) for the whole RCU read section, so lock_res(res) below
+ * is always safe even if the resource was concurrently freed.
+ */
+ rcu_read_lock();
+ while (1) {
+ res = rcu_dereference(lock->l_resource);
+ lock_res(res);
+ /* Re-check under the spinlock: l_resource may have been
+ * switched by ldlm_lock_change_resource() between the
+ * rcu_dereference() and lock_res() above.  If so, drop
+ * the stale lock and retry with the new resource.
+ */
+ if (res == lock->l_resource) {
+ ldlm_set_res_locked(lock);
+ /* Holding res_lock now pins l_resource (any change
+ * must take this lock first), so RCU can end here.
+ */
+ rcu_read_unlock();
+ return res;
+ }
+ unlock_res(res);
+ }
}
EXPORT_SYMBOL(lock_res_and_lock);
*/
void unlock_res_and_lock(struct ldlm_lock *lock)
{
- /* on server-side resource of lock doesn't change */
ldlm_clear_res_locked(lock);
+ /* l_resource cannot change while we hold its res_lock —
+ * ldlm_lock_change_resource() locks the old resource before
+ * reassigning the pointer — so a plain read of the __rcu
+ * pointer is safe here.
+ */
unlock_res(lock->l_resource);
- if (!ldlm_is_ns_srv(lock))
- spin_unlock(&lock->l_lock);
}
EXPORT_SYMBOL(unlock_res_and_lock);
if (lock == NULL)
RETURN(NULL);
- spin_lock_init(&lock->l_lock);
- lock->l_resource = resource;
+ RCU_INIT_POINTER(lock->l_resource, resource);
lu_ref_add(&resource->lr_reference, "lock", lock);
refcount_set(&lock->l_handle.h_ref, 2);
INIT_HLIST_NODE(&lock->l_exp_hash);
INIT_HLIST_NODE(&lock->l_exp_flock_hash);
- lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
- LDLM_NSS_LOCKS);
+ lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+ LDLM_NSS_LOCKS);
INIT_HLIST_NODE(&lock->l_handle.h_link);
class_handle_hash(&lock->l_handle, lock_handle_owner);
- lu_ref_init(&lock->l_reference);
- lu_ref_add(&lock->l_reference, "hash", lock);
- lock->l_callback_timeout = 0;
+ lu_ref_init(&lock->l_reference);
+ lu_ref_add(&lock->l_reference, "hash", lock);
+ lock->l_callback_timeout = 0;
lock->l_activity = 0;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
INIT_LIST_HEAD(&lock->l_exp_refs_link);
- lock->l_exp_refs_nr = 0;
- lock->l_exp_refs_target = NULL;
+ lock->l_exp_refs_nr = 0;
+ lock->l_exp_refs_target = NULL;
#endif
INIT_LIST_HEAD(&lock->l_exp_list);
- RETURN(lock);
+ RETURN(lock);
}
/**
lu_ref_add(&newres->lr_reference, "lock", lock);
/*
- * To flip the lock from the old to the new resource, lock, oldres and
- * newres have to be locked. Resource spin-locks are nested within
- * lock->l_lock, and are taken in the memory address order to avoid
- * dead-locks.
+ * To flip the lock from the old to the new resource, oldres
+ * and newres have to be locked. Resource spin-locks are taken
+ * in the memory address order to avoid dead-locks.
+ * As this is the only circumstance where ->l_resource
+ * can change, and this cannot race with itself, it is safe
+ * to access lock->l_resource without being careful about locking.
*/
- spin_lock(&lock->l_lock);
oldres = lock->l_resource;
if (oldres < newres) {
lock_res(oldres);
}
LASSERT(memcmp(new_resid, &oldres->lr_name,
sizeof oldres->lr_name) != 0);
- lock->l_resource = newres;
+ rcu_assign_pointer(lock->l_resource, newres);
unlock_res(oldres);
- unlock_res_and_lock(lock);
+ unlock_res(newres);
/* ...and the flowers are still standing! */
lu_ref_del(&oldres->lr_reference, "lock", lock);
struct va_format vaf;
char *nid = "local";
- /* on server-side resource of lock doesn't change */
- if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
- if (lock->l_resource != NULL)
- resource = ldlm_resource_getref(lock->l_resource);
- } else if (spin_trylock(&lock->l_lock)) {
- if (lock->l_resource != NULL)
- resource = ldlm_resource_getref(lock->l_resource);
- spin_unlock(&lock->l_lock);
- }
+ rcu_read_lock();
+ resource = rcu_dereference(lock->l_resource);
+ if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+ resource = NULL;
+ rcu_read_unlock();
va_start(args, fmt);
vaf.fmt = fmt;
#include <linux/kthread.h>
#include <linux/list.h>
#include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-mem.h>
#include <lustre_errno.h>
#include <lustre_dlm.h>
#include <obd_class.h>
RETURN(0);
}
+/* Slab constructor: runs once when an object's backing memory is first
+ * set up by the slab allocator, NOT on every kmem_cache_alloc() —
+ * exactly the once-only semantics SLAB_TYPESAFE_BY_RCU requires.
+ */
+void ldlm_resource_init_once(void *p)
+{
+ /*
+ * It is important to initialise the spinlock only once,
+ * as ldlm_lock_change_resource() could try to lock
+ * the resource *after* it has been freed and possibly
+ * reused. SLAB_TYPESAFE_BY_RCU ensures the memory won't
+ * be freed while the lock is being taken, but we need to
+ * ensure that it doesn't get reinitialized either.
+ */
+ struct ldlm_resource *res = p;
+
+ memset(res, 0, sizeof(*res));
+ mutex_init(&res->lr_lvb_mutex);
+ spin_lock_init(&res->lr_lock);
+}
+
int ldlm_init(void)
{
ldlm_resource_slab = kmem_cache_create("ldlm_resources",
sizeof(struct ldlm_resource), 0,
- SLAB_HWCACHE_ALIGN, NULL);
+ SLAB_TYPESAFE_BY_RCU |
+ SLAB_HWCACHE_ALIGN,
+ ldlm_resource_init_once);
if (ldlm_resource_slab == NULL)
return -ENOMEM;
{
if (ldlm_refcount)
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+ /* These two lines should not be needed, but appear to fix
+ * a crash on RHEL7. The slab_cache sometimes gets freed before the
+ * last slab is rcu_freed, and that can cause kmem_freepages()
+ * to free too many pages and trip a BUG
+ */
+ kmem_cache_shrink(ldlm_resource_slab);
+ synchronize_rcu();
kmem_cache_destroy(ldlm_resource_slab);
/*
* ldlm_lock_put() use RCU to call ldlm_lock_free, so need call
struct ldlm_resource *res;
bool rc;
- OBD_SLAB_ALLOC_PTR_GFP(res, ldlm_resource_slab, GFP_NOFS);
+ res = kmem_cache_alloc(ldlm_resource_slab, GFP_NOFS);
if (res == NULL)
return NULL;
break;
}
if (!rc) {
- OBD_SLAB_FREE_PTR(res, ldlm_resource_slab);
+ kmem_cache_free(ldlm_resource_slab, res);
return NULL;
}
INIT_LIST_HEAD(&res->lr_granted);
INIT_LIST_HEAD(&res->lr_waiting);
+ res->lr_lvb_data = NULL;
+ res->lr_lvb_inode = NULL;
+ res->lr_lvb_len = 0;
atomic_set(&res->lr_refcount, 1);
- spin_lock_init(&res->lr_lock);
lu_ref_init(&res->lr_reference);
/* Since LVB init can be delayed now, there is no longer need to
* immediatelly acquire mutex here. */
- mutex_init(&res->lr_lvb_mutex);
res->lr_lvb_initialized = false;
return res;
OBD_FREE_PTR(res->lr_ibits_queues);
}
- OBD_SLAB_FREE(res, ldlm_resource_slab, sizeof *res);
+ kmem_cache_free(ldlm_resource_slab, res);
}
/**