Whamcloud - gitweb
LU-5710 all: second batch of corrected typos and grammar errors
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 2053bd3..3660990 100644 (file)
@@ -140,8 +140,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        LASSERT(bkt->lsb_busy > 0);
-        bkt->lsb_busy--;
         /*
          * When last reference is released, iterate over object
          * layers, and notify them that object is no longer busy.
@@ -151,15 +149,17 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         o->lo_ops->loo_object_release(env, o);
         }
 
-        if (!lu_object_is_dying(top)) {
+       if (!lu_object_is_dying(top) &&
+           (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               bkt->lsb_lru_len++;
                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                 return;
         }
 
         /*
-         * If object is dying (will not be cached), removed it
+        * If object is dying (will not be cached) then remove it
          * from hash table and LRU.
          *
          * This is done with hash table and LRU lists locked. As the only
@@ -206,7 +206,13 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
                cfs_hash_bd_t bd;
 
                cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
-               list_del_init(&top->loh_lru);
+               if (!list_empty(&top->loh_lru)) {
+                       struct lu_site_bkt_data *bkt;
+
+                       list_del_init(&top->loh_lru);
+                       bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
+                       bkt->lsb_lru_len--;
+               }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
                cfs_hash_bd_unlock(obj_hash, &bd, 1);
        }
@@ -382,6 +388,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
                         cfs_hash_bd_del_locked(s->ls_obj_hash,
                                                &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
+                       bkt->lsb_lru_len--;
                         if (did_sth == 0)
                                 did_sth = 1;
 
@@ -565,7 +572,6 @@ int lu_object_invariant(const struct lu_object *o)
         }
         return 1;
 }
-EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                       cfs_hash_bd_t *bd,
@@ -595,7 +601,10 @@ static struct lu_object *htable_lookup(struct lu_site *s,
         if (likely(!lu_object_is_dying(h))) {
                cfs_hash_get(s->ls_obj_hash, hnode);
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-               list_del_init(&h->loh_lru);
+               if (!list_empty(&h->loh_lru)) {
+                       list_del_init(&h->loh_lru);
+                       bkt->lsb_lru_len--;
+               }
                 return lu_object_top(h);
         }
 
@@ -615,31 +624,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        return ERR_PTR(-EAGAIN);
 }
 
-static struct lu_object *htable_lookup_nowait(struct lu_site *s,
-                                             cfs_hash_bd_t *bd,
-                                             const struct lu_fid *f)
-{
-       struct hlist_node       *hnode;
-       struct lu_object_header *h;
-
-       /* cfs_hash_bd_peek_locked is a somehow "internal" function
-        * of cfs_hash, it doesn't add refcount on object. */
-       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-       if (hnode == NULL) {
-               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-               return ERR_PTR(-ENOENT);
-       }
-
-       h = container_of0(hnode, struct lu_object_header, loh_hash);
-       if (unlikely(lu_object_is_dying(h)))
-               return ERR_PTR(-ENOENT);
-
-       cfs_hash_get(s->ls_obj_hash, hnode);
-       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-       list_del_init(&h->loh_lru);
-       return lu_object_top(h);
-}
-
 /**
  * Search cache for an object with the fid \a f. If such object is found,
  * return it. Otherwise, create new object, insert it into cache and return
@@ -685,7 +669,6 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
         struct lu_object        *o;
         cfs_hash_t              *hs;
         cfs_hash_bd_t            bd;
-        struct lu_site_bkt_data *bkt;
 
         o = lu_object_alloc(env, dev, f, conf);
         if (unlikely(IS_ERR(o)))
@@ -693,9 +676,7 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
 
         hs = dev->ld_site->ls_obj_hash;
         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        bkt = cfs_hash_bd_extra_get(hs, &bd);
         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        bkt->lsb_busy++;
         cfs_hash_bd_unlock(hs, &bd, 1);
 
        lu_object_limit(env, dev);
@@ -764,11 +745,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
 
         shadow = htable_lookup(s, &bd, f, waiter, &version);
        if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-                struct lu_site_bkt_data *bkt;
-
-                bkt = cfs_hash_bd_extra_get(hs, &bd);
                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                bkt->lsb_busy++;
                 cfs_hash_bd_unlock(hs, &bd, 1);
 
                lu_object_limit(env, dev);
@@ -815,30 +792,6 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find_at);
 
 /**
- * Try to find the object in cache without waiting for the dead object
- * to be released nor allocating object if no cached one was found.
- *
- * The found object will be set as LU_OBJECT_HEARD_BANSHEE for purging.
- */
-void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
-                    const struct lu_fid *f)
-{
-       struct lu_site          *s  = dev->ld_site;
-       cfs_hash_t              *hs = s->ls_obj_hash;
-       cfs_hash_bd_t            bd;
-       struct lu_object        *o;
-
-       cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
-       o = htable_lookup_nowait(s, &bd, f);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-       if (!IS_ERR(o)) {
-               set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
-               lu_object_put(env, o);
-       }
-}
-EXPORT_SYMBOL(lu_object_purge);
-
-/**
  * Find object with given fid, and return its slice belonging to given device.
  */
 struct lu_object *lu_object_find_slice(const struct lu_env *env,
@@ -1049,14 +1002,7 @@ static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
        struct lu_object_header *h;
 
        h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       if (atomic_add_return(1, &h->loh_ref) == 1) {
-               struct lu_site_bkt_data *bkt;
-               cfs_hash_bd_t            bd;
-
-               cfs_hash_bd_get(hs, &h->loh_fid, &bd);
-               bkt = cfs_hash_bd_extra_get(hs, &bd);
-               bkt->lsb_busy++;
-       }
+       atomic_inc(&h->loh_ref);
 }
 
 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
@@ -1408,7 +1354,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
                 }
         }
 }
-EXPORT_SYMBOL(lu_stack_fini);
 
 enum {
         /**
@@ -1420,6 +1365,7 @@ enum {
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
 static DEFINE_SPINLOCK(lu_keys_guard);
+static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1496,6 +1442,19 @@ void lu_context_key_degister(struct lu_context_key *key)
        ++key_set_version;
        spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+
+       /**
+        * Wait until all transient contexts referencing this key have
+        * run lu_context_key::lct_fini() method.
+        */
+       while (atomic_read(&key->lct_used) > 1) {
+               spin_unlock(&lu_keys_guard);
+               CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
+                      key->lct_owner ? key->lct_owner->name : "", key,
+                      atomic_read(&key->lct_used));
+               schedule();
+               spin_lock(&lu_keys_guard);
+       }
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
@@ -1622,11 +1581,27 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                  * XXX layering violation.
                  */
                 cl_env_cache_purge(~0);
-                key->lct_tags |= LCT_QUIESCENT;
                 /*
                  * XXX memory barrier has to go here.
                  */
                spin_lock(&lu_keys_guard);
+               key->lct_tags |= LCT_QUIESCENT;
+
+               /**
+                * Wait until all lu_context_key::lct_init() methods
+                * have completed.
+                */
+               while (atomic_read(&lu_key_initing_cnt) > 0) {
+                       spin_unlock(&lu_keys_guard);
+                       CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
+                              " %p, %d (%d)\n",
+                              key->lct_owner ? key->lct_owner->name : "",
+                              key, atomic_read(&key->lct_used),
+                              atomic_read(&lu_key_initing_cnt));
+                       schedule();
+                       spin_lock(&lu_keys_guard);
+               }
+
                list_for_each_entry(ctx, &lu_context_remembered,
                                    lc_remember)
                        key_fini(ctx, key->lct_index);
@@ -1634,14 +1609,12 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                ++key_set_version;
        }
 }
-EXPORT_SYMBOL(lu_context_key_quiesce);
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
         key->lct_tags &= ~LCT_QUIESCENT;
         ++key_set_version;
 }
-EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
@@ -1661,6 +1634,19 @@ static int keys_fill(struct lu_context *ctx)
 {
        unsigned int i;
 
+       /*
+        * A serialisation with lu_context_key_quiesce() is needed, but some
+        * "key->lct_init()" are calling kernel memory allocation routine and
+        * can't be called while holding a spin_lock.
+        * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
+        * to ensure the start of the serialisation.
+        * An atomic_t variable is still used, in order not to reacquire the
+        * lock when decrementing the counter.
+        */
+       spin_lock(&lu_keys_guard);
+       atomic_inc(&lu_key_initing_cnt);
+       spin_unlock(&lu_keys_guard);
+
         LINVRNT(ctx->lc_value != NULL);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                 struct lu_context_key *key;
@@ -1678,13 +1664,19 @@ static int keys_fill(struct lu_context *ctx)
                         LINVRNT(key->lct_init != NULL);
                         LINVRNT(key->lct_index == i);
 
-                        value = key->lct_init(ctx, key);
-                        if (unlikely(IS_ERR(value)))
-                                return PTR_ERR(value);
-
                        LASSERT(key->lct_owner != NULL);
-                       if (!(ctx->lc_tags & LCT_NOREF))
-                               try_module_get(key->lct_owner);
+                       if (!(ctx->lc_tags & LCT_NOREF) &&
+                           try_module_get(key->lct_owner) == 0) {
+                               /* module is unloading, skip this key */
+                               continue;
+                       }
+
+                       value = key->lct_init(ctx, key);
+                       if (unlikely(IS_ERR(value))) {
+                               atomic_dec(&lu_key_initing_cnt);
+                               return PTR_ERR(value);
+                       }
+
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        atomic_inc(&key->lct_used);
                         /*
@@ -1698,6 +1690,7 @@ static int keys_fill(struct lu_context *ctx)
                 }
                 ctx->lc_version = key_set_version;
         }
+       atomic_dec(&lu_key_initing_cnt);
         return 0;
 }
 
@@ -1778,15 +1771,20 @@ void lu_context_exit(struct lu_context *ctx)
         ctx->lc_state = LCS_LEFT;
         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                        if (ctx->lc_value[i] != NULL) {
-                                struct lu_context_key *key;
-
-                                key = lu_keys[i];
-                                LASSERT(key != NULL);
-                                if (key->lct_exit != NULL)
-                                        key->lct_exit(ctx,
-                                                      key, ctx->lc_value[i]);
-                        }
+                       /* could race with key quiescency */
+                       if (ctx->lc_tags & LCT_REMEMBER)
+                               spin_lock(&lu_keys_guard);
+                       if (ctx->lc_value[i] != NULL) {
+                               struct lu_context_key *key;
+
+                               key = lu_keys[i];
+                               LASSERT(key != NULL);
+                               if (key->lct_exit != NULL)
+                                       key->lct_exit(ctx,
+                                                     key, ctx->lc_value[i]);
+                       }
+                       if (ctx->lc_tags & LCT_REMEMBER)
+                               spin_unlock(&lu_keys_guard);
                 }
         }
 }
@@ -1801,7 +1799,6 @@ int lu_context_refill(struct lu_context *ctx)
 {
         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
 }
-EXPORT_SYMBOL(lu_context_refill);
 
 /**
  * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
@@ -1928,7 +1925,8 @@ static void lu_site_stats_get(cfs_hash_t *hs,
                struct hlist_head       *hhead;
 
                 cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_busy  += bkt->lsb_busy;
+               stats->lss_busy  +=
+                       cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
                 stats->lss_total += cfs_hash_bd_count_get(&bd);
                 stats->lss_max_search = max((int)stats->lss_max_search,
                                             cfs_hash_bd_depmax_get(&bd));
@@ -2023,7 +2021,7 @@ static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
  * is safe to take the lu_sites_guard lock.
  *
  * Ideally we should accurately return the remaining number of cached
- * objects without taking the  lu_sites_guard lock, but this is not
+ * objects without taking the lu_sites_guard lock, but this is not
  * possible in the current implementation.
  */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
@@ -2098,7 +2096,6 @@ void lu_context_keys_dump(void)
                 }
         }
 }
-EXPORT_SYMBOL(lu_context_keys_dump);
 
 /**
  * Initialization of global lu_* data.
@@ -2228,7 +2225,6 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count)
                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
 }
-EXPORT_SYMBOL(lu_site_stats_print);
 
 /**
  * Helper function to initialize a number of kmem slab caches at once.
@@ -2277,7 +2273,6 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct lu_site_bkt_data *bkt;
        struct lu_object        *shadow;
        wait_queue_t             waiter;
        cfs_hash_t              *hs;
@@ -2292,9 +2287,7 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
        /* supposed to be unique */
        LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
        *old = *fid;
-       bkt = cfs_hash_bd_extra_get(hs, &bd);
        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       bkt->lsb_busy++;
        cfs_hash_bd_unlock(hs, &bd, 1);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
@@ -2393,4 +2386,3 @@ int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
        buf->lb_len = len;
        return 0;
 }
-EXPORT_SYMBOL(lu_buf_check_and_grow);