Whamcloud - gitweb
LU-11814 obdcalss: ensure LCT_QUIESCENT take sync
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 5da1aa6..dce91d5 100644 (file)
@@ -1244,14 +1244,25 @@ void lu_device_put(struct lu_device *d)
 }
 EXPORT_SYMBOL(lu_device_put);
 
 }
 EXPORT_SYMBOL(lu_device_put);
 
+enum { /* Maximal number of tld slots. */
+       LU_CONTEXT_KEY_NR = 40
+};
+static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
+static DECLARE_RWSEM(lu_key_initing);
+
 /**
  * Initialize device \a d of type \a t.
  */
 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 {
 /**
  * Initialize device \a d of type \a t.
  */
 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 {
-       if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
-           t->ldt_ops->ldto_start != NULL)
-               t->ldt_ops->ldto_start(t);
+       if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) {
+               down_write(&lu_key_initing);
+               if (t->ldt_ops->ldto_start &&
+                   atomic_read(&t->ldt_device_nr) == 0)
+                       t->ldt_ops->ldto_start(t);
+               atomic_inc(&t->ldt_device_nr);
+               up_write(&lu_key_initing);
+       }
 
        memset(d, 0, sizeof *d);
        d->ld_type = t;
 
        memset(d, 0, sizeof *d);
        d->ld_type = t;
@@ -1417,17 +1428,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
         }
 }
 
         }
 }
 
-enum {
-        /**
-         * Maximal number of tld slots.
-         */
-        LU_CONTEXT_KEY_NR = 40
-};
-
-static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-
-static DECLARE_RWSEM(lu_key_initing);
-
 /**
  * Global counter incremented whenever key is registered, unregistered,
  * revived or quiesced. This is used to void unnecessary calls to
 /**
  * Global counter incremented whenever key is registered, unregistered,
  * revived or quiesced. This is used to void unnecessary calls to
@@ -1507,7 +1507,7 @@ void lu_context_key_degister(struct lu_context_key *key)
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
 
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
 
-       lu_context_key_quiesce(key);
+       lu_context_key_quiesce(NULL, key);
 
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
 
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
@@ -1593,16 +1593,17 @@ EXPORT_SYMBOL(lu_context_key_revive_many);
 /**
  * Quiescent a number of keys.
  */
 /**
  * Quiescent a number of keys.
  */
-void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
+void lu_context_key_quiesce_many(struct lu_device_type *t,
+                                struct lu_context_key *k, ...)
 {
 {
-        va_list args;
+       va_list args;
 
 
-        va_start(args, k);
-        do {
-                lu_context_key_quiesce(k);
-                k = va_arg(args, struct lu_context_key*);
-        } while (k != NULL);
-        va_end(args);
+       va_start(args, k);
+       do {
+               lu_context_key_quiesce(t, k);
+               k = va_arg(args, struct lu_context_key*);
+       } while (k != NULL);
+       va_end(args);
 }
 EXPORT_SYMBOL(lu_context_key_quiesce_many);
 
 }
 EXPORT_SYMBOL(lu_context_key_quiesce_many);
 
@@ -1630,18 +1631,22 @@ static DEFINE_SPINLOCK(lu_context_remembered_guard);
  * values in "shared" contexts (like service threads), when a module owning
  * the key is about to be unloaded.
  */
  * values in "shared" contexts (like service threads), when a module owning
  * the key is about to be unloaded.
  */
-void lu_context_key_quiesce(struct lu_context_key *key)
+void lu_context_key_quiesce(struct lu_device_type *t,
+                           struct lu_context_key *key)
 {
        struct lu_context *ctx;
 
 {
        struct lu_context *ctx;
 
+       if (key->lct_tags & LCT_QUIESCENT)
+               return;
+       /*
+        * The write-lock on lu_key_initing will ensure that any
+        * keys_fill() which didn't see LCT_QUIESCENT will have
+        * finished before we call key_fini().
+        */
+       down_write(&lu_key_initing);
        if (!(key->lct_tags & LCT_QUIESCENT)) {
        if (!(key->lct_tags & LCT_QUIESCENT)) {
-                /*
-                * The write-lock on lu_key_initing will ensure that any
-                * keys_fill() which didn't see LCT_QUIESCENT will have
-                * finished before we call key_fini().
-                 */
-               down_write(&lu_key_initing);
-               key->lct_tags |= LCT_QUIESCENT;
+               if (t == NULL || atomic_read(&t->ldt_device_nr) == 0)
+                       key->lct_tags |= LCT_QUIESCENT;
                up_write(&lu_key_initing);
 
                spin_lock(&lu_context_remembered_guard);
                up_write(&lu_key_initing);
 
                spin_lock(&lu_context_remembered_guard);
@@ -1649,9 +1654,11 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                        spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
                }
                        spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
                }
-
                spin_unlock(&lu_context_remembered_guard);
                spin_unlock(&lu_context_remembered_guard);
+
+               return;
        }
        }
+       up_write(&lu_key_initing);
 }
 
 void lu_context_key_revive(struct lu_context_key *key)
 }
 
 void lu_context_key_revive(struct lu_context_key *key)