Whamcloud - gitweb
LU-11089 obd: use wait_event_var() in lu_context_key_degister() 67/33667/13
authorNeilBrown <neilb@suse.com>
Tue, 21 May 2019 14:22:39 +0000 (10:22 -0400)
committerOleg Drokin <green@whamcloud.com>
Sat, 1 Jun 2019 03:55:48 +0000 (03:55 +0000)
lu_context_key_degister() has an open coded loop which calls
schedule() without setting a new task state.  This is generally
a bad idea - it could easily just spin.

Instead, use wait_event_var() to wait for ->lct_used to be zero,
and arrange to get a wakeup when that happens.
Previously ->lct_used would only fall down to 1.  Now we decrement
it an extra time so that wake_up, which only happens when the
count reaches zero, will only happen when lu_context_key_degister()
is actually waiting for it.

Note that this patch removes key_fini() from protection by
lu_keys_guard.  key_fini() calls are not always protected
by a lock, and there seems to be no need here.  Nothing else
can be acting on the given key in that context at this point,
so no race is possible.

Linux-commit: ef84c07364211bb4e398a9de45d1c13a32059cee

Change-Id: I9514bd21916f75fce00e393612967fb197e3a1c4
Signed-off-by: NeilBrown <neilb@suse.com>
Signed-off-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-on: https://review.whamcloud.com/33667
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Petros Koutoupis <pkoutoupis@cray.com>
Reviewed-by: Gu Zheng <gzheng@ddn.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/autoconf/lustre-libcfs.m4
libcfs/include/libcfs/libcfs.h
libcfs/include/libcfs/linux/Makefile.am
libcfs/include/libcfs/linux/linux-misc.h
libcfs/include/libcfs/linux/linux-wait.h [new file with mode: 0644]
libcfs/libcfs/Makefile.in
libcfs/libcfs/linux/Makefile.am
libcfs/libcfs/linux/linux-wait.c [new file with mode: 0644]
lustre/obdclass/lu_object.c

index 12efc01..3f87822 100644 (file)
@@ -274,6 +274,21 @@ ktime_get_ts64, [
 ]) # LIBCFS_KTIME_GET_TS64
 
 #
 ]) # LIBCFS_KTIME_GET_TS64
 
 #
+# Kernel version 3.12-rc4 commit c2d816443ef30 added prepare_to_wait_event()
+#
+AC_DEFUN([LIBCFS_PREPARE_TO_WAIT_EVENT],[
+LB_CHECK_COMPILE([does function 'prepare_to_wait_event' exist],
+prepare_to_wait_event, [
+       #include <linux/wait.h>
+],[
+       prepare_to_wait_event(NULL, NULL, 0);
+],[
+       AC_DEFINE(HAVE_PREPARE_TO_WAIT_EVENT, 1,
+               ['prepare_to_wait_event' is available])
+])
+]) # LIBCFS_PREPARE_TO_WAIT_EVENT
+
+#
 # Linux kernel 3.12 introduced struct kernel_param_ops
 # This has been backported to all lustre supported
 # clients except RHEL6. We have to handle the differences.
 # Linux kernel 3.12 introduced struct kernel_param_ops
 # This has been backported to all lustre supported
 # clients except RHEL6. We have to handle the differences.
@@ -914,6 +929,35 @@ LB_CHECK_LINUX_HEADER([linux/processor.h], [
                [processor.h is present])])
 ]) # LIBCFS_HAVE_PROCESSOR_HEADER
 
                [processor.h is present])])
 ]) # LIBCFS_HAVE_PROCESSOR_HEADER
 
+#
+# Kernel verison 4.12-rc6 commit 5dd43ce2f69d42a71dcacdb13d17d8c0ac1fe8f7
+# created wait_bit.h
+#
+AC_DEFUN([LIBCFS_HAVE_WAIT_BIT_HEADER], [
+LB_CHECK_LINUX_HEADER([linux/wait_bit.h], [
+       AC_DEFINE(HAVE_WAIT_BIT_HEADER_H, 1,
+               [wait_bit.h is present])])
+]) # LIBCFS_HAVE_WAIT_BIT_HEADER
+
+#
+# Kernel version 4.12-rc6 commmit 2055da97389a605c8a00d163d40903afbe413921
+# changed:
+#      struct wait_queue_head::task_list       => ::head
+#      struct wait_queue_entry::task_list      => ::entry
+#
+AC_DEFUN([LIBCFS_WAIT_QUEUE_TASK_LIST_RENAME], [
+LB_CHECK_COMPILE([if linux wait_queue_head list_head is named head],
+wait_queue_task_list, [
+       #include <linux/wait.h>
+],[
+       wait_queue_head_t e;
+
+       INIT_LIST_HEAD(&e.head);
+],[
+       AC_DEFINE(HAVE_WAIT_QUEUE_ENTRY_LIST, 1,
+               [linux wait_queue_head_t list_head is name head])
+])
+]) # LIBCFS_WAIT_QUEUE_TASK_LIST_RENAME
 
 #
 # LIBCFS_WAIT_QUEUE_ENTRY
 
 #
 # LIBCFS_WAIT_QUEUE_ENTRY
@@ -998,6 +1042,48 @@ timer_setup, [
 ]) # LIBCFS_TIMER_SETUP
 
 #
 ]) # LIBCFS_TIMER_SETUP
 
 #
+# LIBCFS_WAIT_VAR_EVENT
+#
+# Kernel version 4.16-rc4 commit 6b2bb7265f0b62605e8caee3613449ed0db270b9
+# added wait_var_event()
+#
+AC_DEFUN([LIBCFS_WAIT_VAR_EVENT], [
+LB_CHECK_COMPILE([if 'wait_var_event' exist],
+wait_var_event, [
+       #ifdef HAVE_WAIT_BIT_HEADER_H
+       #include <linux/wait_bit.h>
+       #endif
+       #include <linux/wait.h>
+],[
+       wake_up_var(NULL);
+],[
+       AC_DEFINE(HAVE_WAIT_VAR_EVENT, 1,
+               ['wait_var_event' is available])
+])
+]) # LIBCFS_WAIT_VAR_EVENT
+
+#
+# LIBCFS_CLEAR_AND_WAKE_UP_BIT
+#
+# Kernel version 4.17-rc2 commit 8236b0ae31c837d2b3a2565c5f8d77f637e824cc
+# added clear_and_wake_up_bit()
+#
+AC_DEFUN([LIBCFS_CLEAR_AND_WAKE_UP_BIT], [
+LB_CHECK_COMPILE([if 'clear_and_wake_up_bit' exist],
+clear_and_wake_up_bit, [
+       #ifdef HAVE_WAIT_BIT_HEADER_H
+       #include <linux/wait_bit.h>
+       #endif
+       #include <linux/wait.h>
+],[
+       clear_and_wake_up_bit(0, NULL);
+],[
+       AC_DEFINE(HAVE_CLEAR_AND_WAKE_UP_BIT, 1,
+               ['clear_and_wake_up_bit' is available])
+])
+]) # LIBCFS_CLEAR_AND_WAKE_UP_BIT
+
+#
 # LIBCFS_PROG_LINUX
 #
 # LibCFS linux kernel checks
 # LIBCFS_PROG_LINUX
 #
 # LibCFS linux kernel checks
@@ -1033,6 +1119,7 @@ LIBCFS_ENABLE_CRC32C_ACCEL
 # 3.11
 LIBCFS_KTIME_GET_TS64
 # 3.12
 # 3.11
 LIBCFS_KTIME_GET_TS64
 # 3.12
+LIBCFS_PREPARE_TO_WAIT_EVENT
 LIBCFS_KERNEL_PARAM_OPS
 LIBCFS_KTIME_ADD
 LIBCFS_KTIME_AFTER
 LIBCFS_KERNEL_PARAM_OPS
 LIBCFS_KTIME_ADD
 LIBCFS_KTIME_AFTER
@@ -1083,6 +1170,8 @@ LIBCFS_RHASHTABLE_LOOKUP_GET_INSERT_FAST
 LIBCFS_SCHED_HEADERS
 # 4.12
 LIBCFS_HAVE_PROCESSOR_HEADER
 LIBCFS_SCHED_HEADERS
 # 4.12
 LIBCFS_HAVE_PROCESSOR_HEADER
+LIBCFS_HAVE_WAIT_BIT_HEADER
+LIBCFS_WAIT_QUEUE_TASK_LIST_RENAME
 LIBCFS_UUID_T
 # 4.13
 LIBCFS_WAIT_QUEUE_ENTRY
 LIBCFS_UUID_T
 # 4.13
 LIBCFS_WAIT_QUEUE_ENTRY
@@ -1091,6 +1180,10 @@ LIBCFS_DEFINE_TIMER
 LIBCFS_NEW_KERNEL_WRITE
 # 4.15
 LIBCFS_TIMER_SETUP
 LIBCFS_NEW_KERNEL_WRITE
 # 4.15
 LIBCFS_TIMER_SETUP
+# 4.16
+LIBCFS_WAIT_VAR_EVENT
+# 4.17
+LIBCFS_CLEAR_AND_WAKE_UP_BIT
 ]) # LIBCFS_PROG_LINUX
 
 #
 ]) # LIBCFS_PROG_LINUX
 
 #
index e3bf27d..bec5974 100644 (file)
@@ -39,6 +39,7 @@
 
 #include <libcfs/linux/linux-misc.h>
 #include <libcfs/linux/linux-time.h>
 
 #include <libcfs/linux/linux-misc.h>
 #include <libcfs/linux/linux-time.h>
+#include <libcfs/linux/linux-wait.h>
 
 #include <uapi/linux/lnet/libcfs_ioctl.h>
 #include <libcfs/libcfs_debug.h>
 
 #include <uapi/linux/lnet/libcfs_ioctl.h>
 #include <libcfs/libcfs_debug.h>
index 82bc7db..6942d9f 100644 (file)
@@ -1,3 +1,3 @@
 EXTRA_DIST = linux-misc.h linux-fs.h linux-mem.h linux-time.h linux-cpu.h \
 EXTRA_DIST = linux-misc.h linux-fs.h linux-mem.h linux-time.h linux-cpu.h \
-            linux-list.h linux-hash.h linux-uuid.h linux-crypto.h \
+            linux-list.h linux-hash.h linux-uuid.h linux-crypto.h linux-wait.h \
             processor.h
             processor.h
index 4901053..230d8d4 100644 (file)
@@ -113,10 +113,6 @@ static inline bool gid_valid(kgid_t gid)
 
 int cfs_get_environ(const char *key, char *value, int *val_len);
 
 
 int cfs_get_environ(const char *key, char *value, int *val_len);
 
-#ifndef HAVE_WAIT_QUEUE_ENTRY
-#define wait_queue_entry_t wait_queue_t
-#endif
-
 int cfs_kernel_write(struct file *filp, const void *buf, size_t count,
                     loff_t *pos);
 
 int cfs_kernel_write(struct file *filp, const void *buf, size_t count,
                     loff_t *pos);
 
diff --git a/libcfs/include/libcfs/linux/linux-wait.h b/libcfs/include/libcfs/linux/linux-wait.h
new file mode 100644 (file)
index 0000000..b0142c7
--- /dev/null
@@ -0,0 +1,126 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __LIBCFS_LINUX_WAIT_BIT_H
+#define __LIBCFS_LINUX_WAIT_BIT_H
+
+/*
+ * Linux wait-bit related types and methods:
+ */
+#ifdef HAVE_WAIT_BIT_HEADER_H
+#include <linux/wait_bit.h>
+#endif
+#include <linux/wait.h>
+
+#ifndef HAVE_WAIT_QUEUE_ENTRY
+#define wait_queue_entry_t wait_queue_t
+#endif
+
+#ifndef HAVE_WAIT_BIT_HEADER_H
+struct wait_bit_queue_entry {
+       struct wait_bit_key     key;
+       wait_queue_entry_t      wq_entry;
+};
+
+#define ___wait_is_interruptible(state)                                         \
+       (!__builtin_constant_p(state) ||                                        \
+               state == TASK_INTERRUPTIBLE || state == TASK_KILLABLE)          \
+
+#endif /* ! HAVE_WAIT_BIT_HEADER_H */
+
+#ifndef HAVE_PREPARE_TO_WAIT_EVENT
+extern long prepare_to_wait_event(wait_queue_head_t *wq_head,
+                                 wait_queue_entry_t *wq_entry, int state);
+#endif
+
+#ifndef HAVE_CLEAR_AND_WAKE_UP_BIT
+/**
+ * clear_and_wake_up_bit - clear a bit and wake up anyone waiting on that bit
+ *
+ * @bit: the bit of the word being waited on
+ * @word: the word being waited on, a kernel virtual address
+ *
+ * You can use this helper if bitflags are manipulated atomically rather than
+ * non-atomically under a lock.
+ */
+static inline void clear_and_wake_up_bit(int bit, void *word)
+{
+       clear_bit_unlock(bit, word);
+       /* See wake_up_bit() for which memory barrier you need to use. */
+       smp_mb__after_atomic();
+       wake_up_bit(word, bit);
+}
+#endif /* ! HAVE_CLEAR_AND_WAKE_UP_BIT */
+
+#ifndef HAVE_WAIT_VAR_EVENT
+extern void init_wait_var_entry(struct wait_bit_queue_entry *wbq_entry,
+                               void *var, int flags);
+extern void wake_up_var(void *var);
+extern wait_queue_head_t *__var_waitqueue(void *p);
+
+#define ___wait_var_event(var, condition, state, exclusive, ret, cmd)  \
+({                                                                     \
+       __label__ __out;                                                \
+       wait_queue_head_t *__wq_head = __var_waitqueue(var);            \
+       struct wait_bit_queue_entry __wbq_entry;                        \
+       long __ret = ret; /* explicit shadow */                         \
+                                                                       \
+       init_wait_var_entry(&__wbq_entry, var,                          \
+                           exclusive ? WQ_FLAG_EXCLUSIVE : 0);         \
+       for (;;) {                                                      \
+               long __int = prepare_to_wait_event(__wq_head,           \
+                                                  &__wbq_entry.wq_entry, \
+                                                  state);              \
+               if (condition)                                          \
+                       break;                                          \
+                                                                       \
+               if (___wait_is_interruptible(state) && __int) {         \
+                       __ret = __int;                                  \
+                       goto __out;                                     \
+               }                                                       \
+                                                                       \
+               cmd;                                                    \
+       }                                                               \
+       finish_wait(__wq_head, &__wbq_entry.wq_entry);                  \
+__out: __ret;                                                          \
+})
+
+#define __wait_var_event(var, condition)                               \
+       ___wait_var_event(var, condition, TASK_UNINTERRUPTIBLE, 0, 0,   \
+                         schedule())
+
+#define wait_var_event(var, condition)                                 \
+do {                                                                   \
+       might_sleep();                                                  \
+       if (condition)                                                  \
+               break;                                                  \
+       __wait_var_event(var, condition);                               \
+} while (0)
+
+#define __wait_var_event_killable(var, condition)                      \
+       ___wait_var_event(var, condition, TASK_KILLABLE, 0, 0,          \
+                         schedule())
+
+#define wait_var_event_killable(var, condition)                                \
+({                                                                     \
+       int __ret = 0;                                                  \
+       might_sleep();                                                  \
+       if (!(condition))                                               \
+               __ret = __wait_var_event_killable(var, condition);      \
+       __ret;                                                          \
+})
+
+#define __wait_var_event_timeout(var, condition, timeout)              \
+       ___wait_var_event(var, ___wait_cond_timeout(condition),         \
+                         TASK_UNINTERRUPTIBLE, 0, timeout,             \
+                         __ret = schedule_timeout(__ret))
+
+#define wait_var_event_timeout(var, condition, timeout)                        \
+({                                                                     \
+       long __ret = timeout;                                           \
+       might_sleep();                                                  \
+       if (!___wait_cond_timeout(condition))                           \
+               __ret = __wait_var_event_timeout(var, condition, timeout); \
+       __ret;                                                          \
+})
+#endif /* ! HAVE_WAIT_VAR_EVENT */
+
+#endif /* __LICBFS_LINUX_WAIT_BIT_H */
index c75376a..5c80483 100644 (file)
@@ -6,6 +6,7 @@ libcfs-linux-objs += linux-curproc.o
 libcfs-linux-objs += linux-hash.o
 libcfs-linux-objs += linux-module.o
 libcfs-linux-objs += linux-crypto.o linux-crypto-adler.o
 libcfs-linux-objs += linux-hash.o
 libcfs-linux-objs += linux-module.o
 libcfs-linux-objs += linux-crypto.o linux-crypto-adler.o
+libcfs-linux-objs += linux-wait.o
 @HAVE_CRC32_TRUE@libcfs-linux-objs += linux-crypto-crc32.o
 @HAVE_PCLMULQDQ_TRUE@@NEED_PCLMULQDQ_CRC32_TRUE@libcfs-linux-objs += linux-crypto-crc32pclmul.o crc32-pclmul_asm.o
 @HAVE_PCLMULQDQ_TRUE@@NEED_PCLMULQDQ_CRC32C_TRUE@libcfs-linux-objs += linux-crypto-crc32c-pclmul.o crc32c-pcl-intel-asm_64.o
 @HAVE_CRC32_TRUE@libcfs-linux-objs += linux-crypto-crc32.o
 @HAVE_PCLMULQDQ_TRUE@@NEED_PCLMULQDQ_CRC32_TRUE@libcfs-linux-objs += linux-crypto-crc32pclmul.o crc32-pclmul_asm.o
 @HAVE_PCLMULQDQ_TRUE@@NEED_PCLMULQDQ_CRC32C_TRUE@libcfs-linux-objs += linux-crypto-crc32c-pclmul.o crc32c-pcl-intel-asm_64.o
index 14d84c8..d0f59df 100644 (file)
@@ -1,5 +1,5 @@
 EXTRA_DIST = linux-debug.c linux-prim.c linux-tracefile.c      \
 EXTRA_DIST = linux-debug.c linux-prim.c linux-tracefile.c      \
-       linux-curproc.c linux-module.c linux-hash.c             \
+       linux-curproc.c linux-module.c linux-hash.c linux-wait.c\
        linux-crypto.c linux-crypto-crc32.c linux-crypto-adler.c\
        linux-crypto-crc32pclmul.c linux-crypto-crc32c-pclmul.c \
        crc32-pclmul_asm.S crc32c-pcl-intel-asm_64.S inst.h
        linux-crypto.c linux-crypto-crc32.c linux-crypto-adler.c\
        linux-crypto-crc32pclmul.c linux-crypto-crc32c-pclmul.c \
        crc32-pclmul_asm.S crc32c-pcl-intel-asm_64.S inst.h
diff --git a/libcfs/libcfs/linux/linux-wait.c b/libcfs/libcfs/linux/linux-wait.c
new file mode 100644 (file)
index 0000000..dd08cac
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * The implementation of the wait_bit*() and related waiting APIs:
+ */
+#include <linux/hash.h>
+#include <linux/sched.h>
+#ifdef HAVE_SCHED_HEADERS
+#include <linux/sched/signal.h>
+#endif
+#include <libcfs/linux/linux-wait.h>
+
+#ifndef HAVE_PREPARE_TO_WAIT_EVENT
+
+#define __add_wait_queue_entry_tail __add_wait_queue_tail
+
+long prepare_to_wait_event(wait_queue_head_t *wq_head,
+                          wait_queue_entry_t *wq_entry, int state)
+{
+       unsigned long flags;
+       long ret = 0;
+
+       spin_lock_irqsave(&wq_head->lock, flags);
+       if (unlikely(signal_pending_state(state, current))) {
+               /*
+                * Exclusive waiter must not fail if it was selected by wakeup,
+                * it should "consume" the condition we were waiting for.
+                *
+                * The caller will recheck the condition and return success if
+                * we were already woken up, we can not miss the event because
+                * wakeup locks/unlocks the same wq_head->lock.
+                *
+                * But we need to ensure that set-condition + wakeup after that
+                * can't see us, it should wake up another exclusive waiter if
+                * we fail.
+                */
+               list_del_init(&wq_entry->task_list);
+               ret = -ERESTARTSYS;
+       } else {
+               if (list_empty(&wq_entry->task_list)) {
+                       if (wq_entry->flags & WQ_FLAG_EXCLUSIVE)
+                               __add_wait_queue_entry_tail(wq_head, wq_entry);
+                       else
+                               __add_wait_queue(wq_head, wq_entry);
+               }
+               set_current_state(state);
+       }
+       spin_unlock_irqrestore(&wq_head->lock, flags);
+
+       return ret;
+}
+EXPORT_SYMBOL(prepare_to_wait_event);
+#endif /* !HAVE_PREPARE_TO_WAIT_EVENT */
+
+#ifndef HAVE_WAIT_VAR_EVENT
+
+#define WAIT_TABLE_BITS 8
+#define WAIT_TABLE_SIZE (1 << WAIT_TABLE_BITS)
+
+static wait_queue_head_t bit_wait_table[WAIT_TABLE_SIZE] __cacheline_aligned;
+
+wait_queue_head_t *__var_waitqueue(void *p)
+{
+       return bit_wait_table + hash_ptr(p, WAIT_TABLE_BITS);
+}
+EXPORT_SYMBOL(__var_waitqueue);
+
+static int
+var_wake_function(wait_queue_entry_t *wq_entry, unsigned int mode,
+                 int sync, void *arg)
+{
+       struct wait_bit_key *key = arg;
+       struct wait_bit_queue_entry *wbq_entry =
+               container_of(wq_entry, struct wait_bit_queue_entry, wq_entry);
+
+       if (wbq_entry->key.flags != key->flags ||
+           wbq_entry->key.bit_nr != key->bit_nr)
+               return 0;
+
+       return autoremove_wake_function(wq_entry, mode, sync, key);
+}
+
+void init_wait_var_entry(struct wait_bit_queue_entry *wbq_entry, void *var,
+                        int flags)
+{
+       *wbq_entry = (struct wait_bit_queue_entry){
+               .key = {
+                       .flags  = (var),
+                       .bit_nr = -1,
+               },
+               .wq_entry = {
+                       .private = current,
+                       .func = var_wake_function,
+#ifdef HAVE_WAIT_QUEUE_ENTRY_LIST
+                       .entry = LIST_HEAD_INIT(wbq_entry->wq_entry.entry),
+#else
+                       .task_list = LIST_HEAD_INIT(wbq_entry->wq_entry.task_list),
+#endif
+               },
+       };
+}
+EXPORT_SYMBOL(init_wait_var_entry);
+
+void wake_up_var(void *var)
+{
+       __wake_up_bit(__var_waitqueue(var), var, -1);
+}
+EXPORT_SYMBOL(wake_up_var);
+#endif /* ! HAVE_WAIT_VAR_EVENT */
index ab578a3..3ddade9 100644 (file)
@@ -1399,7 +1399,8 @@ static void key_fini(struct lu_context *ctx, int index)
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
-               atomic_dec(&key->lct_used);
+               if (atomic_dec_and_test(&key->lct_used))
+                       wake_up_var(&key->lct_used);
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
@@ -1420,28 +1421,23 @@ void lu_context_key_degister(struct lu_context_key *key)
 
        lu_context_key_quiesce(key);
 
 
        lu_context_key_quiesce(key);
 
-       write_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
        /**
         * Wait until all transient contexts referencing this key have
         * run lu_context_key::lct_fini() method.
         */
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
        /**
         * Wait until all transient contexts referencing this key have
         * run lu_context_key::lct_fini() method.
         */
-       while (atomic_read(&key->lct_used) > 1) {
-               write_unlock(&lu_keys_guard);
-               CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
-                      key->lct_owner ? key->lct_owner->name : "", key,
-                      atomic_read(&key->lct_used));
-               schedule();
-               write_lock(&lu_keys_guard);
-       }
+       atomic_dec(&key->lct_used);
+       wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
+
+       write_lock(&lu_keys_guard);
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
        }
        write_unlock(&lu_keys_guard);
 
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
        }
        write_unlock(&lu_keys_guard);
 
-       LASSERTF(atomic_read(&key->lct_used) == 1,
+       LASSERTF(atomic_read(&key->lct_used) == 0,
                 "key has instances: %d\n",
                 atomic_read(&key->lct_used));
 }
                 "key has instances: %d\n",
                 atomic_read(&key->lct_used));
 }