Whamcloud - gitweb
b=18337
author jxiong <jxiong>
Tue, 31 Mar 2009 08:22:23 +0000 (08:22 +0000)
committer jxiong <jxiong>
Tue, 31 Mar 2009 08:22:23 +0000 (08:22 +0000)
r=wangdi,shadow

Fixed a crash in the bottom-to-top lock destruction path.

When the last sub-lock of a cached top-lock is canceled, the top-lock is
destroyed proactively. Canceling the top-lock may acquire mutexes of its
other sub-locks, so it can no longer be done under the sub-lock mutex:
lovsub_lock_delete() now releases the sub-lock mutex, cancels and deletes the
top-lock, re-acquires the mutex and re-scans the list of parents. Per-thread
lock debugging counters are split by nesting level (top/sub) to support the
new lock-ordering assertions.

lustre/lov/lovsub_lock.c
lustre/obdclass/cl_internal.h
lustre/obdclass/cl_io.c
lustre/obdclass/cl_lock.c
lustre/obdclass/cl_object.c

diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c
index d1f2757..6b96896 100644
@@ -310,75 +310,148 @@ static int lovsub_lock_closure(const struct lu_env *env,
 }
 
 /**
- * An implementation of cl_lock_operations::clo_delete() method. This is
- * invoked in "bottom-to-top" delete, when lock destruction starts from the
- * sub-lock (e.g, as a result of ldlm lock LRU policy).
+ * A helper function for lovsub_lock_delete() that deals with a given parent
+ * top-lock.
  */
-static void lovsub_lock_delete(const struct lu_env *env,
-                               const struct cl_lock_slice *slice)
+static int lovsub_lock_delete_one(const struct lu_env *env,
+                                  struct cl_lock *child, struct lov_lock *lov)
 {
-        struct lovsub_lock   *sub = cl2lovsub_lock(slice);
-        struct lov_lock      *lov;
         struct cl_lock       *parent;
-        struct lov_lock_link *scan;
-        struct lov_lock_link *temp;
-        struct lov_lock_sub  *subdata;
-
-        LASSERT(cl_lock_is_mutexed(slice->cls_lock));
+        int             result;
         ENTRY;
 
-        list_for_each_entry_safe(scan, temp, &sub->lss_parents, lll_list) {
-                lov     = scan->lll_super;
-                subdata = &lov->lls_sub[scan->lll_idx];
-                parent  = lov->lls_cl.cls_lock;
-                lovsub_parent_lock(env, lov);
-                subdata->sub_got = subdata->sub_descr;
-                lov_lock_unlink(env, scan, sub);
-                CDEBUG(D_DLMTRACE, "%p %p %i %i\n", parent, sub,
-                       lov->lls_nr_filled, parent->cll_state);
-                switch (parent->cll_state) {
-                case CLS_NEW:
-                case CLS_QUEUING:
-                case CLS_ENQUEUED:
-                case CLS_FREEING:
-                        cl_lock_signal(env, parent);
-                        break;
-                case CLS_UNLOCKING:
-                        /*
-                         * Here lies a problem: a sub-lock is canceled while
-                         * top-lock is being unlocked. Top-lock cannot be
-                         * moved into CLS_NEW state, because unlocking has to
-                         * succeed eventually by placing lock into CLS_CACHED
-                         * (or failing it), see cl_unuse_try(). Nor can
-                         * top-lock be left in CLS_CACHED state, because lov
-                         * maintains an invariant that all sub-locks exist in
-                         * CLS_CACHED (this allows cached top-lock to be
-                         * reused immediately). Nor can we wait for top-lock
-                         * state to change, because this can be synchronous to
-                         * the current thread.
+        parent  = lov->lls_cl.cls_lock;
+        result = 0;
+
+        switch (parent->cll_state) {
+        case CLS_NEW:
+        case CLS_QUEUING:
+        case CLS_ENQUEUED:
+        case CLS_FREEING:
+                cl_lock_signal(env, parent);
+                break;
+        case CLS_UNLOCKING:
+                /*
+                 * Here lies a problem: a sub-lock is canceled while top-lock
+                 * is being unlocked. Top-lock cannot be moved into CLS_NEW
+                 * state, because unlocking has to succeed eventually by
+                 * placing lock into CLS_CACHED (or failing it), see
+                 * cl_unuse_try(). Nor can top-lock be left in CLS_CACHED
+                 * state, because lov maintains an invariant that all
+                 * sub-locks exist in CLS_CACHED (this allows cached top-lock
+                 * to be reused immediately). Nor can we wait for top-lock
+                 * state to change, because this can be synchronous to the
+                 * current thread.
+                 *
+                 * We know for sure that lov_lock_unuse() will be called at
+                 * least one more time to finish un-using, so leave a mark on
+                 * the top-lock, that will be seen by the next call to
+                 * lov_lock_unuse().
+                 */
+                lov->lls_unuse_race = 1;
+                break;
+        case CLS_CACHED:
+                /*
+                 * if a sub-lock is canceled, move its top-lock into CLS_NEW
+                 * state to preserve an invariant that a top-lock in
+                 * CLS_CACHED is immediately ready for re-use (i.e., has all
+                 * sub-locks), and so that next attempt to re-use the top-lock
+                 * enqueues missing sub-lock.
+                 */
+                cl_lock_state_set(env, parent, CLS_NEW);
+                /*
+                 * if last sub-lock is canceled, destroy the top-lock (which
+                 * is now `empty') proactively.
+                 */
+                if (lov->lls_nr_filled == 0) {
+                        /* ... but unfortunately, this cannot be done easily,
+                         * as cancellation of a top-lock might acquire mutices
+                         * of its other sub-locks, violating lock ordering,
+                         * see cl_lock_{cancel,delete}() preconditions.
                          *
-                         * We know for sure that lov_lock_unuse() will be
-                         * called at least one more time to finish un-using,
-                         * so leave a mark on the top-lock, that will be seen
-                         * by the next call to lov_lock_unuse().
+                         * To work around this, the mutex of this sub-lock is
+                         * released, top-lock is destroyed, and sub-lock mutex
+                         * acquired again. The list of parents has to be
+                         * re-scanned from the beginning after this.
+                         *
+                         * Only do this if no mutices other than on @child and
+                         * @parent are held by the current thread.
+                         *
+                         * TODO: The locking model here is too complex, because
+                         * the lock may be canceled and deleted voluntarily:
+                         *    cl_lock_request
+                         *      -> osc_lock_enqueue_wait
+                         *        -> osc_lock_cancel_wait
+                         *          -> cl_lock_delete
+                         *            -> lovsub_lock_delete
+                         *              -> cl_lock_cancel/delete
+                         *                -> ...
+                         *
+                         * The better choice is to spawn a kernel thread for
+                         * this purpose. -jay
                          */
-                        lov->lls_unuse_race = 1;
-                        break;
-                case CLS_CACHED:
-                        cl_lock_state_set(env, parent, CLS_NEW);
-                        if (lov->lls_nr_filled == 0) {
+                        if (cl_lock_nr_mutexed(env) == 2) {
+                                cl_lock_mutex_put(env, child);
                                 cl_lock_cancel(env, parent);
                                 cl_lock_delete(env, parent);
-                                cl_lock_signal(env, parent);
+                                result = 1;
                         }
-                        break;
-                case CLS_HELD:
-                default:
-                        CERROR("Impossible state: %i\n", parent->cll_state);
-                        LBUG();
                 }
-                lovsub_parent_unlock(env, lov);
+                break;
+        case CLS_HELD:
+        default:
+                CERROR("Impossible state: %i\n", parent->cll_state);
+                LBUG();
         }
+
+        RETURN(result);
+}
+
+/**
+ * An implementation of cl_lock_operations::clo_delete() method. This is
+ * invoked in "bottom-to-top" delete, when lock destruction starts from the
+ * sub-lock (e.g., as a result of ldlm lock LRU policy).
+ */
+static void lovsub_lock_delete(const struct lu_env *env,
+                               const struct cl_lock_slice *slice)
+{
+        struct cl_lock     *child = slice->cls_lock;
+        struct lovsub_lock *sub   = cl2lovsub_lock(slice);
+        int restart;
+
+        LASSERT(cl_lock_is_mutexed(child));
+
+        ENTRY;
+        /*
+         * Destruction of a sub-lock might take multiple iterations, because
+         * when the last sub-lock of a given top-lock is deleted, the
+         * top-lock is canceled proactively, and this requires releasing the
+         * sub-lock mutex. Once the sub-lock mutex has been released, the list
+         * of its parents has to be re-scanned from the beginning.
+         */
+        do {
+                struct lov_lock      *lov;
+                struct lov_lock_link *scan;
+                struct lov_lock_link *temp;
+                struct lov_lock_sub  *subdata;
+
+                restart = 0;
+                list_for_each_entry_safe(scan, temp,
+                                         &sub->lss_parents, lll_list) {
+                        lov     = scan->lll_super;
+                        subdata = &lov->lls_sub[scan->lll_idx];
+                        lovsub_parent_lock(env, lov);
+                        subdata->sub_got = subdata->sub_descr;
+                        lov_lock_unlink(env, scan, sub);
+                        restart = lovsub_lock_delete_one(env, child, lov);
+                        lovsub_parent_unlock(env, lov);
+
+                        if (restart) {
+                                cl_lock_mutex_get(env, child);
+                                break;
+                        }
+                }
+        } while (restart);
         EXIT;
 }
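
The restart pattern introduced above is easier to see in isolation. The
following is a minimal standalone C sketch, not Lustre code: all names
(child, parent, link, child_delete, destroy_parent) are hypothetical
stand-ins for the sub-lock, its top-locks, struct lov_lock_link and
lovsub_lock_delete(). It shows why the list of parents has to be re-scanned:
destroying a now-empty parent may require mutexes of its other children, so
the child mutex is dropped around the destruction and the scan restarts.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct parent {
        int nr_children;                /* sub-locks still attached */
};

struct link {
        struct link   *next;
        struct parent *super;
};

struct child {
        pthread_mutex_t  mutex;
        struct link     *parents;       /* singly-linked list of parent links */
};

/* Detach @lnk from @child's parent list and drop the parent's child count. */
static void unlink_parent(struct child *child, struct link *lnk)
{
        struct link **pp;

        for (pp = &child->parents; *pp != NULL; pp = &(*pp)->next) {
                if (*pp == lnk) {
                        *pp = lnk->next;
                        break;
                }
        }
        lnk->super->nr_children--;
        free(lnk);
}

/* Stand-in for proactive top-lock destruction; may take other mutexes. */
static void destroy_parent(struct parent *parent)
{
        printf("destroying empty parent %p\n", (void *)parent);
        free(parent);
}

static void child_delete(struct child *child)
{
        int restart;

        pthread_mutex_lock(&child->mutex);
        do {
                struct link *scan;
                struct link *next;

                restart = 0;
                for (scan = child->parents; scan != NULL; scan = next) {
                        struct parent *parent = scan->super;

                        next = scan->next;
                        unlink_parent(child, scan);
                        if (parent->nr_children == 0) {
                                /*
                                 * Destroying the parent may need mutexes of
                                 * its other children, so the child mutex
                                 * cannot be held across the call: drop it,
                                 * destroy, re-acquire, and re-scan the list.
                                 */
                                pthread_mutex_unlock(&child->mutex);
                                destroy_parent(parent);
                                pthread_mutex_lock(&child->mutex);
                                restart = 1;
                                break;
                        }
                }
        } while (restart);
        pthread_mutex_unlock(&child->mutex);
}

int main(void)
{
        struct child   c = { PTHREAD_MUTEX_INITIALIZER, NULL };
        struct parent *p = calloc(1, sizeof(*p));
        struct link   *l = calloc(1, sizeof(*l));

        p->nr_children = 1;
        l->next  = NULL;
        l->super = p;
        c.parents = l;
        child_delete(&c);               /* prints "destroying empty parent ..." */
        return 0;
}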
 
diff --git a/lustre/obdclass/cl_internal.h b/lustre/obdclass/cl_internal.h
index 578fdc7..b14a583 100644
 #define CLT_PVEC_SIZE (14)
 
 /**
+ * Possible levels of nesting. Currently there are two: "top" entities
+ * (files, extent locks) and "sub" entities (stripes and stripe locks).
+ * This is used only for debugging counters right now.
+ */
+enum clt_nesting_level {
+        CNL_TOP,
+        CNL_SUB,
+        CNL_NR
+};
+
+/**
+ * Counters used to check correctness of cl_lock interface usage.
+ */
+struct cl_thread_counters {
+        /**
+         * Number of outstanding calls to cl_lock_mutex_get() made by the
+         * current thread. For debugging.
+         */
+        int           ctc_nr_locks_locked;
+        /** List of locked locks. */
+        struct lu_ref ctc_locks_locked;
+        /** Number of outstanding holds on locks. */
+        int           ctc_nr_held;
+        /** Number of outstanding uses on locks. */
+        int           ctc_nr_used;
+        /** Number of held extent locks. */
+        int           ctc_nr_locks_acquired;
+};
+
+/**
  * Thread local state internal for generic cl-code.
  */
 struct cl_thread_info {
@@ -58,24 +88,9 @@ struct cl_thread_info {
         struct cl_lock_descr clt_descr;
         struct cl_page_list  clt_list;
         /**
-         * \name debugging.
-         *
-         * Counters used to check correctness of cl_lock interface usage.
-         * @{
+         * Counters for every level of lock nesting.
          */
-        /**
-         * Number of outstanding calls to cl_lock_mutex_get() made by the
-         * current thread. For debugging.
-         */
-        int                  clt_nr_locks_locked;
-        /** List of locked locks. */
-        struct lu_ref        clt_locks_locked;
-        /** Number of outstanding holds on the top-level locks. */
-        int                  clt_nr_held;
-        /** Number of outstanding uses on the top-level locks. */
-        int                  clt_nr_used;
-        /** Number of held top-level extent locks. */
-        int                  clt_nr_locks_acquired;
+        struct cl_thread_counters clt_counters[CNL_NR];
         /** @} debugging */
 
         /*
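
As a rough illustration of how these counters are meant to be used, here is a
small standalone C sketch; it is not Lustre code, and enum nesting, struct
counters, struct thread_info and hold_mod() are simplified stand-ins for enum
clt_nesting_level, struct cl_thread_counters, struct cl_thread_info and
cl_lock_hold_mod() from this patch. The per-thread counters are an array
indexed by nesting level, and holds are accounted only for top-level locks,
because a sub-lock hold may be released by a thread different from the one
that acquired it (see the cl_lock.c changes below).

#include <assert.h>
#include <stdio.h>

/* Mirrors enum clt_nesting_level above: 0 = "top" entities, 1 = "sub". */
enum nesting { N_TOP, N_SUB, N_NR };

/* Cut-down stand-in for struct cl_thread_counters, one instance per level. */
struct counters {
        int nr_locks_locked;    /* mutexes held by this thread at this level */
        int nr_held;            /* outstanding holds at this level           */
        int nr_used;            /* outstanding uses at this level            */
};

/* Cut-down stand-in for struct cl_thread_info. */
struct thread_info {
        struct counters counters[N_NR];         /* indexed by nesting level */
};

/*
 * Rough analogue of cl_lock_hold_mod(): holds are accounted per-thread only
 * for top-level locks, because a sub-lock hold may be released by a thread
 * different from the one that acquired it.
 */
static void hold_mod(struct thread_info *info, enum nesting nesting, int delta)
{
        if (nesting == N_TOP) {
                info->counters[N_TOP].nr_held += delta;
                assert(info->counters[N_TOP].nr_held >= 0);
        }
}

int main(void)
{
        struct thread_info info = { { { 0, 0, 0 }, { 0, 0, 0 } } };

        hold_mod(&info, N_TOP, +1);     /* top-lock hold: counted            */
        hold_mod(&info, N_SUB, +1);     /* sub-lock hold: not tracked here   */
        printf("top-level holds: %d\n", info.counters[N_TOP].nr_held); /* 1 */
        hold_mod(&info, N_TOP, -1);
        return 0;
}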
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index 1b10e2a..747ae11 100644
@@ -466,7 +466,7 @@ void cl_io_unlock(const struct lu_env *env, struct cl_io *io)
                         scan->cis_iop->op[io->ci_type].cio_unlock(env, scan);
         }
         io->ci_state = CIS_UNLOCKED;
-        LASSERT(cl_env_info(env)->clt_nr_locks_acquired == 0);
+        LASSERT(!cl_env_info(env)->clt_counters[CNL_TOP].ctc_nr_locks_acquired);
         EXIT;
 }
 EXPORT_SYMBOL(cl_io_unlock);
diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index e087eca..bd935a6 100644
@@ -104,6 +104,29 @@ static int cl_lock_invariant(const struct lu_env *env,
         return result;
 }
 
+/**
+ * Returns lock "nesting": 0 for a top-lock and 1 for a sub-lock.
+ */
+static enum clt_nesting_level cl_lock_nesting(const struct cl_lock *lock)
+{
+        return cl_object_header(lock->cll_descr.cld_obj)->coh_nesting;
+}
+
+/**
+ * Returns the set of counters for this lock, depending on its nesting level.
+ */
+static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
+                                                   const struct cl_lock *lock)
+{
+        struct cl_thread_info *info;
+        enum clt_nesting_level nesting;
+
+        info = cl_env_info(env);
+        nesting = cl_lock_nesting(lock);
+        LASSERT(nesting < ARRAY_SIZE(info->clt_counters));
+        return &info->clt_counters[nesting];
+}
+
 #define RETIP ((unsigned long)__builtin_return_address(0))
 
 #ifdef CONFIG_LOCKDEP
@@ -117,7 +140,7 @@ static void cl_lock_lockdep_init(struct cl_lock *lock)
 static void cl_lock_lockdep_acquire(const struct lu_env *env,
                                     struct cl_lock *lock, __u32 enqflags)
 {
-        cl_env_info(env)->clt_nr_locks_acquired++;
+        cl_lock_counters(env, lock)->ctc_nr_locks_acquired++;
         lock_acquire(&lock->dep_map, !!(enqflags & CEF_ASYNC),
                      /* try: */ 0, lock->cll_descr.cld_mode <= CLM_READ,
                      /* check: */ 2, RETIP);
@@ -126,7 +149,7 @@ static void cl_lock_lockdep_acquire(const struct lu_env *env,
 static void cl_lock_lockdep_release(const struct lu_env *env,
                                     struct cl_lock *lock)
 {
-        cl_env_info(env)->clt_nr_locks_acquired--;
+        cl_lock_counters(env, lock)->ctc_nr_locks_acquired--;
         lock_release(&lock->dep_map, 0, RETIP);
 }
 
@@ -545,23 +568,23 @@ const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_at);
 
-static void cl_lock_trace(struct cl_thread_info *info,
+static void cl_lock_trace(struct cl_thread_counters *counters,
                           const char *prefix, const struct cl_lock *lock)
 {
         CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
                atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
-               lock->cll_depth, info->clt_nr_locks_locked);
+               lock->cll_depth, counters->ctc_nr_locks_locked);
 }
 
 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
 {
-        struct cl_thread_info *info;
+        struct cl_thread_counters *counters;
 
-        info = cl_env_info(env);
+        counters = cl_lock_counters(env, lock);
         lock->cll_depth++;
-        info->clt_nr_locks_locked++;
-        lu_ref_add(&info->clt_locks_locked, "cll_guard", lock);
-        cl_lock_trace(info, "got mutex", lock);
+        counters->ctc_nr_locks_locked++;
+        lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
+        cl_lock_trace(counters, "got mutex", lock);
 }
 
 /**
@@ -583,9 +606,17 @@ void cl_lock_mutex_get(const struct lu_env *env, struct cl_lock *lock)
                 LINVRNT(lock->cll_depth > 0);
         } else {
                 struct cl_object_header *hdr;
+                struct cl_thread_info   *info;
+                int i;
 
                 LINVRNT(lock->cll_guarder != cfs_current());
                 hdr = cl_object_header(lock->cll_descr.cld_obj);
+                /*
+                 * Check that mutices are taken in the bottom-to-top order.
+                 */
+                info = cl_env_info(env);
+                for (i = 0; i < hdr->coh_nesting; ++i)
+                        LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
                 mutex_lock_nested(&lock->cll_guard, hdr->coh_nesting);
                 lock->cll_guarder = cfs_current();
                 LINVRNT(lock->cll_depth == 0);
@@ -635,19 +666,19 @@ EXPORT_SYMBOL(cl_lock_mutex_try);
  */
 void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
 {
-        struct cl_thread_info *info;
+        struct cl_thread_counters *counters;
 
         LINVRNT(cl_lock_invariant(env, lock));
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(lock->cll_guarder == cfs_current());
         LINVRNT(lock->cll_depth > 0);
 
-        info = cl_env_info(env);
-        LINVRNT(info->clt_nr_locks_locked > 0);
+        counters = cl_lock_counters(env, lock);
+        LINVRNT(counters->ctc_nr_locks_locked > 0);
 
-        cl_lock_trace(info, "put mutex", lock);
-        lu_ref_del(&info->clt_locks_locked, "cll_guard", lock);
-        info->clt_nr_locks_locked--;
+        cl_lock_trace(counters, "put mutex", lock);
+        lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
+        counters->ctc_nr_locks_locked--;
         if (--lock->cll_depth == 0) {
                 lock->cll_guarder = NULL;
                 mutex_unlock(&lock->cll_guard);
@@ -669,7 +700,19 @@ EXPORT_SYMBOL(cl_lock_is_mutexed);
  */
 int cl_lock_nr_mutexed(const struct lu_env *env)
 {
-        return cl_env_info(env)->clt_nr_locks_locked;
+        struct cl_thread_info *info;
+        int i;
+        int locked;
+
+        /*
+         * NOTE: if summation across all nesting levels (currently 2) proves
+         *       too expensive, a summary counter can be added to
+         *       struct cl_thread_info.
+         */
+        info = cl_env_info(env);
+        for (i = 0, locked = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+                locked += info->clt_counters[i].ctc_nr_locks_locked;
+        return locked;
 }
 EXPORT_SYMBOL(cl_lock_nr_mutexed);
 
@@ -737,33 +780,43 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
         EXIT;
 }
 
+/**
+ * Mod(ifie)s cl_lock::cll_holds counter for a given lock. Also, for a
+ * top-lock (nesting == 0) accounts for this modification in the per-thread
+ * debugging counters. Sub-lock holds can be released by a thread different
+ * from the one that acquired it.
+ */
 static void cl_lock_hold_mod(const struct lu_env *env, struct cl_lock *lock,
                              int delta)
 {
-        struct cl_thread_info   *cti;
-        struct cl_object_header *hdr;
+        struct cl_thread_counters *counters;
+        enum clt_nesting_level     nesting;
 
-        cti = cl_env_info(env);
-        hdr = cl_object_header(lock->cll_descr.cld_obj);
         lock->cll_holds += delta;
-        if (hdr->coh_nesting == 0) {
-                cti->clt_nr_held += delta;
-                LASSERT(cti->clt_nr_held >= 0);
+        nesting = cl_lock_nesting(lock);
+        if (nesting == CNL_TOP) {
+                counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+                counters->ctc_nr_held += delta;
+                LASSERT(counters->ctc_nr_held >= 0);
         }
 }
 
+/**
+ * Mod(ifie)s cl_lock::cll_users counter for a given lock. See
+ * cl_lock_hold_mod() for the explanation of the debugging code.
+ */
 static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
                              int delta)
 {
-        struct cl_thread_info   *cti;
-        struct cl_object_header *hdr;
+        struct cl_thread_counters *counters;
+        enum clt_nesting_level     nesting;
 
-        cti = cl_env_info(env);
-        hdr = cl_object_header(lock->cll_descr.cld_obj);
         lock->cll_users += delta;
-        if (hdr->coh_nesting == 0) {
-                cti->clt_nr_used += delta;
-                LASSERT(cti->clt_nr_used >= 0);
+        nesting = cl_lock_nesting(lock);
+        if (nesting == CNL_TOP) {
+                counters = &cl_env_info(env)->clt_counters[CNL_TOP];
+                counters->ctc_nr_used += delta;
+                LASSERT(counters->ctc_nr_used >= 0);
         }
 }
 
@@ -1516,6 +1569,11 @@ EXPORT_SYMBOL(cl_lock_closure_fini);
  * cl_lock_put() to finish it.
  *
  * \pre atomic_read(&lock->cll_ref) > 0
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ *           cl_lock_nr_mutexed(env) == 1)
+ *      [i.e., if a top-lock is deleted, mutices of no other locks can be
+ *      held, as deletion of sub-locks might require releasing a top-lock
+ *      mutex]
  *
  * \see cl_lock_operations::clo_delete()
  * \see cl_lock::cll_holds
@@ -1524,6 +1582,8 @@ void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
+        LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
+                     cl_lock_nr_mutexed(env) == 1));
 
         ENTRY;
         if (lock->cll_holds == 0)
@@ -1540,6 +1600,10 @@ EXPORT_SYMBOL(cl_lock_delete);
  * time-out happens.
  *
  * \pre atomic_read(&lock->cll_ref) > 0
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ *           cl_lock_nr_mutexed(env) == 1)
+ *      [i.e., if a top-lock failed, mutices of no other locks can be held, as
+ *      failing sub-locks might require releasing a top-lock mutex]
  *
  * \see clo_lock_delete()
  * \see cl_lock::cll_holds
@@ -1568,6 +1632,12 @@ EXPORT_SYMBOL(cl_lock_error);
  *
  * Cancellation notification is delivered to layers at most once.
  *
+ * \pre ergo(cl_lock_nesting(lock) == CNL_TOP,
+ *           cl_lock_nr_mutexed(env) == 1)
+ *      [i.e., if a top-lock is canceled, mutices of no other locks can be
+ *      held, as cancellation of sub-locks might require releasing a top-lock
+ *      mutex]
+ *
  * \see cl_lock_operations::clo_cancel()
  * \see cl_lock::cll_holds
  */
@@ -1575,6 +1645,9 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
+        LASSERT(ergo(cl_lock_nesting(lock) == CNL_TOP,
+                     cl_lock_nr_mutexed(env) == 1));
+
         ENTRY;
         if (lock->cll_holds == 0)
                 cl_lock_cancel0(env, lock);
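
To make the new assertions concrete, here is a minimal standalone C sketch of
the bottom-to-top mutex-ordering check from cl_lock_mutex_get() and the
per-level summation from cl_lock_nr_mutexed(). It is not Lustre code:
lock_mutex_get(), lock_mutex_put(), nr_mutexed() and nr_locked[] are
hypothetical stand-ins for cl_lock_mutex_get(), cl_lock_mutex_put(),
cl_lock_nr_mutexed() and the ctc_nr_locks_locked counters.

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

enum nesting_level { NL_TOP, NL_SUB, NL_NR };

/*
 * Number of mutexes held at each nesting level.  In the real code this lives
 * in per-thread cl_thread_info; a plain global array suffices for this
 * single-threaded demo.
 */
static int nr_locked[NL_NR];

struct lock {
        pthread_mutex_t    guard;
        enum nesting_level nesting;   /* NL_TOP for top-locks, NL_SUB for sub-locks */
};

/*
 * Acquire a lock's mutex, checking that mutexes are taken bottom-to-top:
 * when locking a lock at level @nesting, no mutex at a shallower level
 * (closer to the top, i.e., a smaller index) may already be held.
 */
static void lock_mutex_get(struct lock *lock)
{
        int i;

        for (i = 0; i < lock->nesting; ++i)
                assert(nr_locked[i] == 0);
        pthread_mutex_lock(&lock->guard);
        nr_locked[lock->nesting]++;
}

static void lock_mutex_put(struct lock *lock)
{
        assert(nr_locked[lock->nesting] > 0);
        nr_locked[lock->nesting]--;
        pthread_mutex_unlock(&lock->guard);
}

/* Total mutexes held, summed over all nesting levels. */
static int nr_mutexed(void)
{
        int i;
        int locked = 0;

        for (i = 0; i < NL_NR; ++i)
                locked += nr_locked[i];
        return locked;
}

int main(void)
{
        struct lock sub = { PTHREAD_MUTEX_INITIALIZER, NL_SUB };
        struct lock top = { PTHREAD_MUTEX_INITIALIZER, NL_TOP };

        lock_mutex_get(&sub);                   /* sub-lock mutex first ...     */
        lock_mutex_get(&top);                   /* ... then its parent: allowed */
        printf("mutexed: %d\n", nr_mutexed());  /* prints 2                     */
        lock_mutex_put(&top);
        lock_mutex_put(&sub);
        /* Taking &top first and then &sub would trip the assertion above. */
        return 0;
}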
diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c
index 35a7cf5..e873554 100644
@@ -995,8 +995,12 @@ static void *cl_key_init(const struct lu_context *ctx,
         struct cl_thread_info *info;
 
         info = cl0_key_init(ctx, key);
-        if (!IS_ERR(info))
-                lu_ref_init(&info->clt_locks_locked);
+        if (!IS_ERR(info)) {
+                int i;
+
+                for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+                        lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
+        }
         return info;
 }
 
@@ -1004,9 +1008,11 @@ static void cl_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
         struct cl_thread_info *info;
+        int i;
 
         info = data;
-        lu_ref_fini(&info->clt_locks_locked);
+        for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
+                lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
         cl0_key_fini(ctx, key, data);
 }
 
@@ -1014,14 +1020,16 @@ static void cl_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
         struct cl_thread_info *info = data;
+        int i;
 
-        LASSERT(info->clt_nr_locks_locked == 0);
-        LASSERT(info->clt_nr_held == 0);
-        LASSERT(info->clt_nr_used == 0);
-        LASSERT(info->clt_nr_locks_acquired == 0);
-
-        lu_ref_fini(&info->clt_locks_locked);
-        lu_ref_init(&info->clt_locks_locked);
+        for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
+                LASSERT(info->clt_counters[i].ctc_nr_held == 0);
+                LASSERT(info->clt_counters[i].ctc_nr_used == 0);
+                LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
+                LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
+                lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
+                lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
+        }
 }
 
 static struct lu_context_key cl_key = {