Whamcloud - gitweb
LU-5710 all: second batch of corrected typos and grammar errors
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 6384421..3660990 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -140,8 +140,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        LASSERT(bkt->lsb_busy > 0);
-        bkt->lsb_busy--;
         /*
          * When last reference is released, iterate over object
          * layers, and notify them that object is no longer busy.
@@ -151,15 +149,17 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         o->lo_ops->loo_object_release(env, o);
         }
 
-        if (!lu_object_is_dying(top)) {
+       if (!lu_object_is_dying(top) &&
+           (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               bkt->lsb_lru_len++;
                 cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                 return;
         }
 
         /*
-         * If object is dying (will not be cached), removed it
+        * If object is dying (will not be cached) then remove it
          * from hash table and LRU.
          *
          * This is done with hash table and LRU lists locked. As the only
@@ -206,7 +206,13 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
                cfs_hash_bd_t bd;
 
                cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
-               list_del_init(&top->loh_lru);
+               if (!list_empty(&top->loh_lru)) {
+                       struct lu_site_bkt_data *bkt;
+
+                       list_del_init(&top->loh_lru);
+                       bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
+                       bkt->lsb_lru_len--;
+               }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
                cfs_hash_bd_unlock(obj_hash, &bd, 1);
        }
@@ -343,8 +349,8 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
         cfs_hash_bd_t            bd;
         cfs_hash_bd_t            bd2;
        struct list_head         dispose;
-        int                      did_sth;
-        int                      start;
+       int                      did_sth;
+       unsigned int             start;
         int                      count;
         int                      bnr;
        unsigned int             i;
@@ -358,7 +364,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
          * the dispose list, removing them from LRU and hash table.
          */
         start = s->ls_purge_start;
-        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
+       bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
  again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
@@ -382,6 +388,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
                         cfs_hash_bd_del_locked(s->ls_obj_hash,
                                                &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
+                       bkt->lsb_lru_len--;
                         if (did_sth == 0)
                                 did_sth = 1;
 
@@ -461,7 +468,7 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
  * Key, holding temporary buffer. This key is registered very early by
  * lu_global_init().
  */
-struct lu_context_key lu_global_key = {
+static struct lu_context_key lu_global_key = {
        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
                    LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
        .lct_init = lu_global_key_init,
@@ -565,7 +572,6 @@ int lu_object_invariant(const struct lu_object *o)
         }
         return 1;
 }
-EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
                                       cfs_hash_bd_t *bd,
@@ -595,7 +601,10 @@ static struct lu_object *htable_lookup(struct lu_site *s,
         if (likely(!lu_object_is_dying(h))) {
                cfs_hash_get(s->ls_obj_hash, hnode);
                 lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-               list_del_init(&h->loh_lru);
+               if (!list_empty(&h->loh_lru)) {
+                       list_del_init(&h->loh_lru);
+                       bkt->lsb_lru_len--;
+               }
                 return lu_object_top(h);
         }
 
@@ -615,31 +624,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        return ERR_PTR(-EAGAIN);
 }
 
-static struct lu_object *htable_lookup_nowait(struct lu_site *s,
-                                             cfs_hash_bd_t *bd,
-                                             const struct lu_fid *f)
-{
-       struct hlist_node       *hnode;
-       struct lu_object_header *h;
-
-       /* cfs_hash_bd_peek_locked is a somehow "internal" function
-        * of cfs_hash, it doesn't add refcount on object. */
-       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-       if (hnode == NULL) {
-               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-               return ERR_PTR(-ENOENT);
-       }
-
-       h = container_of0(hnode, struct lu_object_header, loh_hash);
-       if (unlikely(lu_object_is_dying(h)))
-               return ERR_PTR(-ENOENT);
-
-       cfs_hash_get(s->ls_obj_hash, hnode);
-       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-       list_del_init(&h->loh_lru);
-       return lu_object_top(h);
-}
-
 /**
  * Search cache for an object with the fid \a f. If such object is found,
  * return it. Otherwise, create new object, insert it into cache and return
@@ -685,7 +669,6 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
         struct lu_object        *o;
         cfs_hash_t              *hs;
         cfs_hash_bd_t            bd;
-        struct lu_site_bkt_data *bkt;
 
         o = lu_object_alloc(env, dev, f, conf);
         if (unlikely(IS_ERR(o)))
@@ -693,9 +676,7 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
 
         hs = dev->ld_site->ls_obj_hash;
         cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        bkt = cfs_hash_bd_extra_get(hs, &bd);
         cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        bkt->lsb_busy++;
         cfs_hash_bd_unlock(hs, &bd, 1);
 
        lu_object_limit(env, dev);
@@ -764,11 +745,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
 
         shadow = htable_lookup(s, &bd, f, waiter, &version);
        if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-                struct lu_site_bkt_data *bkt;
-
-                bkt = cfs_hash_bd_extra_get(hs, &bd);
                 cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                bkt->lsb_busy++;
                 cfs_hash_bd_unlock(hs, &bd, 1);
 
                lu_object_limit(env, dev);
@@ -796,13 +773,10 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_object        *obj;
        wait_queue_t           wait;
 
-       while (1) {
-               if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT) {
-                       obj = lu_object_find_try(env, dev, f, conf, NULL);
-
-                       return obj;
-               }
+       if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT)
+               return lu_object_find_try(env, dev, f, conf, NULL);
 
+       while (1) {
                obj = lu_object_find_try(env, dev, f, conf, &wait);
                if (obj != ERR_PTR(-EAGAIN))
                        return obj;
@@ -818,30 +792,6 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find_at);
 
 /**
- * Try to find the object in cache without waiting for the dead object
- * to be released nor allocating object if no cached one was found.
- *
- * The found object will be set as LU_OBJECT_HEARD_BANSHEE for purging.
- */
-void lu_object_purge(const struct lu_env *env, struct lu_device *dev,
-                    const struct lu_fid *f)
-{
-       struct lu_site          *s  = dev->ld_site;
-       cfs_hash_t              *hs = s->ls_obj_hash;
-       cfs_hash_bd_t            bd;
-       struct lu_object        *o;
-
-       cfs_hash_bd_get_and_lock(hs, f, &bd, 1);
-       o = htable_lookup_nowait(s, &bd, f);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-       if (!IS_ERR(o)) {
-               set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
-               lu_object_put(env, o);
-       }
-}
-EXPORT_SYMBOL(lu_object_purge);
-
-/**
  * Find object with given fid, and return its slice belonging to given device.
  */
 struct lu_object *lu_object_find_slice(const struct lu_env *env,
@@ -849,17 +799,20 @@ struct lu_object *lu_object_find_slice(const struct lu_env *env,
                                        const struct lu_fid *f,
                                        const struct lu_object_conf *conf)
 {
-        struct lu_object *top;
-        struct lu_object *obj;
+       struct lu_object *top;
+       struct lu_object *obj;
+
+       top = lu_object_find(env, dev, f, conf);
+       if (IS_ERR(top))
+               return top;
+
+       obj = lu_object_locate(top->lo_header, dev->ld_type);
+       if (unlikely(obj == NULL)) {
+               lu_object_put(env, top);
+               obj = ERR_PTR(-ENOENT);
+       }
 
-        top = lu_object_find(env, dev, f, conf);
-        if (!IS_ERR(top)) {
-                obj = lu_object_locate(top->lo_header, dev->ld_type);
-                if (obj == NULL)
-                        lu_object_put(env, top);
-        } else
-                obj = top;
-        return obj;
+       return obj;
 }
 EXPORT_SYMBOL(lu_object_find_slice);
 
@@ -954,10 +907,10 @@ EXPORT_SYMBOL(lu_site_print);
 /**
  * Return desired hash table order.
  */
-static unsigned int lu_htable_order(struct lu_device *top)
+static unsigned long lu_htable_order(struct lu_device *top)
 {
        unsigned long cache_size;
-       unsigned int  bits;
+       unsigned long bits;
 
        /*
         * For ZFS based OSDs the cache should be disabled by default.  This
@@ -1049,14 +1002,7 @@ static void lu_obj_hop_get(cfs_hash_t *hs, struct hlist_node *hnode)
        struct lu_object_header *h;
 
        h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       if (atomic_add_return(1, &h->loh_ref) == 1) {
-               struct lu_site_bkt_data *bkt;
-               cfs_hash_bd_t            bd;
-
-               cfs_hash_bd_get(hs, &h->loh_fid, &bd);
-               bkt = cfs_hash_bd_extra_get(hs, &bd);
-               bkt->lsb_busy++;
-       }
+       atomic_inc(&h->loh_ref);
 }
 
 static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
@@ -1064,7 +1010,7 @@ static void lu_obj_hop_put_locked(cfs_hash_t *hs, struct hlist_node *hnode)
         LBUG(); /* we should never called it */
 }
 
-cfs_hash_ops_t lu_site_hash_ops = {
+static cfs_hash_ops_t lu_site_hash_ops = {
         .hs_hash        = lu_obj_hop_hash,
         .hs_key         = lu_obj_hop_key,
         .hs_keycmp      = lu_obj_hop_keycmp,
@@ -1098,14 +1044,14 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        struct lu_site_bkt_data *bkt;
        cfs_hash_bd_t bd;
        char name[16];
-       unsigned int bits;
+       unsigned long bits;
        unsigned int i;
        ENTRY;
 
        memset(s, 0, sizeof *s);
        mutex_init(&s->ls_purge_mutex);
        bits = lu_htable_order(top);
-       snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
+       snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
        for (bits = clamp_t(typeof(bits), bits,
                            LU_SITE_BITS_MIN, LU_SITE_BITS_MAX);
             bits >= LU_SITE_BITS_MIN; bits--) {
@@ -1123,7 +1069,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        }
 
        if (s->ls_obj_hash == NULL) {
-               CERROR("failed to create lu_site hash with bits: %d\n", bits);
+               CERROR("failed to create lu_site hash with bits: %lu\n", bits);
                return -ENOMEM;
        }
 
@@ -1408,7 +1354,6 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
                 }
         }
 }
-EXPORT_SYMBOL(lu_stack_fini);
 
 enum {
         /**
@@ -1420,6 +1365,7 @@ enum {
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
 static DEFINE_SPINLOCK(lu_keys_guard);
+static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1496,6 +1442,19 @@ void lu_context_key_degister(struct lu_context_key *key)
        ++key_set_version;
        spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
+
+       /**
+        * Wait until all transient contexts referencing this key have
+        * run lu_context_key::lct_fini() method.
+        */
+       while (atomic_read(&key->lct_used) > 1) {
+               spin_unlock(&lu_keys_guard);
+               CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
+                      key->lct_owner ? key->lct_owner->name : "", key,
+                      atomic_read(&key->lct_used));
+               schedule();
+               spin_lock(&lu_keys_guard);
+       }
        if (lu_keys[key->lct_index]) {
                lu_keys[key->lct_index] = NULL;
                lu_ref_fini(&key->lct_reference);
@@ -1622,11 +1581,27 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                  * XXX layering violation.
                  */
                 cl_env_cache_purge(~0);
-                key->lct_tags |= LCT_QUIESCENT;
                 /*
                  * XXX memory barrier has to go here.
                  */
                spin_lock(&lu_keys_guard);
+               key->lct_tags |= LCT_QUIESCENT;
+
+               /**
+                * Wait until all lu_context_key::lct_init() methods
+                * have completed.
+                */
+               while (atomic_read(&lu_key_initing_cnt) > 0) {
+                       spin_unlock(&lu_keys_guard);
+                       CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
+                              " %p, %d (%d)\n",
+                              key->lct_owner ? key->lct_owner->name : "",
+                              key, atomic_read(&key->lct_used),
+                              atomic_read(&lu_key_initing_cnt));
+                       schedule();
+                       spin_lock(&lu_keys_guard);
+               }
+
                list_for_each_entry(ctx, &lu_context_remembered,
                                    lc_remember)
                        key_fini(ctx, key->lct_index);
@@ -1634,14 +1609,12 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                ++key_set_version;
        }
 }
-EXPORT_SYMBOL(lu_context_key_quiesce);
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
         key->lct_tags &= ~LCT_QUIESCENT;
         ++key_set_version;
 }
-EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
@@ -1661,6 +1634,19 @@ static int keys_fill(struct lu_context *ctx)
 {
        unsigned int i;
 
+       /*
+        * A serialisation with lu_context_key_quiesce() is needed, but some
+        * "key->lct_init()" are calling kernel memory allocation routine and
+        * can't be called while holding a spin_lock.
+        * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
+        * to ensure the start of the serialisation.
+        * An atomic_t variable is still used, in order not to reacquire the
+        * lock when decrementing the counter.
+        */
+       spin_lock(&lu_keys_guard);
+       atomic_inc(&lu_key_initing_cnt);
+       spin_unlock(&lu_keys_guard);
+
         LINVRNT(ctx->lc_value != NULL);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                 struct lu_context_key *key;
@@ -1678,13 +1664,19 @@ static int keys_fill(struct lu_context *ctx)
                         LINVRNT(key->lct_init != NULL);
                         LINVRNT(key->lct_index == i);
 
-                        value = key->lct_init(ctx, key);
-                        if (unlikely(IS_ERR(value)))
-                                return PTR_ERR(value);
-
                        LASSERT(key->lct_owner != NULL);
-                       if (!(ctx->lc_tags & LCT_NOREF))
-                               try_module_get(key->lct_owner);
+                       if (!(ctx->lc_tags & LCT_NOREF) &&
+                           try_module_get(key->lct_owner) == 0) {
+                               /* module is unloading, skip this key */
+                               continue;
+                       }
+
+                       value = key->lct_init(ctx, key);
+                       if (unlikely(IS_ERR(value))) {
+                               atomic_dec(&lu_key_initing_cnt);
+                               return PTR_ERR(value);
+                       }
+
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        atomic_inc(&key->lct_used);
                         /*
@@ -1698,6 +1690,7 @@ static int keys_fill(struct lu_context *ctx)
                 }
                 ctx->lc_version = key_set_version;
         }
+       atomic_dec(&lu_key_initing_cnt);
         return 0;
 }
 
@@ -1778,15 +1771,20 @@ void lu_context_exit(struct lu_context *ctx)
         ctx->lc_state = LCS_LEFT;
         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                        if (ctx->lc_value[i] != NULL) {
-                                struct lu_context_key *key;
-
-                                key = lu_keys[i];
-                                LASSERT(key != NULL);
-                                if (key->lct_exit != NULL)
-                                        key->lct_exit(ctx,
-                                                      key, ctx->lc_value[i]);
-                        }
+                       /* could race with key quiescency */
+                       if (ctx->lc_tags & LCT_REMEMBER)
+                               spin_lock(&lu_keys_guard);
+                       if (ctx->lc_value[i] != NULL) {
+                               struct lu_context_key *key;
+
+                               key = lu_keys[i];
+                               LASSERT(key != NULL);
+                               if (key->lct_exit != NULL)
+                                       key->lct_exit(ctx,
+                                                     key, ctx->lc_value[i]);
+                       }
+                       if (ctx->lc_tags & LCT_REMEMBER)
+                               spin_unlock(&lu_keys_guard);
                 }
         }
 }
@@ -1801,7 +1799,6 @@ int lu_context_refill(struct lu_context *ctx)
 {
         return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
 }
-EXPORT_SYMBOL(lu_context_refill);
 
 /**
  * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
@@ -1928,7 +1925,8 @@ static void lu_site_stats_get(cfs_hash_t *hs,
                struct hlist_head       *hhead;
 
                 cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_busy  += bkt->lsb_busy;
+               stats->lss_busy  +=
+                       cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
                 stats->lss_total += cfs_hash_bd_count_get(&bd);
                 stats->lss_max_search = max((int)stats->lss_max_search,
                                             cfs_hash_bd_depmax_get(&bd));
@@ -2023,7 +2021,7 @@ static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
  * is safe to take the lu_sites_guard lock.
  *
  * Ideally we should accurately return the remaining number of cached
- * objects without taking the  lu_sites_guard lock, but this is not
+ * objects without taking the lu_sites_guard lock, but this is not
  * possible in the current implementation.
  */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
@@ -2058,13 +2056,13 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 /**
  * Environment to be used in debugger, contains all tags.
  */
-struct lu_env lu_debugging_env;
+static struct lu_env lu_debugging_env;
 
 /**
  * Debugging printer function using printk().
  */
 int lu_printk_printer(const struct lu_env *env,
-                      void *unused, const char *format, ...)
+                     void *unused, const char *format, ...)
 {
         va_list args;
 
@@ -2098,7 +2096,6 @@ void lu_context_keys_dump(void)
                 }
         }
 }
-EXPORT_SYMBOL(lu_context_keys_dump);
 
 /**
  * Initialization of global lu_* data.
@@ -2172,7 +2169,7 @@ void lu_global_fini(void)
 
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
 {
-#ifdef LPROCFS
+#ifdef CONFIG_PROC_FS
         struct lprocfs_counter ret;
 
         lprocfs_stats_collect(stats, idx, &ret);
@@ -2228,7 +2225,6 @@ int lu_site_stats_print(const struct lu_site *s, char *page, int count)
                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
 }
-EXPORT_SYMBOL(lu_site_stats_print);
 
 /**
  * Helper function to initialize a number of kmem slab caches at once.
@@ -2277,7 +2273,6 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct lu_site_bkt_data *bkt;
        struct lu_object        *shadow;
        wait_queue_t             waiter;
        cfs_hash_t              *hs;
@@ -2292,9 +2287,7 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
        /* supposed to be unique */
        LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
        *old = *fid;
-       bkt = cfs_hash_bd_extra_get(hs, &bd);
        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       bkt->lsb_busy++;
        cfs_hash_bd_unlock(hs, &bd, 1);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
@@ -2393,5 +2386,3 @@ int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
        buf->lb_len = len;
        return 0;
 }
-EXPORT_SYMBOL(lu_buf_check_and_grow);
-