LU-9679 modules: convert MIN/MAX to kernel style
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index 6392139..7ebe04e 100644
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
-#include <libcfs/libcfs.h>
-
-#ifdef __KERNEL__
-# include <linux/module.h>
+#include <linux/module.h>
+#include <linux/list.h>
+#ifdef HAVE_PROCESSOR_H
+#include <linux/processor.h>
+#else
+#include <libcfs/linux/processor.h>
 #endif
+#include <linux/random.h>
 
-/* hash_long() */
-#include <libcfs/libcfs_hash.h>
+#include <libcfs/libcfs.h>
+#include <libcfs/libcfs_hash.h> /* hash_long() */
+#include <libcfs/linux/linux-mem.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <lustre_disk.h>
 #include <lustre_fid.h>
 #include <lu_object.h>
-#include <libcfs/list.h>
+#include <lu_ref.h>
+
+struct lu_site_bkt_data {
+       /**
+        * LRU list, updated on each access to object. Protected by
+        * lsb_waitq.lock.
+        *
+        * "Cold" end of the LRU is lsb_lru.next. Accessed objects are
+        * moved to lsb_lru.prev.
+        */
+       struct list_head                lsb_lru;
+       /**
+        * Wait-queue signaled when an object in this site is ultimately
+        * destroyed (lu_object_free()) or initialized (lu_object_start()).
+        * It is used by lu_object_find() to wait before retrying when an
+        * object in the process of destruction is found in the hash table,
+        * or to wait for an object to be initialized by the allocator.
+        *
+        * \see htable_lookup().
+        */
+       wait_queue_head_t               lsb_waitq;
+};
+
+enum {
+       LU_CACHE_PERCENT_MAX     = 50,
+       LU_CACHE_PERCENT_DEFAULT = 20
+};
+
+#define        LU_CACHE_NR_MAX_ADJUST          512
+#define        LU_CACHE_NR_UNLIMITED           -1
+#define        LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
+#define        LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
+/** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
+#define        LU_CACHE_NR_ZFS_LIMIT           10240
+
+#define LU_SITE_BITS_MIN    12
+#define LU_SITE_BITS_MAX    24
+#define LU_SITE_BITS_MAX_CL 19
+/**
+ * Max 256 buckets; we don't want too many buckets because they:
+ * - consume too much memory (currently max 16K)
+ * - lead to unbalanced LRU lists
+ * With few CPUs there is little gain from extra buckets, so
+ * we treat this as a maximum in lu_site_init().
+ */
+#define LU_SITE_BKT_BITS    8
+
+static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+module_param(lu_cache_percent, int, 0644);
+MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
+
+static long lu_cache_nr = LU_CACHE_NR_DEFAULT;
+module_param(lu_cache_nr, long, 0644);
+MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
+static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
+
+static u32 lu_fid_hash(const void *data, u32 seed)
+{
+       const struct lu_fid *fid = data;
+
+       seed = cfs_hash_32(seed ^ fid->f_oid, 32);
+       seed ^= cfs_hash_64(fid->f_seq, 32);
+       return seed;
+}
+
+static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
+{
+       return lu_fid_hash(fid, s->ls_bkt_seed) &
+              (s->ls_bkt_cnt - 1);
+}
+
+wait_queue_head_t *
+lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
+{
+       struct lu_site_bkt_data *bkt;
+
+       bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
+       return &bkt->lsb_waitq;
+}
+EXPORT_SYMBOL(lu_site_wq_from_fid);
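
As an aside on the scheme above: lu_bkt_hash() relies on ls_bkt_cnt being a power of two so that masking with ls_bkt_cnt - 1 acts as a modulo (lu_site_init() below rounds the count up accordingly). A minimal standalone sketch of the same shape, with a simple multiplicative mixer standing in for the cfs_hash_32()/cfs_hash_64() helpers and a hypothetical toy_fid standing in for struct lu_fid:

#include <stdint.h>
#include <stdio.h>

struct toy_fid {	/* stand-in for struct lu_fid */
	uint64_t f_seq;
	uint32_t f_oid;
};

/* stand-in mixer; the real code uses cfs_hash_32()/cfs_hash_64() */
static uint32_t toy_mix32(uint32_t v)
{
	return v * 2654435761u;	/* Knuth multiplicative hash */
}

static uint32_t toy_fid_hash(const struct toy_fid *fid, uint32_t seed)
{
	seed = toy_mix32(seed ^ fid->f_oid);
	seed ^= toy_mix32((uint32_t)(fid->f_seq ^ (fid->f_seq >> 32)));
	return seed;
}

int main(void)
{
	uint32_t bkt_cnt = 256;	/* like ls_bkt_cnt: a power of two */
	struct toy_fid fid = { .f_seq = 0x200000401ULL, .f_oid = 17 };

	/* "& (bkt_cnt - 1)" equals "% bkt_cnt" only because bkt_cnt is 2^n */
	printf("bucket = %u\n",
	       (unsigned int)(toy_fid_hash(&fid, 42u) & (bkt_cnt - 1)));
	return 0;
}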
 
 /**
  * Decrease reference counter on object. If last reference is freed, return
@@ -68,30 +148,26 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o);
  */
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_object_header *top;
-        struct lu_site          *site;
-        struct lu_object        *orig;
-        cfs_hash_bd_t            bd;
-       const struct lu_fid     *fid;
-
-        top  = o->lo_header;
-        site = o->lo_dev->ld_site;
-        orig = o;
+       struct lu_site_bkt_data *bkt;
+       struct lu_object_header *top = o->lo_header;
+       struct lu_site *site = o->lo_dev->ld_site;
+       struct lu_object *orig = o;
+       struct cfs_hash_bd bd;
+       const struct lu_fid *fid = lu_object_fid(o);
+       bool is_dying;
 
        /*
         * Till we have full fids-on-OST implemented, anonymous objects
         * are possible in OSP. Such an object isn't listed in the site,
         * so we should not remove it from the site.
         */
-       fid = lu_object_fid(o);
        if (fid_is_zero(fid)) {
                LASSERT(top->loh_hash.next == NULL
                        && top->loh_hash.pprev == NULL);
-               LASSERT(cfs_list_empty(&top->loh_lru));
-               if (!cfs_atomic_dec_and_test(&top->loh_ref))
+               LASSERT(list_empty(&top->loh_lru));
+               if (!atomic_dec_and_test(&top->loh_ref))
                        return;
-               cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+               list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                        if (o->lo_ops->loo_object_release != NULL)
                                o->lo_ops->loo_object_release(env, o);
                }
@@ -99,57 +175,70 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-        bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-
-        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-                if (lu_object_is_dying(top)) {
+       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
+
+       is_dying = lu_object_is_dying(top);
+       if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
+               /* At this point the object reference has been dropped and
+                * the lock is not held, so the lu_object must not be
+                * touched: it can be freed by a concurrent thread. Use the
+                * local variable for the check.
+                */
+               if (is_dying) {
+                       /*
+                        * somebody may be waiting for this, currently only
+                        * used for cl_object, see cl_object_put_last().
+                        */
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+                       wake_up_all(&bkt->lsb_waitq);
+               }
+               return;
+       }
 
-                        /*
-                         * somebody may be waiting for this, currently only
-                         * used for cl_object, see cl_object_put_last().
-                         */
-                        cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
-                }
-                return;
-        }
+       /*
+        * When last reference is released, iterate over object
+        * layers, and notify them that object is no longer busy.
+        */
+       list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
+               if (o->lo_ops->loo_object_release != NULL)
+                       o->lo_ops->loo_object_release(env, o);
+       }
 
-        LASSERT(bkt->lsb_busy > 0);
-        bkt->lsb_busy--;
-        /*
-         * When last reference is released, iterate over object
-         * layers, and notify them that object is no longer busy.
-         */
-        cfs_list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
-                if (o->lo_ops->loo_object_release != NULL)
-                        o->lo_ops->loo_object_release(env, o);
-        }
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       spin_lock(&bkt->lsb_waitq.lock);
 
-        if (!lu_object_is_dying(top)) {
-                LASSERT(cfs_list_empty(&top->loh_lru));
-                cfs_list_add_tail(&top->loh_lru, &bkt->lsb_lru);
-                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-                return;
-        }
+       /* Don't use the local 'is_dying' here: it was sampled without the
+        * lock, but here we need its latest actual value, so check the
+        * lu_object directly.
+        */
+       if (!lu_object_is_dying(top) &&
+           (lu_object_exists(orig) || lu_object_is_cl(orig))) {
+               LASSERT(list_empty(&top->loh_lru));
+               list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
+               percpu_counter_inc(&site->ls_lru_len_counter);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
+                      orig, top, site->ls_obj_hash, bkt);
+               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               return;
+       }
 
-        /*
-         * If object is dying (will not be cached), removed it
-         * from hash table and LRU.
-         *
-         * This is done with hash table and LRU lists locked. As the only
-         * way to acquire first reference to previously unreferenced
-         * object is through hash-table lookup (lu_object_find()),
-         * or LRU scanning (lu_site_purge()), that are done under hash-table
-         * and LRU lock, no race with concurrent object lookup is possible
-         * and we can safely destroy object below.
-         */
-        cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
-        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-        /*
-         * Object was already removed from hash and lru above, can
-         * kill it.
-         */
-        lu_object_free(env, orig);
+       /*
+        * If object is dying (will not be cached) then remove it
+        * from hash table (it is already not on the LRU).
+        *
+        * This is done with the hash table lists locked. As the only
+        * way to acquire a first reference to a previously unreferenced
+        * object is through hash-table lookup (lu_object_find()), which
+        * is done under the hash-table lock, no race with a concurrent
+        * object lookup is possible and we can safely destroy the object
+        * below.
+        */
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
+               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+       spin_unlock(&bkt->lsb_waitq.lock);
+       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+       /* Object was already removed from hash above, can kill it. */
+       lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
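
The is_dying local above is worth calling out: any state that may be needed after the reference drop must be sampled first, because once cfs_hash_bd_dec_and_lock() fails to take the lock, the object may be freed by a concurrent thread. A condensed sketch of that snapshot-before-release pattern, using hypothetical obj/obj_put names and C11 atomics in place of the cfs_hash machinery:

#include <stdatomic.h>
#include <stdbool.h>

struct obj {
	atomic_int refcount;
	bool dying;	/* may be set concurrently under another lock */
};

static void obj_put(struct obj *o)
{
	/* snapshot while we still hold a reference */
	bool is_dying = o->dying;

	if (atomic_fetch_sub(&o->refcount, 1) != 1) {
		/* not the last reference: *o may already have been freed
		 * by a concurrent thread, so from here on only the local
		 * snapshot may be consulted */
		if (is_dying) {
			/* e.g. wake waiters via per-site data, not *o */
		}
		return;
	}
	/* last reference dropped: safe to tear the object down */
}

int main(void)
{
	struct obj o = { .refcount = 2, .dying = false };

	obj_put(&o);	/* drops to 1: object untouched after decrement */
	obj_put(&o);	/* drops to 0: teardown path */
	return 0;
}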
 
@@ -159,29 +248,53 @@ EXPORT_SYMBOL(lu_object_put);
  */
 void lu_object_put_nocache(const struct lu_env *env, struct lu_object *o)
 {
-       set_bit(LU_OBJECT_HEARD_BANSHEE,
-                   &o->lo_header->loh_flags);
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags);
        return lu_object_put(env, o);
 }
 EXPORT_SYMBOL(lu_object_put_nocache);
 
 /**
+ * Kill the object and take it out of the LRU cache.
+ * Currently used by client code for layout changes.
+ */
+void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
+{
+       struct lu_object_header *top;
+
+       top = o->lo_header;
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
+       if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
+               struct lu_site *site = o->lo_dev->ld_site;
+               struct cfs_hash *obj_hash = site->ls_obj_hash;
+               struct cfs_hash_bd bd;
+
+               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               if (!list_empty(&top->loh_lru)) {
+                       struct lu_site_bkt_data *bkt;
+
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+                       spin_lock(&bkt->lsb_waitq.lock);
+                       list_del_init(&top->loh_lru);
+                       spin_unlock(&bkt->lsb_waitq.lock);
+                       percpu_counter_dec(&site->ls_lru_len_counter);
+               }
+               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
+               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+       }
+}
+EXPORT_SYMBOL(lu_object_unhash);
+
+/**
  * Allocate new object.
  *
  * This follows object creation protocol, described in the comment within
  * struct lu_device_operations definition.
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
-                                         struct lu_device *dev,
-                                         const struct lu_fid *f,
-                                         const struct lu_object_conf *conf)
-{
-        struct lu_object *scan;
-        struct lu_object *top;
-        cfs_list_t *layers;
-        int clean;
-        int result;
-        ENTRY;
+                                        struct lu_device *dev,
+                                        const struct lu_fid *f)
+{
+       struct lu_object *top;
 
        /*
         * Create top-level object slice. This will also create
@@ -189,47 +302,72 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
+               return ERR_PTR(-ENOMEM);
        if (IS_ERR(top))
-               RETURN(top);
-        /*
-         * This is the only place where object fid is assigned. It's constant
-         * after this point.
-         */
-        top->lo_header->loh_fid = *f;
-        layers = &top->lo_header->loh_layers;
-        do {
-                /*
-                 * Call ->loo_object_init() repeatedly, until no more new
-                 * object slices are created.
-                 */
-                clean = 1;
-                cfs_list_for_each_entry(scan, layers, lo_linkage) {
-                        if (scan->lo_flags & LU_OBJECT_ALLOCATED)
-                                continue;
-                        clean = 0;
-                        scan->lo_header = top->lo_header;
-                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                        scan->lo_flags |= LU_OBJECT_ALLOCATED;
-                }
-        } while (!clean);
-
-        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
-                if (scan->lo_ops->loo_object_start != NULL) {
-                        result = scan->lo_ops->loo_object_start(env, scan);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                }
-        }
+               return top;
+       /*
+        * This is the only place where object fid is assigned. It's constant
+        * after this point.
+        */
+       top->lo_header->loh_fid = *f;
+
+       return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+                          struct lu_object *top,
+                          const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct list_head *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+
+       layers = &top->lo_header->loh_layers;
+
+       do {
+               /*
+                * Call ->loo_object_init() repeatedly, until no more new
+                * object slices are created.
+                */
+               clean = 1;
+               init_flag = 1;
+               list_for_each_entry(scan, layers, lo_linkage) {
+                       if (init_mask & init_flag)
+                               goto next;
+                       clean = 0;
+                       scan->lo_header = top->lo_header;
+                       result = scan->lo_ops->loo_object_init(env, scan, conf);
+                       if (result)
+                               return result;
+
+                       init_mask |= init_flag;
+next:
+                       init_flag <<= 1;
+               }
+       } while (!clean);
+
+       list_for_each_entry_reverse(scan, layers, lo_linkage) {
+               if (scan->lo_ops->loo_object_start != NULL) {
+                       result = scan->lo_ops->loo_object_start(env, scan);
+                       if (result)
+                               return result;
+               }
+       }
+
+       lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
+
+       set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
 
-        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
-        RETURN(top);
+       return 0;
 }
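
The init_mask/init_flag loop above exists because a slice's ->loo_object_init() may itself append new slices to loh_layers, so the list is re-scanned until a full pass initializes nothing new. A toy model of the same control flow, with hypothetical names and a plain counter in place of the layer list:

#include <stdio.h>

#define MAX_LAYERS	8

static int layer_cnt = 1;	/* grows as layers stack more slices */

/* Toy stand-in for ->loo_object_init(): the first two layers each
 * append one more slice below themselves. */
static void layer_init(int idx)
{
	printf("init layer %d\n", idx);
	if (idx < 2 && layer_cnt < MAX_LAYERS)
		layer_cnt++;
}

int main(void)
{
	unsigned int init_mask = 0;
	int clean;

	do {
		unsigned int init_flag = 1;
		int i;

		/* re-scan until a pass initializes nothing new, skipping
		 * already-initialized positions via the bitmask */
		clean = 1;
		for (i = 0; i < layer_cnt; i++, init_flag <<= 1) {
			if (init_mask & init_flag)
				continue;
			clean = 0;
			layer_init(i);
			init_mask |= init_flag;
		}
	} while (!clean);

	return 0;
}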
 
 /**
@@ -237,19 +375,19 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
  */
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_site          *site;
-        struct lu_object        *scan;
-        cfs_list_t              *layers;
-        cfs_list_t               splice;
-
-        site   = o->lo_dev->ld_site;
-        layers = &o->lo_header->loh_layers;
-        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
+       wait_queue_head_t *wq;
+       struct lu_site *site;
+       struct lu_object *scan;
+       struct list_head *layers;
+       LIST_HEAD(splice);
+
+       site = o->lo_dev->ld_site;
+       layers = &o->lo_header->loh_layers;
+       wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
         /*
          * First call ->loo_object_delete() method to release all resources.
          */
-        cfs_list_for_each_entry_reverse(scan, layers, lo_linkage) {
+       list_for_each_entry_reverse(scan, layers, lo_linkage) {
                 if (scan->lo_ops->loo_object_delete != NULL)
                         scan->lo_ops->loo_object_delete(env, scan);
         }
@@ -260,66 +398,79 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
          * necessary, because lu_object_header is freed together with the
          * top-level slice.
          */
-        CFS_INIT_LIST_HEAD(&splice);
-        cfs_list_splice_init(layers, &splice);
-        while (!cfs_list_empty(&splice)) {
-                /*
-                 * Free layers in bottom-to-top order, so that object header
-                 * lives as long as possible and ->loo_object_free() methods
-                 * can look at its contents.
-                 */
-                o = container_of0(splice.prev, struct lu_object, lo_linkage);
-                cfs_list_del_init(&o->lo_linkage);
-                LASSERT(o->lo_ops->loo_object_free != NULL);
-                o->lo_ops->loo_object_free(env, o);
-        }
+       list_splice_init(layers, &splice);
+       while (!list_empty(&splice)) {
+               /*
+                * Free layers in bottom-to-top order, so that object header
+                * lives as long as possible and ->loo_object_free() methods
+                * can look at its contents.
+                */
+               o = container_of0(splice.prev, struct lu_object, lo_linkage);
+               list_del_init(&o->lo_linkage);
+               LASSERT(o->lo_ops->loo_object_free != NULL);
+               o->lo_ops->loo_object_free(env, o);
+       }
 
-        if (cfs_waitq_active(&bkt->lsb_marche_funebre))
-                cfs_waitq_broadcast(&bkt->lsb_marche_funebre);
+       if (waitqueue_active(wq))
+               wake_up_all(wq);
 }
 
 /**
  * Free \a nr objects from the cold end of the site LRU list.
+ * If \a canblock is 0, do not block waiting for another
+ * instance of lu_site_purge() to complete.
  */
-int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
+int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
+                         int nr, int canblock)
 {
         struct lu_object_header *h;
         struct lu_object_header *temp;
         struct lu_site_bkt_data *bkt;
-        cfs_hash_bd_t            bd;
-        cfs_hash_bd_t            bd2;
-        cfs_list_t               dispose;
-        int                      did_sth;
-        int                      start;
+       LIST_HEAD(dispose);
+       int                      did_sth;
+       unsigned int             start = 0;
         int                      count;
         int                      bnr;
-        int                      i;
+       unsigned int             i;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
+               RETURN(0);
 
-        CFS_INIT_LIST_HEAD(&dispose);
         /*
          * Under LRU list lock, scan LRU list and move unreferenced objects to
          * the dispose list, removing them from LRU and hash table.
          */
-        start = s->ls_purge_start;
-        bnr = (nr == ~0) ? -1 : nr / CFS_HASH_NBKT(s->ls_obj_hash) + 1;
+       if (nr != ~0)
+               start = s->ls_purge_start;
+       bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
  again:
+       /*
+        * It makes no sense to run purge threads in parallel; that can
+        * only cause trouble. See LU-5331.
+        */
+       if (canblock != 0)
+               mutex_lock(&s->ls_purge_mutex);
+       else if (mutex_trylock(&s->ls_purge_mutex) == 0)
+               goto out;
+
         did_sth = 0;
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                if (i < start)
-                        continue;
+       for (i = start; i < s->ls_bkt_cnt; i++) {
                 count = bnr;
-                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+               bkt = &s->ls_bkts[i];
+               spin_lock(&bkt->lsb_waitq.lock);
 
-                cfs_list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
-                        LASSERT(cfs_atomic_read(&h->loh_ref) == 0);
+               list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
+                       LASSERT(atomic_read(&h->loh_ref) == 0);
 
-                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
-                        LASSERT(bd.bd_bucket == bd2.bd_bucket);
+                       LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                        cfs_hash_bd_del_locked(s->ls_obj_hash,
-                                               &bd2, &h->loh_hash);
-                        cfs_list_move(&h->loh_lru, &dispose);
+                       /* Cannot remove from hash under current spinlock,
+                        * so set flag to stop object from being found
+                        * by htable_lookup().
+                        */
+                       set_bit(LU_OBJECT_PURGING, &h->loh_flags);
+                       list_move(&h->loh_lru, &dispose);
+                       percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
                                 did_sth = 1;
 
@@ -329,35 +480,37 @@ int lu_site_purge(const struct lu_env *env, struct lu_site *s, int nr)
                         if (count > 0 && --count == 0)
                                 break;
 
-                }
-                cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
-                cfs_cond_resched();
-                /*
-                 * Free everything on the dispose list. This is safe against
-                 * races due to the reasons described in lu_object_put().
-                 */
-                while (!cfs_list_empty(&dispose)) {
-                        h = container_of0(dispose.next,
-                                          struct lu_object_header, loh_lru);
-                        cfs_list_del_init(&h->loh_lru);
-                        lu_object_free(env, lu_object_top(h));
-                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
-                }
+               }
+               spin_unlock(&bkt->lsb_waitq.lock);
+               cond_resched();
+               /*
+                * Free everything on the dispose list. This is safe against
+                * races due to the reasons described in lu_object_put().
+                */
+               while ((h = list_first_entry_or_null(&dispose,
+                                                    struct lu_object_header,
+                                                    loh_lru)) != NULL) {
+                       cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash);
+                       list_del_init(&h->loh_lru);
+                       lu_object_free(env, lu_object_top(h));
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
+               }
 
                 if (nr == 0)
                         break;
         }
+       mutex_unlock(&s->ls_purge_mutex);
 
         if (nr != 0 && did_sth && start != 0) {
                 start = 0; /* restart from the first bucket */
                 goto again;
         }
         /* race on s->ls_purge_start, but nobody cares */
-        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
-
+       s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
+out:
         return nr;
 }
-EXPORT_SYMBOL(lu_site_purge);
+EXPORT_SYMBOL(lu_site_purge_objects);
 
 /*
  * Object printing.
@@ -398,11 +551,11 @@ LU_KEY_INIT_FINI(lu_global, struct lu_cdebug_data);
  * Key, holding temporary buffer. This key is registered very early by
  * lu_global_init().
  */
-struct lu_context_key lu_global_key = {
-        .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
-                    LCT_MG_THREAD | LCT_CL_THREAD,
-        .lct_init = lu_global_key_init,
-        .lct_fini = lu_global_key_fini
+static struct lu_context_key lu_global_key = {
+       .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD |
+                   LCT_MG_THREAD | LCT_CL_THREAD | LCT_LOCAL,
+       .lct_init = lu_global_key_init,
+       .lct_fini = lu_global_key_fini
 };
 
 /**
@@ -430,8 +583,8 @@ int lu_cdebug_printer(const struct lu_env *env,
         vsnprintf(key->lck_area + used,
                   ARRAY_SIZE(key->lck_area) - used, format, args);
         if (complete) {
-                if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
-                        libcfs_debug_msg(msgdata, "%s", key->lck_area);
+               if (cfs_cdebug_show(msgdata->msg_mask, msgdata->msg_subsys))
+                       libcfs_debug_msg(msgdata, "%s\n", key->lck_area);
                 key->lck_area[0] = 0;
         }
         va_end(args);
@@ -446,13 +599,13 @@ void lu_object_header_print(const struct lu_env *env, void *cookie,
                             lu_printer_t printer,
                             const struct lu_object_header *hdr)
 {
-        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
-                   hdr, hdr->loh_flags, cfs_atomic_read(&hdr->loh_ref),
-                   PFID(&hdr->loh_fid),
-                   cfs_hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
-                   cfs_list_empty((cfs_list_t *)&hdr->loh_lru) ? \
-                   "" : " lru",
-                   hdr->loh_attr & LOHA_EXISTS ? " exist":"");
+       (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
+                  hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
+                  PFID(&hdr->loh_fid),
+                  hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
+                  list_empty((struct list_head *)&hdr->loh_lru) ?
+                  "" : " lru",
+                  hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
 }
 EXPORT_SYMBOL(lu_object_header_print);
 
@@ -460,28 +613,30 @@ EXPORT_SYMBOL(lu_object_header_print);
  * Print human readable representation of the \a o to the \a printer.
  */
 void lu_object_print(const struct lu_env *env, void *cookie,
-                     lu_printer_t printer, const struct lu_object *o)
+                    lu_printer_t printer, const struct lu_object *o)
 {
-        static const char ruler[] = "........................................";
-        struct lu_object_header *top;
-        int depth;
+       static const char ruler[] = "........................................";
+       struct lu_object_header *top;
+       int depth = 4;
 
-        top = o->lo_header;
-        lu_object_header_print(env, cookie, printer, top);
-        (*printer)(env, cookie, "{ \n");
-        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
-                depth = o->lo_depth + 4;
+       top = o->lo_header;
+       lu_object_header_print(env, cookie, printer, top);
+       (*printer)(env, cookie, "{\n");
 
-                /*
-                 * print `.' \a depth times followed by type name and address
-                 */
-                (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
-                           o->lo_dev->ld_type->ldt_name, o);
-                if (o->lo_ops->loo_object_print != NULL)
-                        o->lo_ops->loo_object_print(env, cookie, printer, o);
-                (*printer)(env, cookie, "\n");
-        }
-        (*printer)(env, cookie, "} header@%p\n", top);
+       list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+               /*
+                * print `.' \a depth times followed by type name and address
+                */
+               (*printer)(env, cookie, "%*.*s%s@%p", depth, depth, ruler,
+                          o->lo_dev->ld_type->ldt_name, o);
+
+               if (o->lo_ops->loo_object_print != NULL)
+                       (*o->lo_ops->loo_object_print)(env, cookie, printer, o);
+
+               (*printer)(env, cookie, "\n");
+       }
+
+       (*printer)(env, cookie, "} header@%p\n", top);
 }
 EXPORT_SYMBOL(lu_object_print);
 
@@ -493,58 +648,60 @@ int lu_object_invariant(const struct lu_object *o)
         struct lu_object_header *top;
 
         top = o->lo_header;
-        cfs_list_for_each_entry(o, &top->loh_layers, lo_linkage) {
+       list_for_each_entry(o, &top->loh_layers, lo_linkage) {
                 if (o->lo_ops->loo_object_invariant != NULL &&
                     !o->lo_ops->loo_object_invariant(o))
                         return 0;
         }
         return 1;
 }
-EXPORT_SYMBOL(lu_object_invariant);
 
 static struct lu_object *htable_lookup(struct lu_site *s,
-                                       cfs_hash_bd_t *bd,
-                                       const struct lu_fid *f,
-                                       cfs_waitlink_t *waiter,
-                                       __u64 *version)
+                                      struct cfs_hash_bd *bd,
+                                      const struct lu_fid *f,
+                                      __u64 *version)
 {
-        struct lu_site_bkt_data *bkt;
-        struct lu_object_header *h;
-        cfs_hlist_node_t        *hnode;
-        __u64  ver = cfs_hash_bd_version_get(bd);
+       struct lu_object_header *h;
+       struct hlist_node *hnode;
+       __u64 ver = cfs_hash_bd_version_get(bd);
 
-        if (*version == ver)
-                return NULL;
+       if (*version == ver)
+               return ERR_PTR(-ENOENT);
 
-        *version = ver;
-        bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
+       *version = ver;
        /* cfs_hash_bd_peek_locked is somewhat of an "internal" function
         * of cfs_hash; it doesn't add a refcount on the object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-        if (hnode == NULL) {
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-                return NULL;
-        }
-
-        h = container_of0(hnode, struct lu_object_header, loh_hash);
-        if (likely(!lu_object_is_dying(h))) {
-               cfs_hash_get(s->ls_obj_hash, hnode);
-                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
-                cfs_list_del_init(&h->loh_lru);
-                return lu_object_top(h);
-        }
-
-        /*
-         * Lookup found an object being destroyed this object cannot be
-         * returned (to assure that references to dying objects are eventually
-         * drained), and moreover, lookup has to wait until object is freed.
-         */
+       if (!hnode) {
+               lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+               return ERR_PTR(-ENOENT);
+       }
 
-        cfs_waitlink_init(waiter);
-        cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
-        cfs_set_current_state(CFS_TASK_UNINT);
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
-        return ERR_PTR(-EAGAIN);
+       h = container_of0(hnode, struct lu_object_header, loh_hash);
+       if (!list_empty(&h->loh_lru)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               /* Might have just been moved to the dispose list, in which
+                * case LU_OBJECT_PURGING will be set.  In that case,
+                * delete it from the hash table immediately.
+                * When lu_site_purge_objects() later tries to remove it,
+                * it will find the object already gone, which is harmless.
+                */
+               if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) {
+                       spin_unlock(&bkt->lsb_waitq.lock);
+                       cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode);
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+                       return ERR_PTR(-ENOENT);
+               }
+               list_del_init(&h->loh_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
+               percpu_counter_dec(&s->ls_lru_len_counter);
+       }
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
+       return lu_object_top(h);
 }
 
 /**
@@ -560,131 +717,164 @@ struct lu_object *lu_object_find(const struct lu_env *env,
 }
 EXPORT_SYMBOL(lu_object_find);
 
-static struct lu_object *lu_object_new(const struct lu_env *env,
-                                       struct lu_device *dev,
-                                       const struct lu_fid *f,
-                                       const struct lu_object_conf *conf)
+/*
+ * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
+ * the calculation of the number of objects to reclaim is not covered by
+ * a lock, each purge pass is capped at LU_CACHE_NR_MAX_ADJUST objects.
+ * This ensures that many concurrent threads will not accidentally purge
+ * the entire cache.
+ */
+static void lu_object_limit(const struct lu_env *env,
+                           struct lu_device *dev)
 {
-        struct lu_object        *o;
-        cfs_hash_t              *hs;
-        cfs_hash_bd_t            bd;
-        struct lu_site_bkt_data *bkt;
+       __u64 size, nr;
 
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
+               return;
 
-        hs = dev->ld_site->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        bkt = cfs_hash_bd_extra_get(hs, &bd);
-        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-        bkt->lsb_busy++;
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        return o;
+       size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
+       nr = (__u64)lu_cache_nr;
+       if (size <= nr)
+               return;
+
+       lu_site_purge_objects(env, dev->ld_site,
+                             min_t(__u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
+                             0);
 }
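
Since the size check in lu_object_limit() is done without a lock, several threads can all see the cache over the limit at once; the min_t() cap bounds their combined effect. The arithmetic, as a standalone sketch with a hypothetical nr_to_purge() helper:

#include <stdio.h>
#include <stdint.h>

/* mirrors LU_CACHE_NR_MAX_ADJUST from the patch above */
#define TOY_CACHE_NR_MAX_ADJUST	512

/* Hypothetical helper: how many objects one purge pass may reclaim. */
static uint64_t nr_to_purge(uint64_t size, uint64_t limit)
{
	uint64_t excess;

	if (size <= limit)
		return 0;
	excess = size - limit;
	return excess < TOY_CACHE_NR_MAX_ADJUST ?
	       excess : TOY_CACHE_NR_MAX_ADJUST;
}

int main(void)
{
	/* ten racing threads each reclaim at most 512 objects, so even
	 * together they cannot come close to emptying a 10600-object
	 * cache that is only 600 over its limit */
	printf("one pass purges %llu objects\n",
	       (unsigned long long)nr_to_purge(10600, 10000));
	return 0;
}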
 
 /**
  * Core logic of lu_object_find*() functions.
+ *
+ * Much like lu_object_find(), but the top level device of the object is
+ * specifically \a dev rather than the top level device of the site. This
+ * interface allows objects of different "stacking" to be created within
+ * the same site.
  */
-static struct lu_object *lu_object_find_try(const struct lu_env *env,
-                                            struct lu_device *dev,
-                                            const struct lu_fid *f,
-                                            const struct lu_object_conf *conf,
-                                            cfs_waitlink_t *waiter)
+struct lu_object *lu_object_find_at(const struct lu_env *env,
+                                   struct lu_device *dev,
+                                   const struct lu_fid *f,
+                                   const struct lu_object_conf *conf)
 {
-        struct lu_object      *o;
-        struct lu_object      *shadow;
-        struct lu_site        *s;
-        cfs_hash_t            *hs;
-        cfs_hash_bd_t          bd;
-        __u64                  version = 0;
+       struct lu_object *o;
+       struct lu_object *shadow;
+       struct lu_site *s;
+       struct cfs_hash *hs;
+       struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
+       __u64 version = 0;
+       int rc;
 
-        /*
-         * This uses standard index maintenance protocol:
-         *
-         *     - search index under lock, and return object if found;
-         *     - otherwise, unlock index, allocate new object;
-         *     - lock index and search again;
-         *     - if nothing is found (usual case), insert newly created
-         *       object into index;
-         *     - otherwise (race: other thread inserted object), free
-         *       object just allocated.
-         *     - unlock index;
-         *     - return object.
-         *
-         * For "LOC_F_NEW" case, we are sure the object is new established.
-         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
-         * just alloc and insert directly.
-         *
-         * If dying object is found during index search, add @waiter to the
-         * site wait-queue and return ERR_PTR(-EAGAIN).
-         */
-        if (conf != NULL && conf->loc_flags & LOC_F_NEW)
-                return lu_object_new(env, dev, f, conf);
+       ENTRY;
 
-        s  = dev->ld_site;
-        hs = s->ls_obj_hash;
-        cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-        o = htable_lookup(s, &bd, f, waiter, &version);
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        if (o != NULL)
-                return o;
+       /*
+        * This uses standard index maintenance protocol:
+        *
+        *     - search index under lock, and return object if found;
+        *     - otherwise, unlock index, allocate new object;
+        *     - lock index and search again;
+        *     - if nothing is found (usual case), insert newly created
+        *       object into index;
+        *     - otherwise (race: other thread inserted object), free
+        *       object just allocated.
+        *     - unlock index;
+        *     - return object.
+        *
+        * For the "LOC_F_NEW" case, we are sure the object is newly created.
+        * It is unnecessary to perform lookup-alloc-lookup-insert; instead,
+        * just alloc and insert directly.
+        */
+       s  = dev->ld_site;
+       hs = s->ls_obj_hash;
 
-        /*
-         * Allocate new object. This may result in rather complicated
-         * operations, including fld queries, inode loading, etc.
-         */
-        o = lu_object_alloc(env, dev, f, conf);
-        if (unlikely(IS_ERR(o)))
-                return o;
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+               lu_site_purge(env, s, -1);
 
-        LASSERT(lu_fid_eq(lu_object_fid(o), f));
+       bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
+       cfs_hash_bd_get(hs, f, &bd);
+       if (!(conf && conf->loc_flags & LOC_F_NEW)) {
+               cfs_hash_bd_lock(hs, &bd, 1);
+               o = htable_lookup(s, &bd, f, &version);
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
-        cfs_hash_bd_lock(hs, &bd, 1);
+               if (!IS_ERR(o)) {
+                       if (likely(lu_object_is_inited(o->lo_header)))
+                               RETURN(o);
 
-        shadow = htable_lookup(s, &bd, f, waiter, &version);
-        if (likely(shadow == NULL)) {
-                struct lu_site_bkt_data *bkt;
+                       wait_event_idle(bkt->lsb_waitq,
+                                       lu_object_is_inited(o->lo_header) ||
+                                       lu_object_is_dying(o->lo_header));
 
-                bkt = cfs_hash_bd_extra_get(hs, &bd);
-                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-                bkt->lsb_busy++;
-                cfs_hash_bd_unlock(hs, &bd, 1);
-                return o;
-        }
+                       if (lu_object_is_dying(o->lo_header)) {
+                               lu_object_put(env, o);
 
-        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-        cfs_hash_bd_unlock(hs, &bd, 1);
-        lu_object_free(env, o);
-        return shadow;
-}
+                               RETURN(ERR_PTR(-ENOENT));
+                       }
 
-/**
- * Much like lu_object_find(), but top level device of object is specifically
- * \a dev rather than top level device of the site. This interface allows
- * objects of different "stacking" to be created within the same site.
- */
-struct lu_object *lu_object_find_at(const struct lu_env *env,
-                                    struct lu_device *dev,
-                                    const struct lu_fid *f,
-                                    const struct lu_object_conf *conf)
-{
-        struct lu_site_bkt_data *bkt;
-        struct lu_object        *obj;
-        cfs_waitlink_t           wait;
+                       RETURN(o);
+               }
 
-        while (1) {
-                obj = lu_object_find_try(env, dev, f, conf, &wait);
-                if (obj != ERR_PTR(-EAGAIN))
-                        return obj;
-                /*
-                 * lu_object_find_try() already added waiter into the
-                 * wait queue.
-                 */
-                cfs_waitq_wait(&wait, CFS_TASK_UNINT);
-                bkt = lu_site_bkt_from_fid(dev->ld_site, (void *)f);
-                cfs_waitq_del(&bkt->lsb_marche_funebre, &wait);
-        }
+               if (PTR_ERR(o) != -ENOENT)
+                       RETURN(o);
+       }
+
+       /*
+        * Allocate the new object. NB: the object is left uninitialized
+        * here, because the underlying object may change between allocation
+        * and hash insertion; initializing only after insertion avoids
+        * returning an object with stale attributes.
+        */
+       o = lu_object_alloc(env, dev, f);
+       if (IS_ERR(o))
+               RETURN(o);
+
+       LASSERT(lu_fid_eq(lu_object_fid(o), f));
+
+       CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
+       cfs_hash_bd_lock(hs, &bd, 1);
+
+       if (conf && conf->loc_flags & LOC_F_NEW)
+               shadow = ERR_PTR(-ENOENT);
+       else
+               shadow = htable_lookup(s, &bd, f, &version);
+       if (likely(PTR_ERR(shadow) == -ENOENT)) {
+               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
+               cfs_hash_bd_unlock(hs, &bd, 1);
+
+               /*
+                * This may result in rather complicated operations, including
+                * fld queries, inode loading, etc.
+                */
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_put_nocache(env, o);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               wake_up_all(&bkt->lsb_waitq);
+
+               lu_object_limit(env, dev);
+
+               RETURN(o);
+       }
+
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
+       cfs_hash_bd_unlock(hs, &bd, 1);
+       lu_object_free(env, o);
+
+       if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !lu_object_is_inited(shadow->lo_header)) {
+               wait_event_idle(bkt->lsb_waitq,
+                               lu_object_is_inited(shadow->lo_header) ||
+                               lu_object_is_dying(shadow->lo_header));
+
+               if (lu_object_is_dying(shadow->lo_header)) {
+                       lu_object_put(env, shadow);
+
+                       RETURN(ERR_PTR(-ENOENT));
+               }
+       }
+
+       RETURN(shadow);
 }
 EXPORT_SYMBOL(lu_object_find_at);
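
Stripped of the cfs_hash and wait-queue details, lu_object_find_at() follows the classic lookup-alloc-lookup-insert protocol from the comment above. A self-contained toy version, with a hypothetical pthread-protected linked list standing in for the site hash table:

#include <pthread.h>
#include <stdlib.h>

struct obj { int key; struct obj *next; };

static struct obj *table;
static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

static struct obj *lookup_locked(int key)
{
	struct obj *o;

	for (o = table; o; o = o->next)
		if (o->key == key)
			return o;
	return NULL;
}

static struct obj *find_or_create(int key)
{
	struct obj *o, *shadow;

	pthread_mutex_lock(&table_lock);
	o = lookup_locked(key);
	pthread_mutex_unlock(&table_lock);
	if (o)
		return o;		/* fast path: cache hit */

	o = calloc(1, sizeof(*o));	/* "slow" allocation, lock dropped */
	if (!o)
		return NULL;
	o->key = key;

	pthread_mutex_lock(&table_lock);
	shadow = lookup_locked(key);	/* re-check under lock */
	if (!shadow) {
		o->next = table;	/* usual case: we won the race */
		table = o;
		pthread_mutex_unlock(&table_lock);
		return o;
	}
	pthread_mutex_unlock(&table_lock);
	free(o);			/* racing thread inserted first */
	return shadow;
}

int main(void)
{
	return find_or_create(42) == find_or_create(42) ? 0 : 1;
}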
 
@@ -696,62 +886,47 @@ struct lu_object *lu_object_find_slice(const struct lu_env *env,
                                        const struct lu_fid *f,
                                        const struct lu_object_conf *conf)
 {
-        struct lu_object *top;
-        struct lu_object *obj;
+       struct lu_object *top;
+       struct lu_object *obj;
+
+       top = lu_object_find(env, dev, f, conf);
+       if (IS_ERR(top))
+               return top;
 
-        top = lu_object_find(env, dev, f, conf);
-        if (!IS_ERR(top)) {
-                obj = lu_object_locate(top->lo_header, dev->ld_type);
-                if (obj == NULL)
-                        lu_object_put(env, top);
-        } else
-                obj = top;
-        return obj;
+       obj = lu_object_locate(top->lo_header, dev->ld_type);
+       if (unlikely(obj == NULL)) {
+               lu_object_put(env, top);
+               obj = ERR_PTR(-ENOENT);
+       }
+
+       return obj;
 }
 EXPORT_SYMBOL(lu_object_find_slice);
 
-/**
- * Global list of all device types.
- */
-static CFS_LIST_HEAD(lu_device_types);
-
 int lu_device_type_init(struct lu_device_type *ldt)
 {
        int result = 0;
 
-       CFS_INIT_LIST_HEAD(&ldt->ldt_linkage);
+       atomic_set(&ldt->ldt_device_nr, 0);
        if (ldt->ldt_ops->ldto_init)
                result = ldt->ldt_ops->ldto_init(ldt);
-       if (result == 0)
-               cfs_list_add(&ldt->ldt_linkage, &lu_device_types);
+
        return result;
 }
 EXPORT_SYMBOL(lu_device_type_init);
 
 void lu_device_type_fini(struct lu_device_type *ldt)
 {
-       cfs_list_del_init(&ldt->ldt_linkage);
        if (ldt->ldt_ops->ldto_fini)
                ldt->ldt_ops->ldto_fini(ldt);
 }
 EXPORT_SYMBOL(lu_device_type_fini);
 
-void lu_types_stop(void)
-{
-        struct lu_device_type *ldt;
-
-       cfs_list_for_each_entry(ldt, &lu_device_types, ldt_linkage) {
-               if (ldt->ldt_device_nr == 0 && ldt->ldt_ops->ldto_stop)
-                       ldt->ldt_ops->ldto_stop(ldt);
-       }
-}
-EXPORT_SYMBOL(lu_types_stop);
-
 /**
  * Global list of all sites on this node
  */
-static CFS_LIST_HEAD(lu_sites);
-static DEFINE_MUTEX(lu_sites_guard);
+static LIST_HEAD(lu_sites);
+static DECLARE_RWSEM(lu_sites_guard);
 
 /**
  * Global environment used by site shrinker.
@@ -765,24 +940,24 @@ struct lu_site_print_arg {
 };
 
 static int
-lu_site_obj_print(cfs_hash_t *hs, cfs_hash_bd_t *bd,
-                  cfs_hlist_node_t *hnode, void *data)
-{
-        struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
-        struct lu_object_header  *h;
-
-        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
-        if (!cfs_list_empty(&h->loh_layers)) {
-                const struct lu_object *o;
-
-                o = lu_object_top(h);
-                lu_object_print(arg->lsp_env, arg->lsp_cookie,
-                                arg->lsp_printer, o);
-        } else {
-                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
-                                       arg->lsp_printer, h);
-        }
-        return 0;
+lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                 struct hlist_node *hnode, void *data)
+{
+       struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
+       struct lu_object_header  *h;
+
+       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
+       if (!list_empty(&h->loh_layers)) {
+               const struct lu_object *o;
+
+               o = lu_object_top(h);
+               lu_object_print(arg->lsp_env, arg->lsp_cookie,
+                               arg->lsp_printer, o);
+       } else {
+               lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
+                                      arg->lsp_printer, h);
+       }
+       return 0;
 }
 
 /**
@@ -801,22 +976,29 @@ void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
 }
 EXPORT_SYMBOL(lu_site_print);
 
-enum {
-        LU_CACHE_PERCENT_MAX     = 50,
-        LU_CACHE_PERCENT_DEFAULT = 20
-};
-
-static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-CFS_MODULE_PARM(lu_cache_percent, "i", int, 0644,
-                "Percentage of memory to be used as lu_object cache");
-
 /**
  * Return desired hash table order.
  */
-static int lu_htable_order(void)
+static unsigned long lu_htable_order(struct lu_device *top)
 {
-        unsigned long cache_size;
-        int bits;
+       unsigned long cache_size;
+       unsigned long bits;
+       unsigned long bits_max = LU_SITE_BITS_MAX;
+
+       /*
+        * For ZFS based OSDs the cache should be disabled by default.  This
+        * allows the ZFS ARC maximum flexibility in determining what buffers
+        * to cache.  If Lustre has objects or buffers which it wants to ensure
+        * always stay cached it must maintain a hold on them.
+        */
+       if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
+               lu_cache_percent = 1;
+               lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
+               return LU_SITE_BITS_MIN;
+       }
+
+       if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0)
+               bits_max = LU_SITE_BITS_MAX_CL;
 
         /*
          * Calculate hash table size, assuming that we want reasonable
@@ -825,12 +1007,12 @@ static int lu_htable_order(void)
          *
          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
          */
-        cache_size = cfs_num_physpages;
+       cache_size = cfs_totalram_pages();
 
 #if BITS_PER_LONG == 32
         /* limit hashtable size for lowmem systems to low RAM */
-        if (cache_size > 1 << (30 - CFS_PAGE_SHIFT))
-                cache_size = 1 << (30 - CFS_PAGE_SHIFT) * 3 / 4;
+       if (cache_size > 1 << (30 - PAGE_SHIFT))
+               cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
 #endif
 
         /* clear off unreasonable cache setting. */
@@ -843,75 +1025,69 @@ static int lu_htable_order(void)
                 lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
         }
         cache_size = cache_size / 100 * lu_cache_percent *
-                (CFS_PAGE_SIZE / 1024);
+               (PAGE_SIZE / 1024);
 
         for (bits = 1; (1 << bits) < cache_size; ++bits) {
                 ;
         }
-        return bits;
+
+       return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
 }
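
To see what lu_htable_order() produces in practice, the same arithmetic can be rerun standalone; the sketch below assumes a 64-bit host with 16 GiB of RAM, 4 KiB pages, the default lu_cache_percent of 20, and the 1 KiB-per-object estimate from the comment above:

#include <stdio.h>

int main(void)
{
	/* assumed example: 16 GiB of RAM in 4 KiB pages (64-bit host) */
	unsigned long cache_size = (16UL << 30) >> 12;
	unsigned long lu_cache_percent = 20;	/* module default */
	unsigned long bits;

	/* pages -> percent of memory -> number of 1 KiB objects */
	cache_size = cache_size / 100 * lu_cache_percent * (4096 / 1024);

	for (bits = 1; (1UL << bits) < cache_size; ++bits)
		;
	if (bits < 12)			/* LU_SITE_BITS_MIN */
		bits = 12;
	else if (bits > 24)		/* LU_SITE_BITS_MAX */
		bits = 24;

	printf("hash order = %lu bits\n", bits);	/* prints 22 */
	return 0;
}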
 
-static unsigned lu_obj_hop_hash(cfs_hash_t *hs,
-                                const void *key, unsigned mask)
+static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
+                               const void *key, unsigned mask)
 {
-        struct lu_fid  *fid = (struct lu_fid *)key;
-        __u32           hash;
+       struct lu_fid  *fid = (struct lu_fid *)key;
+       __u32           hash;
 
-        hash = fid_flatten32(fid);
-        hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
-        hash = cfs_hash_long(hash, hs->hs_bkt_bits);
+       hash = fid_flatten32(fid);
+       hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
+       hash = hash_long(hash, hs->hs_bkt_bits);
 
-        /* give me another random factor */
-        hash -= cfs_hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
+       /* give me another random factor */
+       hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
 
-        hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
-        hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
+       hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
+       hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
 
-        return hash & mask;
+       return hash & mask;
 }
 
-static void *lu_obj_hop_object(cfs_hlist_node_t *hnode)
+static void *lu_obj_hop_object(struct hlist_node *hnode)
 {
-        return cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
+       return hlist_entry(hnode, struct lu_object_header, loh_hash);
 }
 
-static void *lu_obj_hop_key(cfs_hlist_node_t *hnode)
+static void *lu_obj_hop_key(struct hlist_node *hnode)
 {
-        struct lu_object_header *h;
+       struct lu_object_header *h;
 
-        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
-        return &h->loh_fid;
+       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
+       return &h->loh_fid;
 }
 
-static int lu_obj_hop_keycmp(const void *key, cfs_hlist_node_t *hnode)
+static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
 {
-        struct lu_object_header *h;
+       struct lu_object_header *h;
 
-        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
-        return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
+       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
+       return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
 }
 
-static void lu_obj_hop_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
 {
-        struct lu_object_header *h;
-
-        h = cfs_hlist_entry(hnode, struct lu_object_header, loh_hash);
-        if (cfs_atomic_add_return(1, &h->loh_ref) == 1) {
-                struct lu_site_bkt_data *bkt;
-                cfs_hash_bd_t            bd;
+       struct lu_object_header *h;
 
-                cfs_hash_bd_get(hs, &h->loh_fid, &bd);
-                bkt = cfs_hash_bd_extra_get(hs, &bd);
-                bkt->lsb_busy++;
-        }
+       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
+       atomic_inc(&h->loh_ref);
 }
 
-static void lu_obj_hop_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
 {
        LBUG(); /* we should never call it */
 }
 
-cfs_hash_ops_t lu_site_hash_ops = {
+static struct cfs_hash_ops lu_site_hash_ops = {
         .hs_hash        = lu_obj_hop_hash,
         .hs_key         = lu_obj_hop_key,
         .hs_keycmp      = lu_obj_hop_keycmp,
@@ -923,8 +1099,8 @@ cfs_hash_ops_t lu_site_hash_ops = {
 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
 {
        spin_lock(&s->ls_ld_lock);
-       if (cfs_list_empty(&d->ld_linkage))
-               cfs_list_add(&d->ld_linkage, &s->ls_ld_linkage);
+       if (list_empty(&d->ld_linkage))
+               list_add(&d->ld_linkage, &s->ls_ld_linkage);
        spin_unlock(&s->ls_ld_lock);
 }
 EXPORT_SYMBOL(lu_dev_add_linkage);
@@ -932,64 +1108,79 @@ EXPORT_SYMBOL(lu_dev_add_linkage);
 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d)
 {
        spin_lock(&s->ls_ld_lock);
-       cfs_list_del_init(&d->ld_linkage);
+       list_del_init(&d->ld_linkage);
        spin_unlock(&s->ls_ld_lock);
 }
 EXPORT_SYMBOL(lu_dev_del_linkage);
 
 /**
- * Initialize site \a s, with \a d as the top level device.
- */
-#define LU_SITE_BITS_MIN    12
-#define LU_SITE_BITS_MAX    24
-/**
- * total 256 buckets, we don't want too many buckets because:
- * - consume too much memory
- * - avoid unbalanced LRU list
- */
-#define LU_SITE_BKT_BITS    8
-
+ * Initialize site \a s, with \a top as the top level device.
+ */
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
-        struct lu_site_bkt_data *bkt;
-        cfs_hash_bd_t bd;
-        char name[16];
-        int bits;
-        int i;
-        ENTRY;
-
-        memset(s, 0, sizeof *s);
-        bits = lu_htable_order();
-        snprintf(name, 16, "lu_site_%s", top->ld_type->ldt_name);
-        for (bits = min(max(LU_SITE_BITS_MIN, bits), LU_SITE_BITS_MAX);
-             bits >= LU_SITE_BITS_MIN; bits--) {
-                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
-                                                 bits - LU_SITE_BKT_BITS,
-                                                 sizeof(*bkt), 0, 0,
-                                                 &lu_site_hash_ops,
-                                                 CFS_HASH_SPIN_BKTLOCK |
-                                                 CFS_HASH_NO_ITEMREF |
-                                                 CFS_HASH_DEPTH |
-                                                 CFS_HASH_ASSERT_EMPTY);
-                if (s->ls_obj_hash != NULL)
-                        break;
-        }
+       struct lu_site_bkt_data *bkt;
+       char name[16];
+       unsigned long bits;
+       unsigned int i;
+       int rc;
+       ENTRY;
 
-        if (s->ls_obj_hash == NULL) {
-                CERROR("failed to create lu_site hash with bits: %d\n", bits);
-                return -ENOMEM;
-        }
+       memset(s, 0, sizeof(*s));
+       mutex_init(&s->ls_purge_mutex);
 
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
-                CFS_INIT_LIST_HEAD(&bkt->lsb_lru);
-                cfs_waitq_init(&bkt->lsb_marche_funebre);
-        }
+#ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
+       rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
+#else
+       rc = percpu_counter_init(&s->ls_lru_len_counter, 0);
+#endif
+       if (rc)
+               return -ENOMEM;
+
+       snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
+       for (bits = lu_htable_order(top);
+            bits >= LU_SITE_BITS_MIN; bits--) {
+               s->ls_obj_hash = cfs_hash_create(name, bits, bits,
+                                                bits - LU_SITE_BKT_BITS,
+                                                0, 0, 0,
+                                                &lu_site_hash_ops,
+                                                CFS_HASH_SPIN_BKTLOCK |
+                                                CFS_HASH_NO_ITEMREF |
+                                                CFS_HASH_DEPTH |
+                                                CFS_HASH_ASSERT_EMPTY |
+                                                CFS_HASH_COUNTER);
+               if (s->ls_obj_hash != NULL)
+                       break;
+       }
+
+       if (s->ls_obj_hash == NULL) {
+               CERROR("failed to create lu_site hash with bits: %lu\n", bits);
+               return -ENOMEM;
+       }
+
+       s->ls_bkt_seed = prandom_u32();
+       s->ls_bkt_cnt = min_t(long, 1 << LU_SITE_BKT_BITS,
+                             2 * num_possible_cpus());
+       s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
+       OBD_ALLOC_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+       if (!s->ls_bkts) {
+               cfs_hash_putref(s->ls_obj_hash);
+               s->ls_obj_hash = NULL;
+               return -ENOMEM;
+       }
+
+       for (i = 0; i < s->ls_bkt_cnt; i++) {
+               bkt = &s->ls_bkts[i];
+               INIT_LIST_HEAD(&bkt->lsb_lru);
+               init_waitqueue_head(&bkt->lsb_waitq);
+       }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
         if (s->ls_stats == NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
+               OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+               cfs_hash_putref(s->ls_obj_hash);
                 s->ls_obj_hash = NULL;
+               s->ls_bkts = NULL;
                 return -ENOMEM;
         }
 
@@ -1006,13 +1197,13 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
         lprocfs_counter_init(s->ls_stats, LU_SS_LRU_PURGED,
                              0, "lru_purged", "lru_purged");
 
-        CFS_INIT_LIST_HEAD(&s->ls_linkage);
+       INIT_LIST_HEAD(&s->ls_linkage);
         s->ls_top_dev = top;
         top->ld_site = s;
         lu_device_get(top);
         lu_ref_add(&top->ld_reference, "site-top", s);
 
-        CFS_INIT_LIST_HEAD(&s->ls_ld_linkage);
+       INIT_LIST_HEAD(&s->ls_ld_linkage);
        spin_lock_init(&s->ls_ld_lock);
 
        lu_dev_add_linkage(s, top);
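
Since ls_bkt_cnt is rounded up to a power of two above, a FID hash can be
mapped to its LRU bucket with a simple mask. A minimal sketch, assuming a
hypothetical helper (the patch's actual hash/selection code may differ):

        static struct lu_site_bkt_data *
        bkt_from_hash(struct lu_site *s, __u64 hash)
        {
                /* ls_bkt_seed randomizes bucket choice across sites */
                return &s->ls_bkts[(hash ^ s->ls_bkt_seed) &
                                   (s->ls_bkt_cnt - 1)];
        }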
@@ -1026,15 +1217,19 @@ EXPORT_SYMBOL(lu_site_init);
  */
 void lu_site_fini(struct lu_site *s)
 {
-       mutex_lock(&lu_sites_guard);
-        cfs_list_del_init(&s->ls_linkage);
-       mutex_unlock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
+       list_del_init(&s->ls_linkage);
+       up_write(&lu_sites_guard);
+
+       percpu_counter_destroy(&s->ls_lru_len_counter);
 
         if (s->ls_obj_hash != NULL) {
                 cfs_hash_putref(s->ls_obj_hash);
                 s->ls_obj_hash = NULL;
         }
 
+       OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*s->ls_bkts));
+
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
@@ -1053,11 +1248,11 @@ EXPORT_SYMBOL(lu_site_fini);
 int lu_site_init_finish(struct lu_site *s)
 {
         int result;
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         result = lu_context_refill(&lu_shrink_env.le_ctx);
         if (result == 0)
-                cfs_list_add(&s->ls_linkage, &lu_sites);
-       mutex_unlock(&lu_sites_guard);
+               list_add(&s->ls_linkage, &lu_sites);
+       up_write(&lu_sites_guard);
         return result;
 }
 EXPORT_SYMBOL(lu_site_init_finish);
@@ -1067,7 +1262,7 @@ EXPORT_SYMBOL(lu_site_init_finish);
  */
 void lu_device_get(struct lu_device *d)
 {
-        cfs_atomic_inc(&d->ld_ref);
+       atomic_inc(&d->ld_ref);
 }
 EXPORT_SYMBOL(lu_device_get);
 
@@ -1076,8 +1271,8 @@ EXPORT_SYMBOL(lu_device_get);
  */
 void lu_device_put(struct lu_device *d)
 {
-        LASSERT(cfs_atomic_read(&d->ld_ref) > 0);
-        cfs_atomic_dec(&d->ld_ref);
+       LASSERT(atomic_read(&d->ld_ref) > 0);
+       atomic_dec(&d->ld_ref);
 }
 EXPORT_SYMBOL(lu_device_put);
 
@@ -1086,14 +1281,16 @@ EXPORT_SYMBOL(lu_device_put);
  */
 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 {
-        if (t->ldt_device_nr++ == 0 && t->ldt_ops->ldto_start != NULL)
-                t->ldt_ops->ldto_start(t);
-        memset(d, 0, sizeof *d);
-        cfs_atomic_set(&d->ld_ref, 0);
-        d->ld_type = t;
-        lu_ref_init(&d->ld_reference);
-        CFS_INIT_LIST_HEAD(&d->ld_linkage);
-        return 0;
+       if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
+           t->ldt_ops->ldto_start != NULL)
+               t->ldt_ops->ldto_start(t);
+
+       memset(d, 0, sizeof(*d));
+       d->ld_type = t;
+       lu_ref_init(&d->ld_reference);
+       INIT_LIST_HEAD(&d->ld_linkage);
+
+       return 0;
 }
 EXPORT_SYMBOL(lu_device_init);
 
@@ -1102,20 +1299,21 @@ EXPORT_SYMBOL(lu_device_init);
  */
 void lu_device_fini(struct lu_device *d)
 {
-        struct lu_device_type *t;
+       struct lu_device_type *t = d->ld_type;
 
-        t = d->ld_type;
-        if (d->ld_obd != NULL) {
-                d->ld_obd->obd_lu_dev = NULL;
-                d->ld_obd = NULL;
-        }
+       if (d->ld_obd != NULL) {
+               d->ld_obd->obd_lu_dev = NULL;
+               d->ld_obd = NULL;
+       }
 
-        lu_ref_fini(&d->ld_reference);
-        LASSERTF(cfs_atomic_read(&d->ld_ref) == 0,
-                 "Refcount is %u\n", cfs_atomic_read(&d->ld_ref));
-        LASSERT(t->ldt_device_nr > 0);
-        if (--t->ldt_device_nr == 0 && t->ldt_ops->ldto_stop != NULL)
-                t->ldt_ops->ldto_stop(t);
+       lu_ref_fini(&d->ld_reference);
+       LASSERTF(atomic_read(&d->ld_ref) == 0,
+                "Refcount is %u\n", atomic_read(&d->ld_ref));
+       LASSERT(atomic_read(&t->ldt_device_nr) > 0);
+
+       if (atomic_dec_and_test(&t->ldt_device_nr) &&
+           t->ldt_ops->ldto_stop != NULL)
+               t->ldt_ops->ldto_stop(t);
 }
 EXPORT_SYMBOL(lu_device_fini);
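
The atomic transitions above replace the old unlocked `t->ldt_device_nr++`:
only the 0->1 transition runs ldto_start() and only the 1->0 transition runs
ldto_stop(), so concurrent device init/fini of the same type needs no extra
lock. A minimal sketch of the pattern, with illustrative names:

        static void type_get(struct lu_device_type *t)
        {
                /* first user starts the type */
                if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
                    t->ldt_ops->ldto_start)
                        t->ldt_ops->ldto_start(t);
        }

        static void type_put(struct lu_device_type *t)
        {
                /* last user stops the type */
                if (atomic_dec_and_test(&t->ldt_device_nr) &&
                    t->ldt_ops->ldto_stop)
                        t->ldt_ops->ldto_stop(t);
        }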
 
@@ -1123,16 +1321,17 @@ EXPORT_SYMBOL(lu_device_fini);
  * Initialize object \a o that is part of compound object \a h and was created
  * by device \a d.
  */
-int lu_object_init(struct lu_object *o,
-                   struct lu_object_header *h, struct lu_device *d)
+int lu_object_init(struct lu_object *o, struct lu_object_header *h,
+                  struct lu_device *d)
 {
-        memset(o, 0, sizeof *o);
-        o->lo_header = h;
-        o->lo_dev    = d;
-        lu_device_get(d);
-        o->lo_dev_ref = lu_ref_add(&d->ld_reference, "lu_object", o);
-        CFS_INIT_LIST_HEAD(&o->lo_linkage);
-        return 0;
+       memset(o, 0, sizeof(*o));
+       o->lo_header = h;
+       o->lo_dev = d;
+       lu_device_get(d);
+       lu_ref_add_at(&d->ld_reference, &o->lo_dev_ref, "lu_object", o);
+       INIT_LIST_HEAD(&o->lo_linkage);
+
+       return 0;
 }
 EXPORT_SYMBOL(lu_object_init);
 
@@ -1141,16 +1340,16 @@ EXPORT_SYMBOL(lu_object_init);
  */
 void lu_object_fini(struct lu_object *o)
 {
-        struct lu_device *dev = o->lo_dev;
+       struct lu_device *dev = o->lo_dev;
 
-        LASSERT(cfs_list_empty(&o->lo_linkage));
+       LASSERT(list_empty(&o->lo_linkage));
 
-        if (dev != NULL) {
-                lu_ref_del_at(&dev->ld_reference,
-                              o->lo_dev_ref , "lu_object", o);
-                lu_device_put(dev);
-                o->lo_dev = NULL;
-        }
+       if (dev != NULL) {
+               lu_ref_del_at(&dev->ld_reference, &o->lo_dev_ref,
+                             "lu_object", o);
+               lu_device_put(dev);
+               o->lo_dev = NULL;
+       }
 }
 EXPORT_SYMBOL(lu_object_fini);
 
@@ -1162,7 +1361,7 @@ EXPORT_SYMBOL(lu_object_fini);
  */
 void lu_object_add_top(struct lu_object_header *h, struct lu_object *o)
 {
-        cfs_list_move(&o->lo_linkage, &h->loh_layers);
+       list_move(&o->lo_linkage, &h->loh_layers);
 }
 EXPORT_SYMBOL(lu_object_add_top);
 
@@ -1174,7 +1373,7 @@ EXPORT_SYMBOL(lu_object_add_top);
  */
 void lu_object_add(struct lu_object *before, struct lu_object *o)
 {
-        cfs_list_move(&o->lo_linkage, &before->lo_linkage);
+       list_move(&o->lo_linkage, &before->lo_linkage);
 }
 EXPORT_SYMBOL(lu_object_add);
 
@@ -1184,10 +1383,10 @@ EXPORT_SYMBOL(lu_object_add);
 int lu_object_header_init(struct lu_object_header *h)
 {
         memset(h, 0, sizeof *h);
-        cfs_atomic_set(&h->loh_ref, 1);
-        CFS_INIT_HLIST_NODE(&h->loh_hash);
-        CFS_INIT_LIST_HEAD(&h->loh_lru);
-        CFS_INIT_LIST_HEAD(&h->loh_layers);
+       atomic_set(&h->loh_ref, 1);
+       INIT_HLIST_NODE(&h->loh_hash);
+       INIT_LIST_HEAD(&h->loh_lru);
+       INIT_LIST_HEAD(&h->loh_layers);
         lu_ref_init(&h->loh_reference);
         return 0;
 }
@@ -1198,9 +1397,9 @@ EXPORT_SYMBOL(lu_object_header_init);
  */
 void lu_object_header_fini(struct lu_object_header *h)
 {
-        LASSERT(cfs_list_empty(&h->loh_layers));
-        LASSERT(cfs_list_empty(&h->loh_lru));
-        LASSERT(cfs_hlist_unhashed(&h->loh_hash));
+       LASSERT(list_empty(&h->loh_layers));
+       LASSERT(list_empty(&h->loh_lru));
+       LASSERT(hlist_unhashed(&h->loh_hash));
         lu_ref_fini(&h->loh_reference);
 }
 EXPORT_SYMBOL(lu_object_header_fini);
@@ -1212,18 +1411,16 @@ EXPORT_SYMBOL(lu_object_header_fini);
 struct lu_object *lu_object_locate(struct lu_object_header *h,
                                    const struct lu_device_type *dtype)
 {
-        struct lu_object *o;
+       struct lu_object *o;
 
-        cfs_list_for_each_entry(o, &h->loh_layers, lo_linkage) {
-                if (o->lo_dev->ld_type == dtype)
-                        return o;
-        }
-        return NULL;
+       list_for_each_entry(o, &h->loh_layers, lo_linkage) {
+               if (o->lo_dev->ld_type == dtype)
+                       return o;
+       }
+       return NULL;
 }
 EXPORT_SYMBOL(lu_object_locate);
 
-
-
 /**
  * Finalize and free devices in the device stack.
  *
@@ -1249,17 +1446,10 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
 
         for (scan = top; scan != NULL; scan = next) {
                 const struct lu_device_type *ldt = scan->ld_type;
-                struct obd_type             *type;
 
                 next = ldt->ldt_ops->ldto_device_free(env, scan);
-                type = ldt->ldt_obd_type;
-                if (type != NULL) {
-                        type->typ_refcnt--;
-                        class_put_type(type);
-                }
         }
 }
-EXPORT_SYMBOL(lu_stack_fini);
 
 enum {
         /**
@@ -1270,7 +1460,7 @@ enum {
 
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
-static DEFINE_SPINLOCK(lu_keys_guard);
+static DECLARE_RWSEM(lu_key_initing);
 
 /**
  * Global counter incremented whenever key is registered, unregistered,
@@ -1278,15 +1468,15 @@ static DEFINE_SPINLOCK(lu_keys_guard);
  * lu_context_refill(). No locking is provided, as initialization and shutdown
  * are supposed to be externally serialized.
  */
-static unsigned key_set_version = 0;
+static atomic_t key_set_version = ATOMIC_INIT(0);
 
 /**
  * Register new key.
  */
 int lu_context_key_register(struct lu_context_key *key)
 {
-        int result;
-        int i;
+       int result;
+       unsigned int i;
 
         LASSERT(key->lct_init != NULL);
         LASSERT(key->lct_fini != NULL);
@@ -1294,19 +1484,23 @@ int lu_context_key_register(struct lu_context_key *key)
         LASSERT(key->lct_owner != NULL);
 
         result = -ENFILE;
-       spin_lock(&lu_keys_guard);
+       atomic_set(&key->lct_used, 1);
+       lu_ref_init(&key->lct_reference);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                if (lu_keys[i] == NULL) {
-                        key->lct_index = i;
-                        cfs_atomic_set(&key->lct_used, 1);
-                        lu_keys[i] = key;
-                        lu_ref_init(&key->lct_reference);
-                        result = 0;
-                        ++key_set_version;
-                        break;
-                }
+               if (lu_keys[i])
+                       continue;
+               key->lct_index = i;
+               if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
+                       continue;
+
+               result = 0;
+               atomic_inc(&key_set_version);
+               break;
         }
-       spin_unlock(&lu_keys_guard);
+       if (result) {
+               lu_ref_fini(&key->lct_reference);
+               atomic_set(&key->lct_used, 0);
+       }
        return result;
 }
 EXPORT_SYMBOL(lu_context_key_register);
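
Registration now claims a slot lock-free: a slot in lu_keys[] belongs to
whoever swings it NULL->key with cmpxchg(); a loser simply tries the next
slot, so no guard lock is needed. A generic sketch of the idiom
(illustrative names):

        static void *slots[16];

        static int claim_slot(void *owner)
        {
                int i;

                for (i = 0; i < ARRAY_SIZE(slots); i++) {
                        /* only one caller can win the NULL->owner swap */
                        if (!slots[i] &&
                            cmpxchg(&slots[i], NULL, owner) == NULL)
                                return i;
                }
                return -1;      /* table full */
        }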
@@ -1319,16 +1513,17 @@ static void key_fini(struct lu_context *ctx, int index)
                 key = lu_keys[index];
                 LASSERT(key != NULL);
                 LASSERT(key->lct_fini != NULL);
-                LASSERT(cfs_atomic_read(&key->lct_used) > 1);
+               LASSERT(atomic_read(&key->lct_used) > 0);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
-                cfs_atomic_dec(&key->lct_used);
+               if (atomic_dec_and_test(&key->lct_used))
+                       wake_up_var(&key->lct_used);
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
-                       LINVRNT(cfs_module_refcount(key->lct_owner) > 0);
-                       cfs_module_put(key->lct_owner);
+                       LINVRNT(module_refcount(key->lct_owner) > 0);
+                       module_put(key->lct_owner);
                }
                ctx->lc_value[index] = NULL;
        }
@@ -1339,23 +1534,24 @@ static void key_fini(struct lu_context *ctx, int index)
  */
 void lu_context_key_degister(struct lu_context_key *key)
 {
-       LASSERT(cfs_atomic_read(&key->lct_used) >= 1);
+       LASSERT(atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
 
        lu_context_key_quiesce(key);
 
-       ++key_set_version;
-       spin_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
-       if (lu_keys[key->lct_index]) {
-               lu_keys[key->lct_index] = NULL;
+
+       /*
+        * Wait until all transient contexts referencing this key have
+        * run the lu_context_key::lct_fini() method.
+        */
+       atomic_dec(&key->lct_used);
+       wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
+
+       if (!WARN_ON(lu_keys[key->lct_index] == NULL))
                lu_ref_fini(&key->lct_reference);
-       }
-       spin_unlock(&lu_keys_guard);
 
-       LASSERTF(cfs_atomic_read(&key->lct_used) == 1,
-                "key has instances: %d\n",
-                cfs_atomic_read(&key->lct_used));
+       smp_store_release(&lu_keys[key->lct_index], NULL);
 }
 EXPORT_SYMBOL(lu_context_key_degister);
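
Degistration pairs wait_var_event() above with the wake_up_var() in
key_fini(): the waiter sleeps until the refcount reaches zero, and every
final decrement wakes it. The bare pattern, with illustrative names:

        static atomic_t users = ATOMIC_INIT(1);

        static void put_ref(void)
        {
                if (atomic_dec_and_test(&users))
                        wake_up_var(&users);    /* wake wait_var_event() */
        }

        static void wait_for_users_gone(void)
        {
                atomic_dec(&users);             /* drop our own reference */
                wait_var_event(&users, atomic_read(&users) == 0);
        }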
 
@@ -1456,7 +1652,8 @@ EXPORT_SYMBOL(lu_context_key_get);
 /**
  * List of remembered contexts. XXX document me.
  */
-static CFS_LIST_HEAD(lu_context_remembered);
+static LIST_HEAD(lu_context_remembered);
+static DEFINE_SPINLOCK(lu_context_remembered_guard);
 
 /**
  * Destroy \a key in all remembered contexts. This is used to destroy key
@@ -1465,38 +1662,37 @@ static CFS_LIST_HEAD(lu_context_remembered);
  */
 void lu_context_key_quiesce(struct lu_context_key *key)
 {
-        struct lu_context *ctx;
-        extern unsigned cl_env_cache_purge(unsigned nr);
+       struct lu_context *ctx;
 
-        if (!(key->lct_tags & LCT_QUIESCENT)) {
-                /*
-                 * XXX layering violation.
-                 */
-                cl_env_cache_purge(~0);
-                key->lct_tags |= LCT_QUIESCENT;
+       if (!(key->lct_tags & LCT_QUIESCENT)) {
                 /*
-                 * XXX memory barrier has to go here.
+                * The write-lock on lu_key_initing will ensure that any
+                * keys_fill() which didn't see LCT_QUIESCENT will have
+                * finished before we call key_fini().
                  */
-               spin_lock(&lu_keys_guard);
-               cfs_list_for_each_entry(ctx, &lu_context_remembered,
-                                       lc_remember)
+               down_write(&lu_key_initing);
+               key->lct_tags |= LCT_QUIESCENT;
+               up_write(&lu_key_initing);
+
+               spin_lock(&lu_context_remembered_guard);
+               list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
+                       spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
-               spin_unlock(&lu_keys_guard);
-               ++key_set_version;
+               }
+
+               spin_unlock(&lu_context_remembered_guard);
        }
 }
-EXPORT_SYMBOL(lu_context_key_quiesce);
 
 void lu_context_key_revive(struct lu_context_key *key)
 {
-        key->lct_tags &= ~LCT_QUIESCENT;
-        ++key_set_version;
+       key->lct_tags &= ~LCT_QUIESCENT;
+       atomic_inc(&key_set_version);
 }
-EXPORT_SYMBOL(lu_context_key_revive);
 
 static void keys_fini(struct lu_context *ctx)
 {
-       int     i;
+       unsigned int i;
 
        if (ctx->lc_value == NULL)
                return;
@@ -1510,46 +1706,64 @@ static void keys_fini(struct lu_context *ctx)
 
 static int keys_fill(struct lu_context *ctx)
 {
-        int i;
+       unsigned int i;
+       int rc = 0;
 
-        LINVRNT(ctx->lc_value != NULL);
-        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                struct lu_context_key *key;
+       /*
+        * Serialisation with lu_context_key_quiesce() is needed to
+        * ensure we see LCT_QUIESCENT and don't allocate a new value
+        * after it has freed one.  The rwsem provides this.  As
+        * down_read() does optimistic spinning while the writer is
+        * active, this is unlikely to ever sleep.
+        */
+       down_read(&lu_key_initing);
+       ctx->lc_version = atomic_read(&key_set_version);
+
+       LINVRNT(ctx->lc_value);
+       for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
+               struct lu_context_key *key;
+
+               key = lu_keys[i];
+               if (!ctx->lc_value[i] && key &&
+                   (key->lct_tags & ctx->lc_tags) &&
+                   /*
+                    * Don't create values for a LCT_QUIESCENT key, as this
+                    * will pin module owning a key.
+                    */
+                   !(key->lct_tags & LCT_QUIESCENT)) {
+                       void *value;
+
+                       LINVRNT(key->lct_init != NULL);
+                       LINVRNT(key->lct_index == i);
+
+                       LASSERT(key->lct_owner != NULL);
+                       if (!(ctx->lc_tags & LCT_NOREF) &&
+                           try_module_get(key->lct_owner) == 0) {
+                               /* module is unloading, skip this key */
+                               continue;
+                       }
+
+                       value = key->lct_init(ctx, key);
+                       if (unlikely(IS_ERR(value))) {
+                               rc = PTR_ERR(value);
+                               break;
+                       }
+
+                       lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
+                       atomic_inc(&key->lct_used);
+                       /*
+                        * This is the only place in the code, where an
+                        * element of ctx->lc_value[] array is set to non-NULL
+                        * value.
+                        */
+                       ctx->lc_value[i] = value;
+                       if (key->lct_exit != NULL)
+                               ctx->lc_tags |= LCT_HAS_EXIT;
+               }
+       }
 
-                key = lu_keys[i];
-                if (ctx->lc_value[i] == NULL && key != NULL &&
-                    (key->lct_tags & ctx->lc_tags) &&
-                    /*
-                     * Don't create values for a LCT_QUIESCENT key, as this
-                     * will pin module owning a key.
-                     */
-                    !(key->lct_tags & LCT_QUIESCENT)) {
-                        void *value;
-
-                        LINVRNT(key->lct_init != NULL);
-                        LINVRNT(key->lct_index == i);
-
-                        value = key->lct_init(ctx, key);
-                        if (unlikely(IS_ERR(value)))
-                                return PTR_ERR(value);
-
-                        LASSERT(key->lct_owner != NULL);
-                        if (!(ctx->lc_tags & LCT_NOREF))
-                                cfs_try_module_get(key->lct_owner);
-                        lu_ref_add_atomic(&key->lct_reference, "ctx", ctx);
-                        cfs_atomic_inc(&key->lct_used);
-                        /*
-                         * This is the only place in the code, where an
-                         * element of ctx->lc_value[] array is set to non-NULL
-                         * value.
-                         */
-                        ctx->lc_value[i] = value;
-                        if (key->lct_exit != NULL)
-                                ctx->lc_tags |= LCT_HAS_EXIT;
-                }
-                ctx->lc_version = key_set_version;
-        }
-        return 0;
+       up_read(&lu_key_initing);
+       return rc;
 }
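
The rwsem here is a publication fence, not a data lock. A minimal model of
the pattern keys_fill() and lu_context_key_quiesce() rely on (illustrative
names):

        static DECLARE_RWSEM(init_sem);
        static int quiescent;

        static void reader(void)                /* keys_fill() side */
        {
                down_read(&init_sem);
                if (!quiescent) {
                        /* safe to allocate: the writer cannot proceed
                         * until we up_read() */
                }
                up_read(&init_sem);
        }

        static void writer(void)                /* quiesce side */
        {
                down_write(&init_sem);          /* waits out all readers */
                quiescent = 1;
                up_write(&init_sem);
                /* readers that missed the flag have now finished */
        }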
 
 static int keys_init(struct lu_context *ctx)
@@ -1572,11 +1786,11 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
-               spin_lock(&lu_keys_guard);
-               cfs_list_add(&ctx->lc_remember, &lu_context_remembered);
-               spin_unlock(&lu_keys_guard);
+               spin_lock(&lu_context_remembered_guard);
+               list_add(&ctx->lc_remember, &lu_context_remembered);
+               spin_unlock(&lu_context_remembered_guard);
        } else {
-               CFS_INIT_LIST_HEAD(&ctx->lc_remember);
+               INIT_LIST_HEAD(&ctx->lc_remember);
        }
 
        rc = keys_init(ctx);
@@ -1596,15 +1810,14 @@ void lu_context_fini(struct lu_context *ctx)
        ctx->lc_state = LCS_FINALIZED;
 
        if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
-               LASSERT(cfs_list_empty(&ctx->lc_remember));
-               keys_fini(ctx);
-
-       } else { /* could race with key degister */
-               spin_lock(&lu_keys_guard);
-               keys_fini(ctx);
-               cfs_list_del_init(&ctx->lc_remember);
-               spin_unlock(&lu_keys_guard);
+               LASSERT(list_empty(&ctx->lc_remember));
+       } else {
+               /* could race with key degister */
+               spin_lock(&lu_context_remembered_guard);
+               list_del_init(&ctx->lc_remember);
+               spin_unlock(&lu_context_remembered_guard);
        }
+       keys_fini(ctx);
 }
 EXPORT_SYMBOL(lu_context_fini);
 
@@ -1623,23 +1836,37 @@ EXPORT_SYMBOL(lu_context_enter);
  */
 void lu_context_exit(struct lu_context *ctx)
 {
-        int i;
+       unsigned int i;
 
-        LINVRNT(ctx->lc_state == LCS_ENTERED);
-        ctx->lc_state = LCS_LEFT;
-        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
+       LINVRNT(ctx->lc_state == LCS_ENTERED);
+       /*
+        * Disable preemption to ensure we get a warning if
+        * any lct_exit ever tries to sleep.  That would hurt
+        * lu_context_key_quiesce(), which spins waiting for us.
+        * This also ensures we aren't preempted while the state
+        * is LCS_LEAVING, as that too would cause problems for
+        * lu_context_key_quiesce().
+        */
+       preempt_disable();
+       /*
+        * Ensure lu_context_key_quiesce() sees LCS_LEAVING
+        * or we see LCT_QUIESCENT
+        */
+       smp_store_mb(ctx->lc_state, LCS_LEAVING);
+       if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                        if (ctx->lc_value[i] != NULL) {
-                                struct lu_context_key *key;
-
-                                key = lu_keys[i];
-                                LASSERT(key != NULL);
-                                if (key->lct_exit != NULL)
-                                        key->lct_exit(ctx,
-                                                      key, ctx->lc_value[i]);
-                        }
-                }
+                       struct lu_context_key *key;
+
+                       key = lu_keys[i];
+                       if (ctx->lc_value[i] &&
+                           !(key->lct_tags & LCT_QUIESCENT) &&
+                           key->lct_exit)
+                               key->lct_exit(ctx, key, ctx->lc_value[i]);
+               }
         }
+
+       smp_store_release(&ctx->lc_state, LCS_LEFT);
+       preempt_enable();
 }
 EXPORT_SYMBOL(lu_context_exit);
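
The exit path and lu_context_key_quiesce() synchronize purely through
lc_state and barriers: the exiting thread publishes LCS_LEAVING with a full
barrier before running lct_exit() methods, and the quiescing thread spins
until the context has left that window. A minimal model with illustrative
types:

        enum mstate { M_LEFT, M_ENTERED, M_LEAVING };

        struct mctx {
                enum mstate st;
        };

        static void model_exit(struct mctx *c)
        {
                smp_store_mb(c->st, M_LEAVING); /* store + full barrier */
                /* ... run exit callbacks touching per-key values ... */
                smp_store_release(&c->st, M_LEFT);
        }

        static void model_quiesce(struct mctx *c)
        {
                /* wait until the peer stops running exit callbacks */
                spin_until_cond(READ_ONCE(c->st) != M_LEAVING);
                /* now safe to finalize the per-key value */
        }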
 
@@ -1650,9 +1877,11 @@ EXPORT_SYMBOL(lu_context_exit);
  */
 int lu_context_refill(struct lu_context *ctx)
 {
-        return likely(ctx->lc_version == key_set_version) ? 0 : keys_fill(ctx);
+       if (likely(ctx->lc_version == atomic_read(&key_set_version)))
+               return 0;
+
+       return keys_fill(ctx);
 }
-EXPORT_SYMBOL(lu_context_refill);
 
 /**
  * lu_ctx_tags/lu_ses_tags will be updated if there are new types of
@@ -1661,42 +1890,42 @@ EXPORT_SYMBOL(lu_context_refill);
  * predefined when the lu_device type are registered, during the module probe
  * phase.
  */
-__u32 lu_context_tags_default = 0;
-__u32 lu_session_tags_default = 0;
+u32 lu_context_tags_default = LCT_CL_THREAD;
+u32 lu_session_tags_default = LCT_SESSION;
 
 void lu_context_tags_update(__u32 tags)
 {
-       spin_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default |= tags;
-       key_set_version++;
-       spin_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_update);
 
 void lu_context_tags_clear(__u32 tags)
 {
-       spin_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default &= ~tags;
-       key_set_version++;
-       spin_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_clear);
 
 void lu_session_tags_update(__u32 tags)
 {
-       spin_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default |= tags;
-       key_set_version++;
-       spin_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_update);
 
 void lu_session_tags_clear(__u32 tags)
 {
-       spin_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default &= ~tags;
-       key_set_version++;
-       spin_unlock(&lu_keys_guard);
+       atomic_inc(&key_set_version);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_clear);
 
@@ -1759,7 +1988,114 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
 }
 EXPORT_SYMBOL(lu_env_refill_by_tags);
 
-static struct cfs_shrinker *lu_site_shrinker = NULL;
+struct lu_env_item {
+       struct task_struct *lei_task;   /* rhashtable key */
+       struct rhash_head lei_linkage;
+       struct lu_env *lei_env;
+       struct rcu_head lei_rcu_head;
+};
+
+static const struct rhashtable_params lu_env_rhash_params = {
+       .key_len     = sizeof(struct task_struct *),
+       .key_offset  = offsetof(struct lu_env_item, lei_task),
+       .head_offset = offsetof(struct lu_env_item, lei_linkage),
+};
+
+struct rhashtable lu_env_rhash;
+
+struct lu_env_percpu {
+       struct task_struct *lep_task;
+       struct lu_env *lep_env ____cacheline_aligned_in_smp;
+};
+
+static struct lu_env_percpu lu_env_percpu[NR_CPUS];
+
+int lu_env_add(struct lu_env *env)
+{
+       struct lu_env_item *lei, *old;
+
+       LASSERT(env);
+
+       OBD_ALLOC_PTR(lei);
+       if (!lei)
+               return -ENOMEM;
+
+       lei->lei_task = current;
+       lei->lei_env = env;
+
+       old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
+                                               &lei->lei_linkage,
+                                               lu_env_rhash_params);
+       LASSERT(!old);
+
+       return 0;
+}
+EXPORT_SYMBOL(lu_env_add);
+
+static void lu_env_item_free(struct rcu_head *head)
+{
+       struct lu_env_item *lei;
+
+       lei = container_of(head, struct lu_env_item, lei_rcu_head);
+       OBD_FREE_PTR(lei);
+}
+
+void lu_env_remove(struct lu_env *env)
+{
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i;
+
+       for_each_possible_cpu(i) {
+               if (lu_env_percpu[i].lep_env == env) {
+                       LASSERT(lu_env_percpu[i].lep_task == task);
+                       lu_env_percpu[i].lep_task = NULL;
+                       lu_env_percpu[i].lep_env = NULL;
+               }
+       }
+
+       /* The rcu read lock is not taken in this case since the key
+        * used is the actual task_struct. This implies that each
+        * object is only removed by its owning thread, so there
+        * can never be a race on a particular object.
+        */
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
+                                         lu_env_rhash_params) == 0)
+               call_rcu(&lei->lei_rcu_head, lu_env_item_free);
+}
+EXPORT_SYMBOL(lu_env_remove);
+
+struct lu_env *lu_env_find(void)
+{
+       struct lu_env *env = NULL;
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i = get_cpu();
+
+       if (lu_env_percpu[i].lep_task == current) {
+               env = lu_env_percpu[i].lep_env;
+               put_cpu();
+               LASSERT(env);
+               return env;
+       }
+
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei) {
+               env = lei->lei_env;
+               lu_env_percpu[i].lep_task = current;
+               lu_env_percpu[i].lep_env = env;
+       }
+       put_cpu();
+
+       return env;
+}
+EXPORT_SYMBOL(lu_env_find);
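
A hypothetical caller of the registry above: a service thread binds its env
once, code deeper in the call stack that has no env parameter recovers it
with lu_env_find(), and the thread unbinds before the env is finalized
(do_work() is illustrative only):

        struct lu_env env;
        int rc;

        rc = lu_env_init(&env, LCT_DT_THREAD);
        if (rc)
                return rc;
        rc = lu_env_add(&env);
        if (rc == 0) {
                do_work();      /* nested code may call lu_env_find() */
                lu_env_remove(&env);
        }
        lu_env_fini(&env);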
+
+static struct shrinker *lu_site_shrinker;
 
 typedef struct lu_site_stats {
         unsigned        lss_populated;
@@ -1768,37 +2104,102 @@ typedef struct lu_site_stats{
         unsigned        lss_busy;
 } lu_site_stats_t;
 
-static void lu_site_stats_get(cfs_hash_t *hs,
-                              lu_site_stats_t *stats, int populated)
+static void lu_site_stats_get(const struct lu_site *s,
+                             lu_site_stats_t *stats)
 {
-        cfs_hash_bd_t bd;
-        int           i;
-
-        cfs_hash_for_each_bucket(hs, &bd, i) {
-                struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
-                cfs_hlist_head_t        *hhead;
+       int cnt = cfs_hash_size_get(s->ls_obj_hash);
+       /*
+        * percpu_counter_sum_positive() won't accept a const pointer
+        * as it does modify the struct by taking a spinlock
+        */
+       struct lu_site *s2 = (struct lu_site *)s;
 
-                cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_busy  += bkt->lsb_busy;
-                stats->lss_total += cfs_hash_bd_count_get(&bd);
-                stats->lss_max_search = max((int)stats->lss_max_search,
-                                            cfs_hash_bd_depmax_get(&bd));
-                if (!populated) {
-                        cfs_hash_bd_unlock(hs, &bd, 1);
-                        continue;
-                }
+       stats->lss_busy += cnt -
+               percpu_counter_sum_positive(&s2->ls_lru_len_counter);
 
-                cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
-                        if (!cfs_hlist_empty(hhead))
-                                stats->lss_populated++;
-                }
-                cfs_hash_bd_unlock(hs, &bd, 1);
-        }
+       stats->lss_total += cnt;
+       stats->lss_max_search = 0;
+       stats->lss_populated = 0;
 }
 
-#ifdef __KERNEL__
 
 /*
+ * lu_cache_shrink_count() returns an approximate number of cached objects
+ * that can be freed by shrink_slab(). A counter, which tracks the
+ * number of items in the site's lru, is maintained in a percpu_counter
+ * for each site. The percpu values are incremented and decremented as
+ * objects are added or removed from the lru. The percpu values are summed
+ * and saved whenever a percpu value exceeds a threshold. Thus the saved,
+ * summed value at any given time may not accurately reflect the current
+ * lru length. But this value is sufficiently accurate for the needs of
+ * a shrinker.
+ *
+ * Using a per cpu counter is a compromise solution to concurrent access:
+ * lu_object_put() can update the counter without locking the site and
+ * lu_cache_shrink_count can sum the counters without locking each
+ * ls_obj_hash bucket.
+ */
+static unsigned long lu_cache_shrink_count(struct shrinker *sk,
+                                          struct shrink_control *sc)
+{
+       struct lu_site *s;
+       struct lu_site *tmp;
+       unsigned long cached = 0;
+
+       if (!(sc->gfp_mask & __GFP_FS))
+               return 0;
+
+       down_read(&lu_sites_guard);
+       list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage)
+               cached += percpu_counter_read_positive(&s->ls_lru_len_counter);
+       up_read(&lu_sites_guard);
+
+       cached = (cached / 100) * sysctl_vfs_cache_pressure;
+       CDEBUG(D_INODE, "%ld objects cached, cache pressure %d\n",
+              cached, sysctl_vfs_cache_pressure);
+
+       return cached;
+}
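
The counter discipline described above, reduced to its three touch points
(the field name is real, the placement illustrative): hot paths pay one
per-cpu increment, and the shrinker reads a cheap, slightly stale sum.

        /* object enters the site LRU */
        percpu_counter_inc(&s->ls_lru_len_counter);
        /* object leaves the site LRU */
        percpu_counter_dec(&s->ls_lru_len_counter);
        /* shrinker: approximate length, never negative */
        len = percpu_counter_read_positive(&s->ls_lru_len_counter);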
+
+static unsigned long lu_cache_shrink_scan(struct shrinker *sk,
+                                         struct shrink_control *sc)
+{
+       struct lu_site *s;
+       struct lu_site *tmp;
+       unsigned long remain = sc->nr_to_scan;
+       LIST_HEAD(splice);
+
+       if (!(sc->gfp_mask & __GFP_FS))
+               /* We must not take the lu_sites_guard lock when
+                * __GFP_FS is *not* set because of the deadlock
+                * possibility detailed above. Additionally,
+                * since we cannot determine the number of
+                * objects in the cache without taking this
+                * lock, we're in a particularly tough spot. As
+                * a result, we'll just lie and say our cache is
+                * empty. This _should_ be ok, as we can't
+                * reclaim objects when __GFP_FS is *not* set
+                * anyways.
+                */
+               return SHRINK_STOP;
+
+       down_write(&lu_sites_guard);
+       list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
+               remain = lu_site_purge(&lu_shrink_env, s, remain);
+               /*
+                * Move just shrunk site to the tail of site list to
+                * assure shrinking fairness.
+                */
+               list_move_tail(&s->ls_linkage, &splice);
+       }
+       list_splice(&splice, lu_sites.prev);
+       up_write(&lu_sites_guard);
+
+       return sc->nr_to_scan - remain;
+}
+
+#ifndef HAVE_SHRINKER_COUNT
+/*
  * There exists a potential lock inversion deadlock scenario when using
  * Lustre on top of ZFS. This occurs between one of ZFS's
  * buf_hash_table.ht_lock's, and Lustre's lu_sites_guard lock. Essentially,
@@ -1813,64 +2214,29 @@ static void lu_site_stats_get(cfs_hash_t *hs,
  * is safe to take the lu_sites_guard lock.
  *
  * Ideally we should accurately return the remaining number of cached
- * objects without taking the  lu_sites_guard lock, but this is not
+ * objects without taking the lu_sites_guard lock, but this is not
  * possible in the current implementation.
  */
 static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 {
-        lu_site_stats_t stats;
-        struct lu_site *s;
-        struct lu_site *tmp;
         int cached = 0;
-        int remain = shrink_param(sc, nr_to_scan);
-        CFS_LIST_HEAD(splice);
-
-       if (!(shrink_param(sc, gfp_mask) & __GFP_FS)) {
-               if (remain != 0)
-                        return -1;
-               else
-                       /* We must not take the lu_sites_guard lock when
-                        * __GFP_FS is *not* set because of the deadlock
-                        * possibility detailed above. Additionally,
-                        * since we cannot determine the number of
-                        * objects in the cache without taking this
-                        * lock, we're in a particularly tough spot. As
-                        * a result, we'll just lie and say our cache is
-                        * empty. This _should_ be ok, as we can't
-                        * reclaim objects when __GFP_FS is *not* set
-                        * anyways.
-                        */
-                       return 0;
-        }
+       struct shrink_control scv = {
+                .nr_to_scan = shrink_param(sc, nr_to_scan),
+                .gfp_mask   = shrink_param(sc, gfp_mask)
+       };
 
-       CDEBUG(D_INODE, "Shrink %d objects\n", remain);
-
-       mutex_lock(&lu_sites_guard);
-        cfs_list_for_each_entry_safe(s, tmp, &lu_sites, ls_linkage) {
-                if (shrink_param(sc, nr_to_scan) != 0) {
-                        remain = lu_site_purge(&lu_shrink_env, s, remain);
-                        /*
-                         * Move just shrunk site to the tail of site list to
-                         * assure shrinking fairness.
-                         */
-                        cfs_list_move_tail(&s->ls_linkage, &splice);
-                }
+       CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
 
-                memset(&stats, 0, sizeof(stats));
-                lu_site_stats_get(s->ls_obj_hash, &stats, 0);
-                cached += stats.lss_total - stats.lss_busy;
-                if (shrink_param(sc, nr_to_scan) && remain <= 0)
-                        break;
-        }
-        cfs_list_splice(&splice, lu_sites.prev);
-       mutex_unlock(&lu_sites_guard);
+       if (scv.nr_to_scan != 0)
+               lu_cache_shrink_scan(shrinker, &scv);
 
-        cached = (cached / 100) * sysctl_vfs_cache_pressure;
-        if (shrink_param(sc, nr_to_scan) == 0)
-                CDEBUG(D_INODE, "%d objects cached\n", cached);
-        return cached;
+       cached = lu_cache_shrink_count(shrinker, &scv);
+       return cached;
 }
 
+#endif /* HAVE_SHRINKER_COUNT */
+
 /*
  * Debugging stuff.
  */
@@ -1878,13 +2244,13 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
 /**
  * Environment to be used in debugger, contains all tags.
  */
-struct lu_env lu_debugging_env;
+static struct lu_env lu_debugging_env;
 
 /**
  * Debugging printer function using printk().
  */
 int lu_printk_printer(const struct lu_env *env,
-                      void *unused, const char *format, ...)
+                     void *unused, const char *format, ...)
 {
         va_list args;
 
@@ -1894,14 +2260,14 @@ int lu_printk_printer(const struct lu_env *env,
         return 0;
 }
 
-void lu_debugging_setup(void)
+int lu_debugging_setup(void)
 {
-        lu_env_init(&lu_debugging_env, ~0);
+       return lu_env_init(&lu_debugging_env, ~0);
 }
 
 void lu_context_keys_dump(void)
 {
-        int i;
+       unsigned int i;
 
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
                 struct lu_context_key *key;
@@ -1911,84 +2277,22 @@ void lu_context_keys_dump(void)
                         CERROR("[%d]: %p %x (%p,%p,%p) %d %d \"%s\"@%p\n",
                                i, key, key->lct_tags,
                                key->lct_init, key->lct_fini, key->lct_exit,
-                               key->lct_index, cfs_atomic_read(&key->lct_used),
+                              key->lct_index, atomic_read(&key->lct_used),
                                key->lct_owner ? key->lct_owner->name : "",
                                key->lct_owner);
                         lu_ref_print(&key->lct_reference);
                 }
         }
 }
-EXPORT_SYMBOL(lu_context_keys_dump);
-#else  /* !__KERNEL__ */
-static int lu_cache_shrink(int nr, unsigned int gfp_mask)
-{
-        return 0;
-}
-#endif /* __KERNEL__ */
-
-int  cl_global_init(void);
-void cl_global_fini(void);
-int  lu_ref_global_init(void);
-void lu_ref_global_fini(void);
-
-int dt_global_init(void);
-void dt_global_fini(void);
-
-int llo_global_init(void);
-void llo_global_fini(void);
-
-/* context key constructor/destructor: lu_ucred_key_init, lu_ucred_key_fini */
-LU_KEY_INIT_FINI(lu_ucred, struct lu_ucred);
-
-static struct lu_context_key lu_ucred_key = {
-       .lct_tags = LCT_SESSION,
-       .lct_init = lu_ucred_key_init,
-       .lct_fini = lu_ucred_key_fini
-};
-
-/**
- * Get ucred key if session exists and ucred key is allocated on it.
- * Return NULL otherwise.
- */
-struct lu_ucred *lu_ucred(const struct lu_env *env)
-{
-       if (!env->le_ses)
-               return NULL;
-       return lu_context_key_get(env->le_ses, &lu_ucred_key);
-}
-EXPORT_SYMBOL(lu_ucred);
-
-/**
- * Get ucred key and check if it is properly initialized.
- * Return NULL otherwise.
- */
-struct lu_ucred *lu_ucred_check(const struct lu_env *env)
-{
-       struct lu_ucred *uc = lu_ucred(env);
-       if (uc && uc->uc_valid != UCRED_OLD && uc->uc_valid != UCRED_NEW)
-               return NULL;
-       return uc;
-}
-EXPORT_SYMBOL(lu_ucred_check);
-
-/**
- * Get ucred key, which must exist and must be properly initialized.
- * Assert otherwise.
- */
-struct lu_ucred *lu_ucred_assert(const struct lu_env *env)
-{
-       struct lu_ucred *uc = lu_ucred_check(env);
-       LASSERT(uc != NULL);
-       return uc;
-}
-EXPORT_SYMBOL(lu_ucred_assert);
 
 /**
  * Initialization of global lu_* data.
  */
 int lu_global_init(void)
 {
-        int result;
+       int result;
+       DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
+                        lu_cache_shrink_count, lu_cache_shrink_scan);
 
         CDEBUG(D_INFO, "Lustre LU module (%p).\n", &lu_keys);
 
@@ -2001,19 +2305,14 @@ int lu_global_init(void)
         if (result != 0)
                 return result;
 
-       LU_CONTEXT_KEY_INIT(&lu_ucred_key);
-       result = lu_context_key_register(&lu_ucred_key);
-       if (result != 0)
-               return result;
-
         /*
          * At this level, we don't know what tags are needed, so allocate them
          * conservatively. This should not be too bad, because this
          * environment is global.
          */
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         result = lu_env_init(&lu_shrink_env, LCT_SHRINKER);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
         if (result != 0)
                 return result;
 
@@ -2022,20 +2321,11 @@ int lu_global_init(void)
          * inode, one for ea. Unfortunately setting this high value results in
          * lu_object/inode cache consuming all the memory.
          */
-        lu_site_shrinker = cfs_set_shrinker(CFS_DEFAULT_SEEKS, lu_cache_shrink);
+       lu_site_shrinker = set_shrinker(DEFAULT_SEEKS, &shvar);
         if (lu_site_shrinker == NULL)
                 return -ENOMEM;
 
-#ifdef __KERNEL__
-       result = dt_global_init();
-       if (result != 0)
-               return result;
-
-       result = llo_global_init();
-       if (result != 0)
-               return result;
-#endif
-        result = cl_global_init();
+       result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
 
         return result;
 }
@@ -2045,45 +2335,35 @@ int lu_global_init(void)
  */
 void lu_global_fini(void)
 {
-        cl_global_fini();
-#ifdef __KERNEL__
-        llo_global_fini();
-        dt_global_fini();
-#endif
         if (lu_site_shrinker != NULL) {
-                cfs_remove_shrinker(lu_site_shrinker);
+               remove_shrinker(lu_site_shrinker);
                 lu_site_shrinker = NULL;
         }
 
-        lu_context_key_degister(&lu_global_key);
-       lu_context_key_degister(&lu_ucred_key);
+       lu_context_key_degister(&lu_global_key);
 
         /*
          * Tear shrinker environment down _after_ de-registering
          * lu_global_key, because the latter has a value in the former.
          */
-       mutex_lock(&lu_sites_guard);
+       down_write(&lu_sites_guard);
         lu_env_fini(&lu_shrink_env);
-       mutex_unlock(&lu_sites_guard);
+       up_write(&lu_sites_guard);
+
+       rhashtable_destroy(&lu_env_rhash);
 
         lu_ref_global_fini();
 }
 
-struct lu_buf LU_BUF_NULL = {
-        .lb_buf = NULL,
-        .lb_len = 0
-};
-EXPORT_SYMBOL(LU_BUF_NULL);
-
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
 {
-#ifdef LPROCFS
-        struct lprocfs_counter ret;
+#ifdef CONFIG_PROC_FS
+       struct lprocfs_counter ret;
 
-        lprocfs_stats_collect(stats, idx, &ret);
-        return (__u32)ret.lc_count;
+       lprocfs_stats_collect(stats, idx, &ret);
+       return (__u32)ret.lc_count;
 #else
-        return 0;
+       return 0;
 #endif
 }
 
@@ -2091,27 +2371,28 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
  * Output site statistical counters into a buffer. Suitable for
  * lprocfs_rd_*()-style functions.
  */
-int lu_site_stats_print(const struct lu_site *s, char *page, int count)
+int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
 {
-        lu_site_stats_t stats;
+       lu_site_stats_t stats;
 
-        memset(&stats, 0, sizeof(stats));
-        lu_site_stats_get(s->ls_obj_hash, &stats, 1);
+       memset(&stats, 0, sizeof(stats));
+       lu_site_stats_get(s, &stats);
 
-        return snprintf(page, count, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
-                        stats.lss_busy,
-                        stats.lss_total,
-                        stats.lss_populated,
-                        CFS_HASH_NHLIST(s->ls_obj_hash),
-                        stats.lss_max_search,
-                        ls_stats_read(s->ls_stats, LU_SS_CREATED),
-                        ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
-                        ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
-                        ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
-                        ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
-                        ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
+       seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
+                  stats.lss_busy,
+                  stats.lss_total,
+                  stats.lss_populated,
+                  CFS_HASH_NHLIST(s->ls_obj_hash),
+                  stats.lss_max_search,
+                  ls_stats_read(s->ls_stats, LU_SS_CREATED),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_MISS),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_RACE),
+                  ls_stats_read(s->ls_stats, LU_SS_CACHE_DEATH_RACE),
+                  ls_stats_read(s->ls_stats, LU_SS_LRU_PURGED));
+       return 0;
 }
-EXPORT_SYMBOL(lu_site_stats_print);
+EXPORT_SYMBOL(lu_site_stats_seq_print);
 
 /**
  * Helper function to initialize a number of kmem slab caches at once.
@@ -2122,9 +2403,9 @@ int lu_kmem_init(struct lu_kmem_descr *caches)
         struct lu_kmem_descr *iter = caches;
 
         for (result = 0; iter->ckd_cache != NULL; ++iter) {
-                *iter->ckd_cache = cfs_mem_cache_create(iter->ckd_name,
-                                                        iter->ckd_size,
-                                                        0, 0);
+               *iter->ckd_cache = kmem_cache_create(iter->ckd_name,
+                                                    iter->ckd_size,
+                                                    0, 0, NULL);
                 if (*iter->ckd_cache == NULL) {
                         result = -ENOMEM;
                         /* free all previously allocated caches */
@@ -2142,13 +2423,9 @@ EXPORT_SYMBOL(lu_kmem_init);
  */
 void lu_kmem_fini(struct lu_kmem_descr *caches)
 {
-        int rc;
-
         for (; caches->ckd_cache != NULL; ++caches) {
                 if (*caches->ckd_cache != NULL) {
-                        rc = cfs_mem_cache_destroy(*caches->ckd_cache);
-                        LASSERTF(rc == 0, "couldn't destroy %s slab\n",
-                                 caches->ckd_name);
+                       kmem_cache_destroy(*caches->ckd_cache);
                         *caches->ckd_cache = NULL;
                 }
         }
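
Typical (hypothetical) use of these helpers: a module declares a
NULL-terminated descriptor table and sets up or tears down all of its slab
caches with one call each (the foo_* names are illustrative):

        static struct kmem_cache *foo_object_kmem;

        static struct lu_kmem_descr foo_caches[] = {
                {
                        .ckd_cache = &foo_object_kmem,
                        .ckd_name  = "foo_object_kmem",
                        .ckd_size  = sizeof(struct foo_object)
                },
                {
                        .ckd_cache = NULL       /* terminator */
                }
        };

        /* module init: */      rc = lu_kmem_init(foo_caches);
        /* module exit: */      lu_kmem_fini(foo_caches);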
@@ -2164,24 +2441,26 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct lu_site_bkt_data *bkt;
-       struct lu_object        *shadow;
-       cfs_waitlink_t           waiter;
-       cfs_hash_t              *hs;
-       cfs_hash_bd_t            bd;
-       __u64                    version = 0;
+       struct cfs_hash         *hs;
+       struct cfs_hash_bd       bd;
 
        LASSERT(fid_is_zero(old));
 
        hs = s->ls_obj_hash;
        cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
-       shadow = htable_lookup(s, &bd, fid, &waiter, &version);
-       /* supposed to be unique */
-       LASSERT(shadow == NULL);
+#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
+       {
+               __u64 version = 0;
+               struct lu_object *shadow;
+
+               shadow = htable_lookup(s, &bd, fid, &version);
+               /* supposed to be unique */
+               LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+       }
+#endif
        *old = *fid;
-       bkt = cfs_hash_bd_extra_get(hs, &bd);
        cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       bkt->lsb_busy++;
        cfs_hash_bd_unlock(hs, &bd, 1);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
@@ -2195,12 +2474,97 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
-       struct lu_fid     fid;
+       struct lu_fid fid;
        struct lu_object *o;
+       int rc;
 
        fid_zero(&fid);
-       o = lu_object_alloc(env, dev, &fid, conf);
+       o = lu_object_alloc(env, dev, &fid);
+       if (!IS_ERR(o)) {
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_free(env, o);
+                       return ERR_PTR(rc);
+               }
+       }
 
        return o;
 }
 EXPORT_SYMBOL(lu_object_anon);
+
+struct lu_buf LU_BUF_NULL = {
+       .lb_buf = NULL,
+       .lb_len = 0
+};
+EXPORT_SYMBOL(LU_BUF_NULL);
+
+void lu_buf_free(struct lu_buf *buf)
+{
+       LASSERT(buf);
+       if (buf->lb_buf) {
+               LASSERT(buf->lb_len > 0);
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+       }
+}
+EXPORT_SYMBOL(lu_buf_free);
+
+void lu_buf_alloc(struct lu_buf *buf, size_t size)
+{
+       LASSERT(buf);
+       LASSERT(buf->lb_buf == NULL);
+       LASSERT(buf->lb_len == 0);
+       OBD_ALLOC_LARGE(buf->lb_buf, size);
+       if (likely(buf->lb_buf))
+               buf->lb_len = size;
+}
+EXPORT_SYMBOL(lu_buf_alloc);
+
+void lu_buf_realloc(struct lu_buf *buf, size_t size)
+{
+       lu_buf_free(buf);
+       lu_buf_alloc(buf, size);
+}
+EXPORT_SYMBOL(lu_buf_realloc);
+
+struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, size_t len)
+{
+       if (buf->lb_buf == NULL && buf->lb_len == 0)
+               lu_buf_alloc(buf, len);
+
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL))
+               lu_buf_realloc(buf, len);
+
+       return buf;
+}
+EXPORT_SYMBOL(lu_buf_check_and_alloc);
+
+/**
+ * Increase the size of the \a buf.
+ * Preserves old data in the buffer; the old buffer remains unchanged
+ * on error.
+ * \retval 0 or -ENOMEM
+ */
+int lu_buf_check_and_grow(struct lu_buf *buf, size_t len)
+{
+       char *ptr;
+
+       if (len <= buf->lb_len)
+               return 0;
+
+       OBD_ALLOC_LARGE(ptr, len);
+       if (ptr == NULL)
+               return -ENOMEM;
+
+       /* Copy the old data over, then free the old buffer */
+       if (buf->lb_buf != NULL) {
+               memcpy(ptr, buf->lb_buf, buf->lb_len);
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       }
+
+       buf->lb_buf = ptr;
+       buf->lb_len = len;
+       return 0;
+}
+EXPORT_SYMBOL(lu_buf_check_and_grow);
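
A small (hypothetical) lu_buf lifecycle using the helpers above: start from
LU_BUF_NULL, allocate on first use, grow in place preserving contents, and
free once at the end.

        struct lu_buf buf = LU_BUF_NULL;
        int rc = -ENOMEM;

        lu_buf_check_and_alloc(&buf, 512);      /* first use: allocate */
        if (buf.lb_buf == NULL)
                goto out;
        /* ... fill up to buf.lb_len bytes ... */
        rc = lu_buf_check_and_grow(&buf, 4096); /* keeps old contents */
out:
        lu_buf_free(&buf);                      /* safe on an empty buf */
        return rc;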