LU-11089 obd: rename lu_keys_guard to lu_context_remembered_guard
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index 543efcc..fbe034a 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
 
 #include <linux/module.h>
 #include <linux/list.h>
+#ifdef HAVE_PROCESSOR_H
+#include <linux/processor.h>
+#else
+#include <libcfs/linux/processor.h>
+#endif
+
 #include <libcfs/libcfs.h>
 #include <libcfs/libcfs_hash.h> /* hash_long() */
 #include <libcfs/linux/linux-mem.h>
 #include <lu_object.h>
 #include <lu_ref.h>
 
+struct lu_site_bkt_data {
+       /**
+        * LRU list, updated on each access to object. Protected by
+        * bucket lock of lu_site::ls_obj_hash.
+        *
+        * The "cold" end of the LRU is lu_site::ls_lru.next. Accessed
+        * objects are moved to lu_site::ls_lru.prev (this is due to the
+        * non-existence of list_for_each_entry_safe_reverse()).
+        */
+       struct list_head                lsb_lru;
+       /**
+        * Wait-queue signaled when an object in this site is ultimately
+        * destroyed (lu_object_free()). It is used by lu_object_find() to
+        * wait before re-trying when an object in the process of destruction is
+        * found in the hash table.
+        *
+        * \see htable_lookup().
+        */
+       wait_queue_head_t               lsb_marche_funebre;
+};
+
 enum {
        LU_CACHE_PERCENT_MAX     = 50,
        LU_CACHE_PERCENT_DEFAULT = 20
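
The new per-bucket lsb_marche_funebre queue, together with lu_site_wq_from_fid() added further down in this patch, lets a lookup sleep until a dying object has actually been freed. Below is a hedged sketch of that retry pattern, assuming an invented demo_try_lookup() helper that returns ERR_PTR(-EAGAIN) while a dying instance of the FID is still hashed; it is an illustration, not the patch's htable_lookup()/lu_object_find() code.

#include <linux/err.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <lu_object.h>

/* Invented helper for illustration only: returns ERR_PTR(-EAGAIN) while a
 * dying instance of @fid is still present in the hash. */
extern struct lu_object *demo_try_lookup(const struct lu_env *env,
                                         struct lu_site *s,
                                         const struct lu_fid *fid);

static struct lu_object *demo_retry_find(const struct lu_env *env,
                                         struct lu_site *s,
                                         struct lu_fid *fid)
{
        wait_queue_head_t *wq = lu_site_wq_from_fid(s, fid);
        struct lu_object *o;
        DEFINE_WAIT(wait);

        while (1) {
                /* queue ourselves before looking, so a wake-up coming from
                 * lu_object_free() cannot be lost */
                prepare_to_wait(wq, &wait, TASK_UNINTERRUPTIBLE);
                o = demo_try_lookup(env, s, fid);
                if (!IS_ERR(o) || PTR_ERR(o) != -EAGAIN)
                        break;
                /* a dying instance is still hashed; sleep until its
                 * lu_object_free() signals lsb_marche_funebre, then retry */
                schedule();
        }
        finish_wait(wq, &wait);
        return o;
}
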
@@ -86,6 +113,18 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
+wait_queue_head_t *
+lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
+{
+       struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
+
+       cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
+       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       return &bkt->lsb_marche_funebre;
+}
+EXPORT_SYMBOL(lu_site_wq_from_fid);
+
 /**
  * Decrease reference counter on object. If last reference is freed, return
  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
@@ -94,22 +133,18 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
        struct lu_site_bkt_data *bkt;
-       struct lu_object_header *top;
-       struct lu_site *site;
-       struct lu_object *orig;
+       struct lu_object_header *top = o->lo_header;
+       struct lu_site *site = o->lo_dev->ld_site;
+       struct lu_object *orig = o;
        struct cfs_hash_bd bd;
-       const struct lu_fid *fid;
-
-       top  = o->lo_header;
-       site = o->lo_dev->ld_site;
-       orig = o;
+       const struct lu_fid *fid = lu_object_fid(o);
+       bool is_dying;
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
         * are possible in OSP. such an object isn't listed in the site
         * so we should not remove it from the site.
         */
-       fid = lu_object_fid(o);
        if (fid_is_zero(fid)) {
                LASSERT(top->loh_hash.next == NULL
                        && top->loh_hash.pprev == NULL);
@@ -127,8 +162,14 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
+       is_dying = lu_object_is_dying(top);
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-               if (lu_object_is_dying(top)) {
+               /* At this point the object reference has been dropped and
+                * the lock was not taken, so the lu_object must not be
+                * touched: it can be freed by a concurrent thread. Check
+                * the local variable instead.
+                */
+               if (is_dying) {
                        /*
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
@@ -147,15 +188,17 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                        o->lo_ops->loo_object_release(env, o);
        }
 
+       /* Don't use the local 'is_dying' here: it was sampled without the
+        * lock, and at this point the latest actual value is needed, so
+        * check the lu_object directly.
+        */
        if (!lu_object_is_dying(top) &&
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
-               bkt->lsb_lru_len++;
                percpu_counter_inc(&site->ls_lru_len_counter);
-               CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
-                      "lru_len: %ld\n",
-                      o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
+                      orig, top, site->ls_obj_hash, bkt);
                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                return;
        }
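
The two comments above encode a simple rule: sample lu_object_is_dying() while a reference is still held, and once the reference has been dropped without the bucket lock, consult only that sampled value. A minimal, hedged sketch of the pattern with invented names (demo_obj, demo_is_dying(), demo_dec_and_lock(), demo_wq); it is not the lu_object_put() code itself.

#include <linux/wait.h>

struct demo_obj;                                        /* opaque, illustration only */
extern bool demo_is_dying(struct demo_obj *obj);
extern bool demo_dec_and_lock(struct demo_obj *obj);    /* true: last ref, lock taken */
extern wait_queue_head_t demo_wq;

static void demo_put(struct demo_obj *obj)
{
        bool is_dying = demo_is_dying(obj);     /* sampled while still referenced */

        if (!demo_dec_and_lock(obj)) {
                /* reference dropped, lock not taken: 'obj' may already be
                 * freed by a concurrent thread, so use the sampled value */
                if (is_dying)
                        wake_up_all(&demo_wq);
                return;
        }
        /* lock held: the object is pinned and can be inspected directly */
}
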
@@ -214,7 +257,6 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
 
                        list_del_init(&top->loh_lru);
                        bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
-                       bkt->lsb_lru_len--;
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
@@ -301,15 +343,15 @@ next:
  */
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
-       struct lu_site_bkt_data *bkt;
+       wait_queue_head_t *wq;
        struct lu_site          *site;
        struct lu_object        *scan;
        struct list_head        *layers;
        struct list_head         splice;
 
-        site   = o->lo_dev->ld_site;
-        layers = &o->lo_header->loh_layers;
-        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
+       site = o->lo_dev->ld_site;
+       layers = &o->lo_header->loh_layers;
+       wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
         /*
          * First call ->loo_object_delete() method to release all resources.
          */
@@ -338,8 +380,8 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
                o->lo_ops->loo_object_free(env, o);
        }
 
-       if (waitqueue_active(&bkt->lsb_marche_funebre))
-               wake_up_all(&bkt->lsb_marche_funebre);
+       if (waitqueue_active(wq))
+               wake_up_all(wq);
 }
 
 /**
@@ -400,7 +442,6 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                         cfs_hash_bd_del_locked(s->ls_obj_hash,
                                                &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
-                       bkt->lsb_lru_len--;
                        percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
                                 did_sth = 1;
@@ -615,7 +656,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        if (!list_empty(&h->loh_lru)) {
                list_del_init(&h->loh_lru);
-               bkt->lsb_lru_len--;
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
        return lu_object_top(h);
@@ -658,29 +698,6 @@ static void lu_object_limit(const struct lu_env *env,
                              MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0);
 }
 
-static struct lu_object *lu_object_new(const struct lu_env *env,
-                                      struct lu_device *dev,
-                                      const struct lu_fid *f,
-                                      const struct lu_object_conf *conf)
-{
-       struct lu_object *o;
-       struct cfs_hash *hs;
-       struct cfs_hash_bd bd;
-
-       o = lu_object_alloc(env, dev, f, conf);
-       if (unlikely(IS_ERR(o)))
-               return o;
-
-       hs = dev->ld_site->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-
-       lu_object_limit(env, dev);
-
-       return o;
-}
-
 /**
  * Core logic of lu_object_find*() functions.
  *
@@ -717,34 +734,35 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
         * just alloc and insert directly.
         *
-        * If dying object is found during index search, add @waiter to the
-        * site wait-queue and return ERR_PTR(-EAGAIN).
         */
-       if (conf && conf->loc_flags & LOC_F_NEW)
-               return lu_object_new(env, dev, f, conf);
-
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-       o = htable_lookup(s, &bd, f, &version);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-       if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-               return o;
+       cfs_hash_bd_get(hs, f, &bd);
+       if (!(conf && conf->loc_flags & LOC_F_NEW)) {
+               cfs_hash_bd_lock(hs, &bd, 1);
+               o = htable_lookup(s, &bd, f, &version);
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
+               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
+                       return o;
+       }
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, dev, f, conf);
-       if (unlikely(IS_ERR(o)))
+       if (IS_ERR(o))
                return o;
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
        cfs_hash_bd_lock(hs, &bd, 1);
 
-       shadow = htable_lookup(s, &bd, f, &version);
-       if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
+       if (conf && conf->loc_flags & LOC_F_NEW)
+               shadow = ERR_PTR(-ENOENT);
+       else
+               shadow = htable_lookup(s, &bd, f, &version);
+       if (likely(PTR_ERR(shadow) == -ENOENT)) {
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
@@ -1333,8 +1351,7 @@ enum {
 
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
-DEFINE_RWLOCK(lu_keys_guard);
-static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
+static DECLARE_RWSEM(lu_key_initing);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1342,7 +1359,7 @@ static atomic_t lu_key_initing_cnt = ATOMIC_INIT(0);
  * lu_context_refill(). No locking is provided, as initialization and shutdown
  * are supposed to be externally serialized.
  */
-static unsigned key_set_version = 0;
+static atomic_t key_set_version = ATOMIC_INIT(0);
 
 /**
  * Register new key.
@@ -1358,19 +1375,23 @@ int lu_context_key_register(struct lu_context_key *key)
         LASSERT(key->lct_owner != NULL);
 
         result = -ENFILE;
-       write_lock(&lu_keys_guard);
+       atomic_set(&key->lct_used, 1);
+       lu_ref_init(&key->lct_reference);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                if (lu_keys[i] == NULL) {
-                        key->lct_index = i;
-                       atomic_set(&key->lct_used, 1);
-                        lu_keys[i] = key;
-                        lu_ref_init(&key->lct_reference);
-                        result = 0;
-                        ++key_set_version;
-                        break;
-                }
+               if (lu_keys[i])
+                       continue;
+               key->lct_index = i;
+               if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
+                       continue;
+
+               result = 0;
+               atomic_inc(&key_set_version);
+               break;
         }
-       write_unlock(&lu_keys_guard);
+       if (result) {
+               lu_ref_fini(&key->lct_reference);
+               atomic_set(&key->lct_used, 0);
+       }
        return result;
 }
 EXPORT_SYMBOL(lu_context_key_register);
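
Registration no longer needs lu_keys_guard: a NULL-to-key cmpxchg() either claims lu_keys[i] or loses to a concurrent registrant, which simply moves on to the next slot. Here is a small user-space sketch of the same lock-free slot-claiming idea using C11 atomics; demo_key, slots[] and demo_key_register() are invented for illustration.

#include <stdatomic.h>
#include <stddef.h>
#include <stdio.h>

#define NSLOTS 8

struct demo_key { int index; };

static _Atomic(struct demo_key *) slots[NSLOTS];

static int demo_key_register(struct demo_key *key)
{
        for (int i = 0; i < NSLOTS; i++) {
                struct demo_key *expected = NULL;

                if (atomic_load(&slots[i]) != NULL)
                        continue;               /* slot already owned */
                key->index = i;
                if (!atomic_compare_exchange_strong(&slots[i], &expected, key))
                        continue;               /* lost the race; try the next slot */
                return 0;                       /* slot claimed */
        }
        return -1;                              /* table full (-ENFILE in the patch) */
}

int main(void)
{
        struct demo_key k;

        if (demo_key_register(&k) == 0)
                printf("registered at slot %d\n", k.index);
        return 0;
}
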
@@ -1383,11 +1404,12 @@ static void key_fini(struct lu_context *ctx, int index)
                 key = lu_keys[index];
                 LASSERT(key != NULL);
                 LASSERT(key->lct_fini != NULL);
-               LASSERT(atomic_read(&key->lct_used) > 1);
+               LASSERT(atomic_read(&key->lct_used) > 0);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
-               atomic_dec(&key->lct_used);
+               if (atomic_dec_and_test(&key->lct_used))
+                       wake_up_var(&key->lct_used);
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
@@ -1408,31 +1430,19 @@ void lu_context_key_degister(struct lu_context_key *key)
 
        lu_context_key_quiesce(key);
 
-       ++key_set_version;
-       write_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
        /**
         * Wait until all transient contexts referencing this key have
         * run lu_context_key::lct_fini() method.
         */
-       while (atomic_read(&key->lct_used) > 1) {
-               write_unlock(&lu_keys_guard);
-               CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
-                      key->lct_owner ? key->lct_owner->name : "", key,
-                      atomic_read(&key->lct_used));
-               schedule();
-               write_lock(&lu_keys_guard);
-       }
-       if (lu_keys[key->lct_index]) {
-               lu_keys[key->lct_index] = NULL;
+       atomic_dec(&key->lct_used);
+       wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
+
+       if (!WARN_ON(lu_keys[key->lct_index] == NULL))
                lu_ref_fini(&key->lct_reference);
-       }
-       write_unlock(&lu_keys_guard);
 
-       LASSERTF(atomic_read(&key->lct_used) == 1,
-                "key has instances: %d\n",
-                atomic_read(&key->lct_used));
+       smp_store_release(&lu_keys[key->lct_index], NULL);
 }
 EXPORT_SYMBOL(lu_context_key_degister);
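
key_fini() now does the wake half and lu_context_key_degister() the wait half of a reference-drain handshake: the last atomic_dec_and_test() calls wake_up_var(), and the deregistering thread sleeps in wait_var_event() instead of the old schedule() loop. A hedged kernel-style sketch of that pattern with invented names (demo_obj, demo_put(), demo_drain()):

#include <linux/atomic.h>
#include <linux/wait_bit.h>     /* wait_var_event(), wake_up_var() */

struct demo_obj {
        atomic_t uses;          /* starts at 1 for the owner */
};

static void demo_put(struct demo_obj *obj)
{
        /* the final put wakes anyone sleeping in demo_drain() */
        if (atomic_dec_and_test(&obj->uses))
                wake_up_var(&obj->uses);
}

static void demo_drain(struct demo_obj *obj)
{
        /* drop the owner's reference, then sleep until every transient
         * user has called demo_put() and the count reaches zero */
        demo_put(obj);
        wait_var_event(&obj->uses, atomic_read(&obj->uses) == 0);
}
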
 
@@ -1534,6 +1544,7 @@ EXPORT_SYMBOL(lu_context_key_get);
  * List of remembered contexts. XXX document me.
  */
 static LIST_HEAD(lu_context_remembered);
+static DEFINE_SPINLOCK(lu_context_remembered_guard);
 
 /**
  * Destroy \a key in all remembered contexts. This is used to destroy key
@@ -1546,41 +1557,28 @@ void lu_context_key_quiesce(struct lu_context_key *key)
 
        if (!(key->lct_tags & LCT_QUIESCENT)) {
                 /*
-                 * XXX memory barrier has to go here.
+                * The write-lock on lu_key_initing will ensure that any
+                * keys_fill() which didn't see LCT_QUIESCENT will have
+                * finished before we call key_fini().
                  */
-               write_lock(&lu_keys_guard);
+               down_write(&lu_key_initing);
                key->lct_tags |= LCT_QUIESCENT;
+               up_write(&lu_key_initing);
 
-               /**
-                * Wait until all lu_context_key::lct_init() methods
-                * have completed.
-                */
-               while (atomic_read(&lu_key_initing_cnt) > 0) {
-                       write_unlock(&lu_keys_guard);
-                       CDEBUG(D_INFO, "lu_context_key_quiesce: \"%s\""
-                              " %p, %d (%d)\n",
-                              key->lct_owner ? key->lct_owner->name : "",
-                              key, atomic_read(&key->lct_used),
-                              atomic_read(&lu_key_initing_cnt));
-                       schedule();
-                       write_lock(&lu_keys_guard);
-               }
-
-               list_for_each_entry(ctx, &lu_context_remembered,
-                                   lc_remember)
+               spin_lock(&lu_context_remembered_guard);
+               list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
+                       spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
+               }
 
-               ++key_set_version;
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        }
 }
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
-       write_lock(&lu_keys_guard);
        key->lct_tags &= ~LCT_QUIESCENT;
-       ++key_set_version;
-       write_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
 }
 
 static void keys_fini(struct lu_context *ctx)
@@ -1600,29 +1598,24 @@ static void keys_fini(struct lu_context *ctx)
 static int keys_fill(struct lu_context *ctx)
 {
        unsigned int i;
-       unsigned pre_version;
+       int rc = 0;
 
        /*
-        * A serialisation with lu_context_key_quiesce() is needed, but some
-        * "key->lct_init()" are calling kernel memory allocation routine and
-        * can't be called while holding a spin_lock.
-        * "lu_keys_guard" is held while incrementing "lu_key_initing_cnt"
-        * to ensure the start of the serialisation.
-        * An atomic_t variable is still used, in order not to reacquire the
-        * lock when decrementing the counter.
+        * Serialisation with lu_context_key_quiesce() is needed to ensure
+        * that we see LCT_QUIESCENT and don't allocate a new value after
+        * it has freed one.  The rwsem provides this.  As down_read()
+        * does optimistic spinning while the writer is active, this is
+        * unlikely to ever sleep.
         */
-       read_lock(&lu_keys_guard);
-       atomic_inc(&lu_key_initing_cnt);
-       pre_version = key_set_version;
-       read_unlock(&lu_keys_guard);
+       down_read(&lu_key_initing);
+       ctx->lc_version = atomic_read(&key_set_version);
 
-refill:
-       LINVRNT(ctx->lc_value != NULL);
+       LINVRNT(ctx->lc_value);
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                struct lu_context_key *key;
 
                key = lu_keys[i];
-               if (ctx->lc_value[i] == NULL && key != NULL &&
+               if (!ctx->lc_value[i] && key &&
                    (key->lct_tags & ctx->lc_tags) &&
                    /*
                     * Don't create values for a LCT_QUIESCENT key, as this
@@ -1643,8 +1636,8 @@ refill:
 
                        value = key->lct_init(ctx, key);
                        if (unlikely(IS_ERR(value))) {
-                               atomic_dec(&lu_key_initing_cnt);
-                               return PTR_ERR(value);
+                               rc = PTR_ERR(value);
+                               break;
                        }
 
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
@@ -1660,18 +1653,8 @@ refill:
                }
        }
 
-       read_lock(&lu_keys_guard);
-       if (pre_version != key_set_version) {
-               pre_version = key_set_version;
-               read_unlock(&lu_keys_guard);
-               goto refill;
-       }
-
-       ctx->lc_version = key_set_version;
-
-       atomic_dec(&lu_key_initing_cnt);
-       read_unlock(&lu_keys_guard);
-       return 0;
+       up_read(&lu_key_initing);
+       return rc;
 }
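
keys_fill() and lu_context_key_quiesce() now serialise through the lu_key_initing rwsem instead of lu_keys_guard and lu_key_initing_cnt: fillers take it for read (they may allocate and sleep), and quiescing takes it for write just long enough to publish LCT_QUIESCENT, which waits out any filler that missed the flag. A stripped-down, hedged sketch of that reader/writer handshake (demo_initing, demo_quiescent and the demo_* functions are invented):

#include <linux/rwsem.h>

static DECLARE_RWSEM(demo_initing);     /* stand-in for lu_key_initing */
static bool demo_quiescent;             /* stand-in for LCT_QUIESCENT */

/* Fill side (cf. keys_fill()): allocation may sleep, so a rwsem is used
 * rather than a spinlock. */
static bool demo_fill(void)
{
        bool filled = false;

        down_read(&demo_initing);
        if (!demo_quiescent)
                filled = true;          /* a new value would be allocated here */
        up_read(&demo_initing);
        return filled;
}

/* Quiesce side (cf. lu_context_key_quiesce()): taking the write lock waits
 * out every in-flight demo_fill() that did not see the flag, so existing
 * values can be freed safely afterwards. */
static void demo_quiesce(void)
{
        down_write(&demo_initing);
        demo_quiescent = true;
        up_write(&demo_initing);
        /* ... free per-context values, as key_fini() does in the patch ... */
}
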
 
 static int keys_init(struct lu_context *ctx)
@@ -1694,9 +1677,9 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
-               write_lock(&lu_keys_guard);
+               spin_lock(&lu_context_remembered_guard);
                list_add(&ctx->lc_remember, &lu_context_remembered);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        } else {
                INIT_LIST_HEAD(&ctx->lc_remember);
        }
@@ -1719,14 +1702,13 @@ void lu_context_fini(struct lu_context *ctx)
 
        if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
                LASSERT(list_empty(&ctx->lc_remember));
-               keys_fini(ctx);
-
-       } else { /* could race with key degister */
-               write_lock(&lu_keys_guard);
-               keys_fini(ctx);
+       } else {
+               /* could race with key degister */
+               spin_lock(&lu_context_remembered_guard);
                list_del_init(&ctx->lc_remember);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        }
+       keys_fini(ctx);
 }
 EXPORT_SYMBOL(lu_context_fini);
 
@@ -1747,28 +1729,35 @@ void lu_context_exit(struct lu_context *ctx)
 {
        unsigned int i;
 
-        LINVRNT(ctx->lc_state == LCS_ENTERED);
-        ctx->lc_state = LCS_LEFT;
-        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
-               /* could race with key quiescency */
-               if (ctx->lc_tags & LCT_REMEMBER)
-                       read_lock(&lu_keys_guard);
-
+       LINVRNT(ctx->lc_state == LCS_ENTERED);
+       /*
+        * Disable preemption to ensure we get a warning if
+        * any lct_exit ever tries to sleep.  That would hurt
+        * lu_context_key_quiesce(), which spins waiting for us.
+        * This also ensures we aren't preempted while the state
+        * is LCS_LEAVING, as that too would cause problems for
+        * lu_context_key_quiesce().
+        */
+       preempt_disable();
+       /*
+        * Ensure lu_context_key_quiesce() sees LCS_LEAVING
+        * or we see LCT_QUIESCENT
+        */
+       smp_store_mb(ctx->lc_state, LCS_LEAVING);
+       if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                       if (ctx->lc_value[i] != NULL) {
-                               struct lu_context_key *key;
-
-                               key = lu_keys[i];
-                               LASSERT(key != NULL);
-                               if (key->lct_exit != NULL)
-                                       key->lct_exit(ctx,
-                                                     key, ctx->lc_value[i]);
-                       }
-                }
+                       struct lu_context_key *key;
 
-               if (ctx->lc_tags & LCT_REMEMBER)
-                       read_unlock(&lu_keys_guard);
+                       key = lu_keys[i];
+                       if (ctx->lc_value[i] &&
+                           !(key->lct_tags & LCT_QUIESCENT) &&
+                           key->lct_exit)
+                               key->lct_exit(ctx, key, ctx->lc_value[i]);
+               }
         }
+
+       smp_store_release(&ctx->lc_state, LCS_LEFT);
+       preempt_enable();
 }
 EXPORT_SYMBOL(lu_context_exit);
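
lu_context_exit() and lu_context_key_quiesce() now synchronise without lu_keys_guard: the exiting thread publishes LCS_LEAVING with smp_store_mb() before reading LCT_QUIESCENT, and the quiescing thread uses spin_until_cond() (hence the linux/processor.h include at the top of this patch) to wait for the short LEAVING window to close. A hedged two-sided sketch of that handshake with invented names (demo_state, demo_exit(), demo_quiesce_one()):

#include <asm/barrier.h>        /* smp_store_mb(), smp_store_release() */
#include <linux/compiler.h>     /* READ_ONCE() */
#include <linux/preempt.h>
#include <linux/processor.h>    /* spin_until_cond() */

enum demo_state { DEMO_ENTERED, DEMO_LEAVING, DEMO_LEFT };

/* Exiting side (cf. lu_context_exit()). */
static void demo_exit(enum demo_state *state, bool *quiescent)
{
        preempt_disable();      /* keep the LEAVING window short and non-sleeping */
        /* full barrier: either the peer sees LEAVING, or we see its flag */
        smp_store_mb(*state, DEMO_LEAVING);
        if (!READ_ONCE(*quiescent)) {
                /* run the lct_exit()-style callbacks here */
        }
        smp_store_release(state, DEMO_LEFT);
        preempt_enable();
}

/* Quiescing side (cf. lu_context_key_quiesce()). */
static void demo_quiesce_one(enum demo_state *state)
{
        /* busy-wait for the short LEAVING window to close before freeing
         * the context's value, as key_fini() then does in the patch */
        spin_until_cond(READ_ONCE(*state) != DEMO_LEAVING);
}
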
 
@@ -1779,7 +1768,10 @@ EXPORT_SYMBOL(lu_context_exit);
  */
 int lu_context_refill(struct lu_context *ctx)
 {
-        return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
+       if (likely(ctx->lc_version == atomic_read(&key_set_version)))
+               return 0;
+
+       return keys_fill(ctx);
 }
 
 /**
@@ -1789,44 +1781,46 @@ int lu_context_refill(struct lu_context *ctx)
  * predefined when the lu_device type are registered, during the module probe
  * phase.
  */
-__u32 lu_context_tags_default = 0;
-__u32 lu_session_tags_default = 0;
+u32 lu_context_tags_default;
+u32 lu_session_tags_default;
 
+#ifdef HAVE_SERVER_SUPPORT
 void lu_context_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default |= tags;
-       key_set_version++;
-       write_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_update);
 
 void lu_context_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default &= ~tags;
-       key_set_version++;
-       write_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_clear);
 
 void lu_session_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default |= tags;
-       key_set_version++;
-       write_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_update);
 
 void lu_session_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default &= ~tags;
-       key_set_version++;
-       write_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_clear);
+#endif /* HAVE_SERVER_SUPPORT */
 
 int lu_env_init(struct lu_env *env, __u32 tags)
 {
@@ -1887,6 +1881,101 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
 }
 EXPORT_SYMBOL(lu_env_refill_by_tags);
 
+
+struct lu_env_item {
+       struct task_struct *lei_task;   /* rhashtable key */
+       struct rhash_head lei_linkage;
+       struct lu_env *lei_env;
+};
+
+static const struct rhashtable_params lu_env_rhash_params = {
+       .key_len     = sizeof(struct task_struct *),
+       .key_offset  = offsetof(struct lu_env_item, lei_task),
+       .head_offset = offsetof(struct lu_env_item, lei_linkage),
+};
+
+struct rhashtable lu_env_rhash;
+
+struct lu_env_percpu {
+       struct task_struct *lep_task;
+       struct lu_env *lep_env ____cacheline_aligned_in_smp;
+};
+
+static struct lu_env_percpu lu_env_percpu[NR_CPUS];
+
+int lu_env_add(struct lu_env *env)
+{
+       struct lu_env_item *lei, *old;
+
+       LASSERT(env);
+
+       OBD_ALLOC_PTR(lei);
+       if (!lei)
+               return -ENOMEM;
+
+       lei->lei_task = current;
+       lei->lei_env = env;
+
+       old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
+                                               &lei->lei_linkage,
+                                               lu_env_rhash_params);
+       LASSERT(!old);
+
+       return 0;
+}
+EXPORT_SYMBOL(lu_env_add);
+
+void lu_env_remove(struct lu_env *env)
+{
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i;
+
+       for_each_possible_cpu(i) {
+               if (lu_env_percpu[i].lep_env == env) {
+                       LASSERT(lu_env_percpu[i].lep_task == task);
+                       lu_env_percpu[i].lep_task = NULL;
+                       lu_env_percpu[i].lep_env = NULL;
+               }
+       }
+
+       rcu_read_lock();
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
+                                         lu_env_rhash_params) == 0)
+               OBD_FREE_PTR(lei);
+       rcu_read_unlock();
+}
+EXPORT_SYMBOL(lu_env_remove);
+
+struct lu_env *lu_env_find(void)
+{
+       struct lu_env *env = NULL;
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i = get_cpu();
+
+       if (lu_env_percpu[i].lep_task == current) {
+               env = lu_env_percpu[i].lep_env;
+               put_cpu();
+               LASSERT(env);
+               return env;
+       }
+
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei) {
+               env = lei->lei_env;
+               lu_env_percpu[i].lep_task = current;
+               lu_env_percpu[i].lep_env = env;
+       }
+       put_cpu();
+
+       return env;
+}
+EXPORT_SYMBOL(lu_env_find);
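
The rhashtable plus the per-CPU cache give a kernel thread a way to publish its lu_env once and recover it later without threading the pointer through every call chain. A hedged usage sketch, assuming lu_env_init()/lu_env_fini() from this file and the LCT_DT_THREAD context tag; the demo function and its tag choice are illustrative, not part of the patch.

#include <lu_object.h>

static int demo_thread_body(void)
{
        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                return rc;

        rc = lu_env_add(&env);          /* publish in lu_env_rhash for current */
        if (rc)
                goto out_fini;

        /* ... from here any callee may call lu_env_find() and get &env
         * back for this task, hitting the per-CPU cache on repeat use ... */

        lu_env_remove(&env);            /* drop the hash and per-CPU entries */
out_fini:
        lu_env_fini(&env);
        return rc;
}
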
+
 static struct shrinker *lu_site_shrinker;
 
 typedef struct lu_site_stats{
@@ -1896,19 +1985,24 @@ typedef struct lu_site_stats{
         unsigned        lss_busy;
 } lu_site_stats_t;
 
-static void lu_site_stats_get(struct cfs_hash *hs,
+static void lu_site_stats_get(const struct lu_site *s,
                               lu_site_stats_t *stats, int populated)
 {
+       struct cfs_hash *hs = s->ls_obj_hash;
        struct cfs_hash_bd bd;
-       unsigned int  i;
+       unsigned int i;
+       /*
+        * percpu_counter_sum_positive() won't accept a const pointer
+        * as it does modify the struct by taking a spinlock
+        */
+       struct lu_site *s2 = (struct lu_site *)s;
 
+       stats->lss_busy += cfs_hash_size_get(hs) -
+               percpu_counter_sum_positive(&s2->ls_lru_len_counter);
         cfs_hash_for_each_bucket(hs, &bd, i) {
-                struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
-               struct hlist_head       *hhead;
+               struct hlist_head *hhead;
 
                 cfs_hash_bd_lock(hs, &bd, 1);
-               stats->lss_busy  +=
-                       cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
                 stats->lss_total += cfs_hash_bd_count_get(&bd);
                 stats->lss_max_search = max((int)stats->lss_max_search,
                                             cfs_hash_bd_depmax_get(&bd));
@@ -2097,7 +2191,7 @@ void lu_context_keys_dump(void)
  */
 int lu_global_init(void)
 {
-        int result;
+       int result;
        DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
                         lu_cache_shrink_count, lu_cache_shrink_scan);
 
@@ -2132,6 +2226,8 @@ int lu_global_init(void)
         if (lu_site_shrinker == NULL)
                 return -ENOMEM;
 
+       result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
+
         return result;
 }
 
@@ -2155,6 +2251,8 @@ void lu_global_fini(void)
         lu_env_fini(&lu_shrink_env);
        up_write(&lu_sites_guard);
 
+       rhashtable_destroy(&lu_env_rhash);
+
         lu_ref_global_fini();
 }
 
@@ -2179,7 +2277,7 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
        lu_site_stats_t stats;
 
        memset(&stats, 0, sizeof(stats));
-       lu_site_stats_get(s->ls_obj_hash, &stats, 1);
+       lu_site_stats_get(s, &stats, 1);
 
        seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                   stats.lss_busy,