Whamcloud - gitweb
LU-8346 obdclass: Set lc_version
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index c478d51..751e3d1 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -44,8 +40,9 @@
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
-#include <libcfs/libcfs.h>
 #include <linux/module.h>
+#include <linux/list.h>
+#include <libcfs/libcfs.h>
 #include <libcfs/libcfs_hash.h> /* hash_long() */
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lustre_fid.h>
 #include <lu_object.h>
 #include <lu_ref.h>
-#include <libcfs/list.h>
 
 enum {
        LU_CACHE_PERCENT_MAX     = 50,
        LU_CACHE_PERCENT_DEFAULT = 20
 };
 
-#define        LU_CACHE_NR_MAX_ADJUST          128
+#define        LU_CACHE_NR_MAX_ADJUST          512
 #define        LU_CACHE_NR_UNLIMITED           -1
 #define        LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
 #define        LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
@@ -69,6 +65,7 @@ enum {
 
 #define LU_SITE_BITS_MIN    12
 #define LU_SITE_BITS_MAX    24
+#define LU_SITE_BITS_MAX_CL 19
 /**
  * total 256 buckets, we don't want too many buckets because:
  * - consume too much memory
@@ -78,12 +75,12 @@ enum {
 
 
 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
-               "Percentage of memory to be used as lu_object cache");
+module_param(lu_cache_percent, int, 0644);
+MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
 
 static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
-CFS_MODULE_PARM(lu_cache_nr, "l", long, 0644,
-               "Maximum number of objects in lu_object cache");
+module_param(lu_cache_nr, long, 0644);
+MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
@@ -95,16 +92,16 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
  */
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_object_header *top;
-        struct lu_site          *site;
-        struct lu_object        *orig;
-       struct cfs_hash_bd            bd;
-       const struct lu_fid     *fid;
+       struct lu_site_bkt_data *bkt;
+       struct lu_object_header *top;
+       struct lu_site *site;
+       struct lu_object *orig;
+       struct cfs_hash_bd bd;
+       const struct lu_fid *fid;
 
-        top  = o->lo_header;
-        site = o->lo_dev->ld_site;
-        orig = o;
+       top  = o->lo_header;
+       site = o->lo_dev->ld_site;
+       orig = o;
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
@@ -126,12 +123,11 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
+       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
                if (lu_object_is_dying(top)) {
-
                        /*
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
@@ -141,47 +137,47 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        /*
-         * When last reference is released, iterate over object
-         * layers, and notify them that object is no longer busy.
-         */
+       /*
+        * When last reference is released, iterate over object
+        * layers, and notify them that object is no longer busy.
+        */
        list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
-                if (o->lo_ops->loo_object_release != NULL)
-                        o->lo_ops->loo_object_release(env, o);
-        }
+               if (o->lo_ops->loo_object_release != NULL)
+                       o->lo_ops->loo_object_release(env, o);
+       }
 
        if (!lu_object_is_dying(top) &&
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
                bkt->lsb_lru_len++;
-               lprocfs_counter_incr(site->ls_stats, LU_SS_LRU_LEN);
+               percpu_counter_inc(&site->ls_lru_len_counter);
                CDEBUG(D_INODE, "Add %p to site lru. hash: %p, bkt: %p, "
                       "lru_len: %ld\n",
                       o, site->ls_obj_hash, bkt, bkt->lsb_lru_len);
-                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-                return;
-        }
+               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               return;
+       }
 
-        /*
+       /*
         * If object is dying (will not be cached) then remove it
-         * from hash table and LRU.
-         *
-         * This is done with hash table and LRU lists locked. As the only
-         * way to acquire first reference to previously unreferenced
-         * object is through hash-table lookup (lu_object_find()),
-         * or LRU scanning (lu_site_purge()), that are done under hash-table
-         * and LRU lock, no race with concurrent object lookup is possible
-         * and we can safely destroy object below.
-         */
+        * from hash table and LRU.
+        *
+        * This is done with hash table and LRU lists locked. As the only
+        * way to acquire first reference to previously unreferenced
+        * object is through hash-table lookup (lu_object_find()),
+        * or LRU scanning (lu_site_purge()), that are done under hash-table
+        * and LRU lock, no race with concurrent object lookup is possible
+        * and we can safely destroy object below.
+        */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
                cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
-        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-        /*
-         * Object was already removed from hash and lru above, can
-         * kill it.
-         */
-        lu_object_free(env, orig);
+       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+       /*
+        * Object was already removed from hash and lru above, can
+        * kill it.
+        */
+       lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
 
@@ -218,7 +214,7 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
                        list_del_init(&top->loh_lru);
                        bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
                        bkt->lsb_lru_len--;
-                       lprocfs_counter_decr(site->ls_stats, LU_SS_LRU_LEN);
+                       percpu_counter_dec(&site->ls_lru_len_counter);
                }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
                cfs_hash_bd_unlock(obj_hash, &bd, 1);
@@ -347,8 +343,11 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 
 /**
  * Free \a nr objects from the cold end of the site LRU list.
+ * if canblock is 0, then don't block awaiting for another
+ * instance of lu_site_purge() to complete
  */
-int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
+int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
+                         int nr, int canblock)
 {
         struct lu_object_header *h;
         struct lu_object_header *temp;
@@ -357,7 +356,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
        struct cfs_hash_bd            bd2;
        struct list_head         dispose;
        int                      did_sth;
-       unsigned int             start;
+       unsigned int             start = 0;
         int                      count;
         int                      bnr;
        unsigned int             i;
@@ -370,14 +369,19 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
          * Under LRU list lock, scan LRU list and move unreferenced objects to
          * the dispose list, removing them from LRU and hash table.
          */
-        start = s->ls_purge_start;
+       if (nr != ~0)
+               start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
  again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
         * only bring troubles to us. See LU-5331.
         */
-       mutex_lock(&s->ls_purge_mutex);
+       if (canblock != 0)
+               mutex_lock(&s->ls_purge_mutex);
+       else if (mutex_trylock(&s->ls_purge_mutex) == 0)
+               goto out;
+
         did_sth = 0;
         cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
                 if (i < start)
@@ -396,7 +400,7 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
                                                &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
                        bkt->lsb_lru_len--;
-                       lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
+                       percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
                                 did_sth = 1;
 
@@ -433,9 +437,10 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
         /* race on s->ls_purge_start, but nobody cares */
         s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
 
+out:
         return nr;
 }
-EXPORT_SYMBOL(lu_site_purge);
+EXPORT_SYMBOL(lu_site_purge_objects);
 
 /*
  * Object printing.
@@ -584,53 +589,35 @@ int lu_object_invariant(const struct lu_object *o)
 static struct lu_object *htable_lookup(struct lu_site *s,
                                       struct cfs_hash_bd *bd,
                                       const struct lu_fid *f,
-                                      wait_queue_t *waiter,
                                       __u64 *version)
 {
        struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
-       struct hlist_node       *hnode;
-       __u64  ver = cfs_hash_bd_version_get(bd);
+       struct hlist_node *hnode;
+       __u64 ver = cfs_hash_bd_version_get(bd);
 
-        if (*version == ver)
+       if (*version == ver)
                return ERR_PTR(-ENOENT);
 
-        *version = ver;
-        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
+       *version = ver;
+       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_peek_locked is a somehow "internal" function
         * of cfs_hash, it doesn't add refcount on object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-        if (hnode == NULL) {
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+       if (!hnode) {
+               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return ERR_PTR(-ENOENT);
-        }
-
-        h = container_of0(hnode, struct lu_object_header, loh_hash);
-        if (likely(!lu_object_is_dying(h))) {
-               cfs_hash_get(s->ls_obj_hash, hnode);
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-               if (!list_empty(&h->loh_lru)) {
-                       list_del_init(&h->loh_lru);
-                       bkt->lsb_lru_len--;
-                       lprocfs_counter_decr(s->ls_stats, LU_SS_LRU_LEN);
-               }
-                return lu_object_top(h);
-        }
-
-        /*
-         * Lookup found an object being destroyed this object cannot be
-         * returned (to assure that references to dying objects are eventually
-         * drained), and moreover, lookup has to wait until object is freed.
-         */
-
-       if (likely(waiter != NULL)) {
-               init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
-               set_current_state(TASK_UNINTERRUPTIBLE);
-               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
        }
 
-       return ERR_PTR(-EAGAIN);
+       h = container_of0(hnode, struct lu_object_header, loh_hash);
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+       if (!list_empty(&h->loh_lru)) {
+               list_del_init(&h->loh_lru);
+               bkt->lsb_lru_len--;
+               percpu_counter_dec(&s->ls_lru_len_counter);
+       }
+       return lu_object_top(h);
 }
 
 /**
@@ -663,140 +650,112 @@ static void lu_object_limit(const struct lu_env *env,
 
        size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
        nr = (__u64)lu_cache_nr;
-       if (size > nr)
-               lu_site_purge(env, dev->ld_site,
-                             MIN(size - nr, LU_CACHE_NR_MAX_ADJUST));
+       if (size <= nr)
+               return;
 
-       return;
+       lu_site_purge_objects(env, dev->ld_site,
+                             MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0);
 }
 
 static struct lu_object *lu_object_new(const struct lu_env *env,
-                                       struct lu_device *dev,
-                                       const struct lu_fid *f,
-                                       const struct lu_object_conf *conf)
+                                      struct lu_device *dev,
+                                      const struct lu_fid *f,
+                                      const struct lu_object_conf *conf)
 {
-        struct lu_object        *o;
-       struct cfs_hash              *hs;
-       struct cfs_hash_bd            bd;
+       struct lu_object *o;
+       struct cfs_hash *hs;
+       struct cfs_hash_bd bd;
 
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       o = lu_object_alloc(env, dev, f, conf);
+       if (unlikely(IS_ERR(o)))
+               return o;
 
-        hs = dev->ld_site->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+       hs = dev->ld_site->ls_obj_hash;
+       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+       cfs_hash_bd_unlock(hs, &bd, 1);
 
        lu_object_limit(env, dev);
 
-        return o;
+       return o;
 }
 
 /**
  * Core logic of lu_object_find*() functions.
+ *
+ * Much like lu_object_find(), but top level device of object is specifically
+ * \a dev rather than top level device of the site. This interface allows
+ * objects of different "stacking" to be created within the same site.
  */
-static struct lu_object *lu_object_find_try(const struct lu_env *env,
-                                           struct lu_device *dev,
-                                           const struct lu_fid *f,
-                                           const struct lu_object_conf *conf,
-                                           wait_queue_t *waiter)
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                   struct lu_device *dev,
+                                   const struct lu_fid *f,
+                                   const struct lu_object_conf *conf)
 {
-       struct lu_object      *o;
-       struct lu_object      *shadow;
-       struct lu_site        *s;
-       struct cfs_hash            *hs;
-       struct cfs_hash_bd          bd;
-       __u64                  version = 0;
+       struct lu_object *o;
+       struct lu_object *shadow;
+       struct lu_site *s;
+       struct cfs_hash *hs;
+       struct cfs_hash_bd bd;
+       __u64 version = 0;
 
-        /*
-         * This uses standard index maintenance protocol:
-         *
-         *     - search index under lock, and return object if found;
-         *     - otherwise, unlock index, allocate new object;
-         *     - lock index and search again;
-         *     - if nothing is found (usual case), insert newly created
-         *       object into index;
-         *     - otherwise (race: other thread inserted object), free
-         *       object just allocated.
-         *     - unlock index;
-         *     - return object.
-         *
-         * For "LOC_F_NEW" case, we are sure the object is new established.
-         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
-         * just alloc and insert directly.
-         *
-         * If dying object is found during index search, add @waiter to the
-         * site wait-queue and return ERR_PTR(-EAGAIN).
-         */
-        if (conf != NULL && conf->loc_flags & LOC_F_NEW)
-                return lu_object_new(env, dev, f, conf);
-
-        s  = dev->ld_site;
-        hs = s->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        o = htable_lookup(s, &bd, f, waiter, &version);
-        cfs_hash_bd_unlock(hs, &bd, 1);
+       /*
+        * This uses standard index maintenance protocol:
+        *
+        *     - search index under lock, and return object if found;
+        *     - otherwise, unlock index, allocate new object;
+        *     - lock index and search again;
+        *     - if nothing is found (usual case), insert newly created
+        *       object into index;
+        *     - otherwise (race: other thread inserted object), free
+        *       object just allocated.
+        *     - unlock index;
+        *     - return object.
+        *
+        * For "LOC_F_NEW" case, we are sure the object is new established.
+        * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
+        * just alloc and insert directly.
+        *
+        * If dying object is found during index search, add @waiter to the
+        * site wait-queue and return ERR_PTR(-EAGAIN).
+        */
+       if (conf && conf->loc_flags & LOC_F_NEW)
+               return lu_object_new(env, dev, f, conf);
+
+       s  = dev->ld_site;
+       hs = s->ls_obj_hash;
+       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
+       o = htable_lookup(s, &bd, f, &version);
+       cfs_hash_bd_unlock(hs, &bd, 1);
        if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                return o;
+               return o;
 
-        /*
-         * Allocate new object. This may result in rather complicated
-         * operations, including fld queries, inode loading, etc.
-         */
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       /*
+        * Allocate new object. This may result in rather complicated
+        * operations, including fld queries, inode loading, etc.
+        */
+       o = lu_object_alloc(env, dev, f, conf);
+       if (unlikely(IS_ERR(o)))
+               return o;
 
-        LASSERT(lu_fid_eq(lu_object_fid(o), f));
+       LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
-        cfs_hash_bd_lock(hs, &bd, 1);
+       cfs_hash_bd_lock(hs, &bd, 1);
 
-        shadow = htable_lookup(s, &bd, f, waiter, &version);
+       shadow = htable_lookup(s, &bd, f, &version);
        if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
-                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                cfs_hash_bd_unlock(hs, &bd, 1);
+               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
                lu_object_limit(env, dev);
 
-                return o;
-        }
-
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        lu_object_free(env, o);
-        return shadow;
-}
-
-/**
- * Much like lu_object_find(), but top level device of object is specifically
- * \a dev rather than top level device of the site. This interface allows
- * objects of different "stacking" to be created within the same site.
- */
-struct lu_object *lu_object_find_at(const struct lu_env *env,
-                                   struct lu_device *dev,
-                                   const struct lu_fid *f,
-                                   const struct lu_object_conf *conf)
-{
-       struct lu_site_bkt_data *bkt;
-       struct lu_object        *obj;
-       wait_queue_t           wait;
-
-       if (conf != NULL && conf->loc_flags & LOC_F_NOWAIT)
-               return lu_object_find_try(env, dev, f, conf, NULL);
-
-       while (1) {
-               obj = lu_object_find_try(env, dev, f, conf, &wait);
-               if (obj != ERR_PTR(-EAGAIN))
-                       return obj;
-               /*
-                * lu_object_find_try() already added waiter into the
-                * wait queue.
-                */
-               schedule();
-               bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
-               remove_wait_queue(&bkt->lsb_marche_funebre, &wait);
+               return o;
        }
+
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
+       cfs_hash_bd_unlock(hs, &bd, 1);
+       lu_object_free(env, o);
+       return shadow;
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -825,35 +784,20 @@ struct lu_object *lu_object_find_slice(const struct lu_env *env,
 }
 EXPORT_SYMBOL(lu_object_find_slice);
 
-/**
- * Global list of all device types.
- */
-static struct list_head lu_device_types;
-
 int lu_device_type_init(struct lu_device_type *ldt)
 {
        int result = 0;
 
        atomic_set(&ldt->ldt_device_nr, 0);
-       INIT_LIST_HEAD(&ldt->ldt_linkage);
        if (ldt->ldt_ops->ldto_init)
                result = ldt->ldt_ops->ldto_init(ldt);
 
-       if (result == 0) {
-               spin_lock(&obd_types_lock);
-               list_add(&ldt->ldt_linkage, &lu_device_types);
-               spin_unlock(&obd_types_lock);
-       }
-
        return result;
 }
 EXPORT_SYMBOL(lu_device_type_init);
 
 void lu_device_type_fini(struct lu_device_type *ldt)
 {
-       spin_lock(&obd_types_lock);
-       list_del_init(&ldt->ldt_linkage);
-       spin_unlock(&obd_types_lock);
        if (ldt->ldt_ops->ldto_fini)
                ldt->ldt_ops->ldto_fini(ldt);
 }
@@ -862,8 +806,8 @@ EXPORT_SYMBOL(lu_device_type_fini);
 /**
  * Global list of all sites on this node
  */
-static struct list_head lu_sites;
-static DEFINE_MUTEX(lu_sites_guard);
+static LIST_HEAD(lu_sites);
+static DECLARE_RWSEM(lu_sites_guard);
 
 /**
  * Global environment used by site shrinker.
@@ -920,6 +864,7 @@ static unsigned long lu_htable_order(struct lu_device *top)
 {
        unsigned long cache_size;
        unsigned long bits;
+       unsigned long bits_max = LU_SITE_BITS_MAX;
 
        /*
         * For ZFS based OSDs the cache should be disabled by default.  This
@@ -933,6 +878,9 @@ static unsigned long lu_htable_order(struct lu_device *top)
                return LU_SITE_BITS_MIN;
        }
 
+       if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0)
+               bits_max = LU_SITE_BITS_MAX_CL;
+
         /*
          * Calculate hash table size, assuming that we want reasonable
          * performance when 20% of total memory is occupied by cache of
@@ -944,8 +892,8 @@ static unsigned long lu_htable_order(struct lu_device *top)
 
 #if BITS_PER_LONG == 32
         /* limit hashtable size for lowmem systems to low RAM */
-       if (cache_size > 1 << (30 - PAGE_CACHE_SHIFT))
-               cache_size = 1 << (30 - PAGE_CACHE_SHIFT) * 3 / 4;
+       if (cache_size > 1 << (30 - PAGE_SHIFT))
+               cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
 #endif
 
         /* clear off unreasonable cache setting. */
@@ -958,12 +906,13 @@ static unsigned long lu_htable_order(struct lu_device *top)
                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
         }
         cache_size = cache_size / 100 * lu_cache_percent *
-               (PAGE_CACHE_SIZE / 1024);
+               (PAGE_SIZE / 1024);
 
         for (bits = 1; (1 << bits) < cache_size; ++bits) {
                 ;
         }
-        return bits;
+
+       return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
 }
 
 static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
@@ -1055,14 +1004,22 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        char name[16];
        unsigned long bits;
        unsigned int i;
+       int rc;
        ENTRY;
 
        memset(s, 0, sizeof *s);
        mutex_init(&s->ls_purge_mutex);
-       bits = lu_htable_order(top);
+
+#ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
+       rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
+#else
+       rc = percpu_counter_init(&s->ls_lru_len_counter, 0);
+#endif
+       if (rc)
+               return -ENOMEM;
+
        snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
-       for (bits = clamp_t(typeof(bits), bits,
-                           LU_SITE_BITS_MIN, LU_SITE_BITS_MAX);
+       for (bits = lu_htable_order(top);
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
                                                 bits - LU_SITE_BKT_BITS,
@@ -1107,12 +1064,6 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
                              0, "cache_death_race", "cache_death_race");
         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
                              0, "lru_purged", "lru_purged");
-       /*
-        * Unlike other counters, lru_len can be decremented so
-        * need lc_sum instead of just lc_count
-        */
-       lprocfs_counter_init(s->ls_stats, LU_SS_LRU_LEN,
-                            LPROCFS_CNTR_AVGMINMAX, "lru_len", "lru_len");
 
        INIT_LIST_HEAD(&s->ls_linkage);
         s->ls_top_dev = top;
@@ -1134,9 +1085,11 @@ EXPORT_SYMBOL(lu_site_init);
  */
 void lu_site_fini(struct lu_site *s)
 {
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
        list_del_init(&s->ls_linkage);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
+
+       percpu_counter_destroy(&s->ls_lru_len_counter);
 
         if (s->ls_obj_hash != NULL) {
                 cfs_hash_putref(s->ls_obj_hash);
@@ -1161,11 +1114,11 @@ EXPORT_SYMBOL(lu_site_fini);
 int lu_site_init_finish(struct lu_site *s)
 {
         int result;
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         result = lu_context_refill(&lu_shrink_env.le_ctx);
         if (result == 0)
                list_add(&s->ls_linkage, &lu_sites);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
         return result;
 }
 EXPORT_SYMBOL(lu_site_init_finish);
@@ -1579,7 +1532,7 @@ EXPORT_SYMBOL(lu_context_key_get);
 /**
  * List of remembered contexts. XXX document me.
  */
-static struct list_head lu_context_remembered;
+static LIST_HEAD(lu_context_remembered);
 
 /**
  * Destroy \a key in all remembered contexts. This is used to destroy key
@@ -1588,14 +1541,9 @@ static struct list_head lu_context_remembered;
  */
 void lu_context_key_quiesce(struct lu_context_key *key)
 {
-        struct lu_context *ctx;
-        extern unsigned cl_env_cache_purge(unsigned nr);
+       struct lu_context *ctx;
 
-        if (!(key->lct_tags & LCT_QUIESCENT)) {
-                /*
-                 * XXX layering violation.
-                 */
-                cl_env_cache_purge(~0);
+       if (!(key->lct_tags & LCT_QUIESCENT)) {
                 /*
                  * XXX memory barrier has to go here.
                  */
@@ -1620,15 +1568,18 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                list_for_each_entry(ctx, &lu_context_remembered,
                                    lc_remember)
                        key_fini(ctx, key->lct_index);
-               write_unlock(&lu_keys_guard);
+
                ++key_set_version;
+               write_unlock(&lu_keys_guard);
        }
 }
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
-        key->lct_tags &= ~LCT_QUIESCENT;
-        ++key_set_version;
+       write_lock(&lu_keys_guard);
+       key->lct_tags &= ~LCT_QUIESCENT;
+       ++key_set_version;
+       write_unlock(&lu_keys_guard);
 }
 
 static void keys_fini(struct lu_context *ctx)
@@ -1648,6 +1599,7 @@ static void keys_fini(struct lu_context *ctx)
 static int keys_fill(struct lu_context *ctx)
 {
        unsigned int i;
+       unsigned pre_version;
 
        /*
         * A serialisation with lu_context_key_quiesce() is needed, but some
@@ -1660,24 +1612,26 @@ static int keys_fill(struct lu_context *ctx)
         */
        read_lock(&lu_keys_guard);
        atomic_inc(&lu_key_initing_cnt);
+       pre_version = key_set_version;
        read_unlock(&lu_keys_guard);
 
-        LINVRNT(ctx->lc_value != NULL);
-        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                struct lu_context_key *key;
+refill:
+       LINVRNT(ctx->lc_value != NULL);
+       for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
+               struct lu_context_key *key;
 
-                key = lu_keys[i];
-                if (ctx->lc_value[i] == NULL && key != NULL &&
-                    (key->lct_tags & ctx->lc_tags) &&
-                    /*
-                     * Don't create values for a LCT_QUIESCENT key, as this
-                     * will pin module owning a key.
-                     */
-                    !(key->lct_tags & LCT_QUIESCENT)) {
-                        void *value;
-
-                        LINVRNT(key->lct_init != NULL);
-                        LINVRNT(key->lct_index == i);
+               key = lu_keys[i];
+               if (ctx->lc_value[i] == NULL && key != NULL &&
+                   (key->lct_tags & ctx->lc_tags) &&
+                   /*
+                    * Don't create values for a LCT_QUIESCENT key, as this
+                    * will pin module owning a key.
+                    */
+                   !(key->lct_tags & LCT_QUIESCENT)) {
+                       void *value;
+
+                       LINVRNT(key->lct_init != NULL);
+                       LINVRNT(key->lct_index == i);
 
                        LASSERT(key->lct_owner != NULL);
                        if (!(ctx->lc_tags & LCT_NOREF) &&
@@ -1694,19 +1648,29 @@ static int keys_fill(struct lu_context *ctx)
 
                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
                        atomic_inc(&key->lct_used);
-                        /*
-                         * This is the only place in the code, where an
-                         * element of ctx->lc_value[] array is set to non-NULL
-                         * value.
-                         */
-                        ctx->lc_value[i] = value;
-                        if (key->lct_exit != NULL)
-                                ctx->lc_tags |= LCT_HAS_EXIT;
-                }
-                ctx->lc_version = key_set_version;
-        }
+                       /*
+                        * This is the only place in the code, where an
+                        * element of ctx->lc_value[] array is set to non-NULL
+                        * value.
+                        */
+                       ctx->lc_value[i] = value;
+                       if (key->lct_exit != NULL)
+                               ctx->lc_tags |= LCT_HAS_EXIT;
+               }
+       }
+
+       read_lock(&lu_keys_guard);
+       if (pre_version != key_set_version) {
+               pre_version = key_set_version;
+               read_unlock(&lu_keys_guard);
+               goto refill;
+       }
+
+       ctx->lc_version = key_set_version;
+
        atomic_dec(&lu_key_initing_cnt);
-        return 0;
+       read_unlock(&lu_keys_guard);
+       return 0;
 }
 
 static int keys_init(struct lu_context *ctx)
@@ -1785,10 +1749,11 @@ void lu_context_exit(struct lu_context *ctx)
         LINVRNT(ctx->lc_state == LCS_ENTERED);
         ctx->lc_state = LCS_LEFT;
         if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
+               /* could race with key quiescency */
+               if (ctx->lc_tags & LCT_REMEMBER)
+                       read_lock(&lu_keys_guard);
+
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                       /* could race with key quiescency */
-                       if (ctx->lc_tags & LCT_REMEMBER)
-                               read_lock(&lu_keys_guard);
                        if (ctx->lc_value[i] != NULL) {
                                struct lu_context_key *key;
 
@@ -1798,9 +1763,10 @@ void lu_context_exit(struct lu_context *ctx)
                                        key->lct_exit(ctx,
                                                      key, ctx->lc_value[i]);
                        }
-                       if (ctx->lc_tags & LCT_REMEMBER)
-                               read_unlock(&lu_keys_guard);
                 }
+
+               if (ctx->lc_tags & LCT_REMEMBER)
+                       read_unlock(&lu_keys_guard);
         }
 }
 EXPORT_SYMBOL(lu_context_exit);
@@ -1960,12 +1926,15 @@ static void lu_site_stats_get(struct cfs_hash *hs,
 
 
 /*
- * lu_cache_shrink_count returns the number of cached objects that are
- * candidates to be freed by shrink_slab(). A counter, which tracks
- * the number of items in the site's lru, is maintained in the per cpu
- * stats of each site. The counter is incremented when an object is added
- * to a site's lru and decremented when one is removed. The number of
- * free-able objects is the sum of all per cpu counters for all sites.
+ * lu_cache_shrink_count() returns an approximate number of cached objects
+ * that can be freed by shrink_slab(). A counter, which tracks the
+ * number of items in the site's lru, is maintained in a percpu_counter
+ * for each site. The percpu values are incremented and decremented as
+ * objects are added or removed from the lru. The percpu values are summed
+ * and saved whenever a percpu value exceeds a threshold. Thus the saved,
+ * summed value at any given time may not accurately reflect the current
+ * lru length. But this value is sufficiently accurate for the needs of
+ * a shrinker.
  *
  * Using a per cpu counter is a compromise solution to concurrent access:
  * lu_object_put() can update the counter without locking the site and
@@ -1982,11 +1951,10 @@ static unsigned long lu_cache_shrink_count(struct shrinker *sk,
        if (!(sc->gfp_mask & __GFP_FS))
                return 0;
 
-       mutex_lock(&lu_sites_guard);
-       list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
-               cached += ls_stats_read(s->ls_stats, LU_SS_LRU_LEN);
-       }
-       mutex_unlock(&lu_sites_guard);
+       down_read(&lu_sites_guard);
+       list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
+               cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
+       up_read(&lu_sites_guard);
 
        cached = (cached / 100) * sysctl_vfs_cache_pressure;
        CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
@@ -2017,7 +1985,7 @@ static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
                 */
                return SHRINK_STOP;
 
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
        list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
                remain = lu_site_purge(&lu_shrink_env, s, remain);
                /*
@@ -2027,7 +1995,7 @@ static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
                list_move_tail(&s->ls_linkage, &splice);
        }
        list_splice(&splice, lu_sites.prev);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
 
        return sc->nr_to_scan - remain;
 }
@@ -2134,10 +2102,6 @@ int lu_global_init(void)
 
         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
 
-       INIT_LIST_HEAD(&lu_device_types);
-       INIT_LIST_HEAD(&lu_context_remembered);
-       INIT_LIST_HEAD(&lu_sites);
-
         result = lu_ref_global_init();
         if (result != 0)
                 return result;
@@ -2152,9 +2116,9 @@ int lu_global_init(void)
          * conservatively. This should not be too bad, because this
          * environment is global.
          */
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
         if (result != 0)
                 return result;
 
@@ -2186,9 +2150,9 @@ void lu_global_fini(void)
          * Tear shrinker environment down _after_ de-registering
          * lu_global_key, because the latter has a value in the former.
          */
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         lu_env_fini(&lu_shrink_env);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
 
         lu_ref_global_fini();
 }
@@ -2199,14 +2163,7 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
        struct lprocfs_counter ret;
 
        lprocfs_stats_collect(stats, idx, &ret);
-       if (idx == LU_SS_LRU_LEN)
-               /*
-                * protect against counter on cpu A being decremented
-                * before counter is incremented on cpu B; unlikely
-                */
-               return (__u32)((ret.lc_sum > 0) ? ret.lc_sum : 0);
-       else
-               return (__u32)ret.lc_count;
+       return (__u32)ret.lc_count;
 #else
        return 0;
 #endif
@@ -2223,19 +2180,19 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
        memset(&stats, 0, sizeof(stats));
        lu_site_stats_get(s->ls_obj_hash, &stats, 1);
 
-       return seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d %d\n",
-                         stats.lss_busy,
-                         stats.lss_total,
-                         stats.lss_populated,
-                         CFS_HASH_NHLIST(s->ls_obj_hash),
-                         stats.lss_max_search,
-                         ls_stats_read(s->ls_stats, LU_SS_CREATED),
-                         ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
-                         ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
-                         ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
-                         ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
-                         ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED),
-                         ls_stats_read(s->ls_stats, LU_SS_LRU_LEN));
+       seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
+                  stats.lss_busy,
+                  stats.lss_total,
+                  stats.lss_populated,
+                  CFS_HASH_NHLIST(s->ls_obj_hash),
+                  stats.lss_max_search,
+                  ls_stats_read(s->ls_stats, LU_SS_CREATED),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
+                  ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
+       return 0;
 }
 EXPORT_SYMBOL(lu_site_stats_seq_print);
 
@@ -2286,19 +2243,24 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct lu_object        *shadow;
-       wait_queue_t             waiter;
        struct cfs_hash         *hs;
        struct cfs_hash_bd       bd;
-       __u64                    version = 0;
 
        LASSERT(fid_is_zero(old));
 
+       /* supposed to be unique */
        hs = s->ls_obj_hash;
        cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
-       shadow = htable_lookup(s, &bd, fid, &waiter, &version);
-       /* supposed to be unique */
-       LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
+       {
+               __u64 version = 0;
+               struct lu_object *shadow;
+
+               shadow = htable_lookup(s, &bd, fid, &version);
+               /* supposed to be unique */
+               LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+       }
+#endif
        *old = *fid;
        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
        cfs_hash_bd_unlock(hs, &bd, 1);
@@ -2399,3 +2361,4 @@ int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
        buf->lb_len = len;
        return 0;
 }
+EXPORT_SYMBOL(lu_buf_check_and_grow);