Whamcloud - gitweb
LU-8130 lu_object: convert lu_object cache to rhashtable 07/36707/32
author Mr NeilBrown <neilb@suse.de>
Thu, 14 May 2020 12:03:33 +0000 (08:03 -0400)
committer Oleg Drokin <green@whamcloud.com>
Sun, 28 Jun 2020 02:47:33 +0000 (02:47 +0000)
The lu_object cache is a little more complex than the other lustre
hash tables for two reasons.
1/ there is a debugfs file which displays the contents of the cache,
  so we need to use rhashtable_walk in a way that works for seq_file.

2/ There is a (sharded) lru list for objects which are no longer
   referenced, so finding an object needs to consider races with the
   lru as well as with the hash table.

The debugfs file already manages walking the libcfs hash table,
keeping a current position in the seq_file private data.  We can
fairly easily convert that to a struct rhashtable_iter.  The debugfs
file actually reports pages, and there are multiple pages per
hashtable object, so as well as the rhashtable_iter we need to track
the current page index.
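
The combined position (a hashtable iterator plus a page index) behaves
like a resumable two-level cursor.  A minimal userspace sketch of that
idea, where an object index stands in for the rhashtable iterator and
all names and page counts are invented for illustration:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical two-level cursor: an object index standing in for the
 * rhashtable_iter, plus a page index within the current object
 * (the vsp_page_index analogue). */
struct cursor {
	size_t obj;
	size_t page;
};

/* Visit one page and advance; return 0 when the walk is finished.
 * 'pages[i]' is the number of pages held by object i. */
static int cursor_step(struct cursor *c, const size_t *pages, size_t nobjs)
{
	while (c->obj < nobjs && c->page >= pages[c->obj]) {
		c->obj++;	/* object exhausted: move to the next one */
		c->page = 0;
	}
	if (c->obj >= nobjs)
		return 0;	/* end of walk */
	c->page++;		/* consume one page of the current object */
	return 1;
}
```

Because the cursor is just the pair (obj, page), a seq_file ->next can
stop anywhere and a later ->start can resume from the same pair, which
is exactly why the patch keeps a page index alongside the iterator.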

For the double-locking, the current code uses direct access to the
bucket locks that libcfs_hash provides.  rhashtable doesn't provide
that access - callers must provide their own locking or use rcu
techniques.

The lsb_waitq.lock is still used to manage the lru list, but with
this patch it is no longer nested *inside* the hashtable locks;
instead it is taken outside them.  It is used to protect an object
with a refcount of zero.

When purging old objects from an lru, we first set
LU_OBJECT_HEARD_BANSHEE while holding the lsb_waitq.lock,
then remove all the entries from the hashtable separately.
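
This two-phase purge (mark and collect under the bucket lock, then free
outside it) can be sketched in plain userspace C.  This is only an
analogy under stated assumptions: a pthread mutex stands in for
lsb_waitq.lock, a singly linked list for the lru, and all names are
invented:

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct obj {
	int dying;		/* LU_OBJECT_HEARD_BANSHEE analogue */
	struct obj *next;	/* lru / dispose linkage */
};

/* Phase 1: under the bucket lock, mark each victim dying and move it
 * from the lru to a private dispose list. */
static struct obj *purge_collect(pthread_mutex_t *bkt_lock, struct obj **lru)
{
	struct obj *dispose = NULL, *o;

	pthread_mutex_lock(bkt_lock);
	while ((o = *lru) != NULL) {
		*lru = o->next;
		o->dying = 1;	/* lookups now refuse to revive this object */
		o->next = dispose;
		dispose = o;
	}
	pthread_mutex_unlock(bkt_lock);
	return dispose;
}

/* Phase 2: with the lock dropped, tear down each victim; in the kernel
 * code this is where the hashtable entry is finally gone and
 * lu_object_free() runs.  Returns how many objects were freed. */
static int purge_dispose(struct obj *dispose)
{
	int freed = 0;

	while (dispose) {
		struct obj *o = dispose;

		dispose = o->next;
		free(o);
		freed++;
	}
	return freed;
}
```

The point of the split is that the expensive teardown never runs with
the bucket spinlock held, while the dying flag set in phase 1 stops
concurrent lookups from resurrecting a victim in between.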

When removing the last reference from an object, we first take the
lsb_waitq.lock, then decrement the reference and either add the object
to the lru list or discard it, setting LU_OBJECT_UNHASHED.

When we find an object in the hashtable with a refcount of zero, we
take the corresponding lsb_waitq.lock and check that neither
LU_OBJECT_HEARD_BANSHEE nor LU_OBJECT_UNHASHED is set.  If neither is,
we can safely increment the refcount.  If either is set, the object is
gone.

This way, we only ever manipulate an object with a refcount of zero
while holding the lsb_waitq.lock.
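
Under that invariant (a zero refcount is only touched while holding
lsb_waitq.lock) the common get/put paths stay lock-free.  A hedged
userspace sketch with C11 atomics, where a pthread mutex stands in for
lsb_waitq.lock, a single `dead` flag for the HEARD_BANSHEE/UNHASHED
bits, and all names are invented (the kernel code expresses the put
fast path with atomic_add_unless()):

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

struct obj {
	atomic_int ref;
	bool dead;			/* BANSHEE/UNHASHED analogue */
	pthread_mutex_t *bkt_lock;	/* lsb_waitq.lock analogue */
};

/* Drop a reference.  Only the possible 1 -> 0 transition takes the
 * bucket lock; returns true if the caller holds the final teardown. */
static bool obj_put(struct obj *o)
{
	int v = atomic_load(&o->ref);

	while (v > 1)
		if (atomic_compare_exchange_weak(&o->ref, &v, v - 1))
			return false;		/* object still live */

	pthread_mutex_lock(o->bkt_lock);
	if (atomic_fetch_sub(&o->ref, 1) != 1) {
		pthread_mutex_unlock(o->bkt_lock);
		return false;			/* raced with a get */
	}
	/* ref is 0 and cannot rise without bkt_lock: lru or free here */
	pthread_mutex_unlock(o->bkt_lock);
	return true;
}

/* Take a reference on an object found in the hash.  A zero refcount is
 * only trusted after rechecking the death flag under the bucket lock.
 * (In the kernel, RCU keeps the memory itself valid meanwhile.) */
static bool obj_get(struct obj *o)
{
	int v = atomic_load(&o->ref);

	while (v > 0)
		if (atomic_compare_exchange_weak(&o->ref, &v, v + 1))
			return true;		/* plain increment, no lock */

	pthread_mutex_lock(o->bkt_lock);
	if (o->dead) {				/* object is gone */
		pthread_mutex_unlock(o->bkt_lock);
		return false;
	}
	atomic_fetch_add(&o->ref, 1);		/* revive from the lru */
	pthread_mutex_unlock(o->bkt_lock);
	return true;
}
```

The sketch mirrors the patch's structure: both sides treat a refcount
of zero as a slow path that must re-validate under the bucket lock,
while every other transition is a bare atomic operation.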

As there is nothing to stop us using the resizing capabilities of
rhashtable, the code to try to guess the perfect hash size has been
removed.

Also: the "is_dying" variable in lu_object_put() is racy - the value
could change the moment it is sampled.  It is also not needed, as it
is only used to avoid a wakeup, which is not particularly expensive.
In the same code, a comment says that 'top' must not be accessed, but
the code then immediately accesses 'top' to calculate 'bkt'.
So move the initialization of 'bkt' to before 'top' becomes unsafe.

Also: Change "wake_up_all()" to "wake_up()".  wake_up_all() is only
relevant when an exclusive wait is used.

Moving from the libcfs hashtable to rhashtable also brings a
substantial performance boost.

Before patch:

SUMMARY rate: (of 5 iterations)
   Operation                      Max            Min           Mean        Std Dev
   ---------                      ---            ---           ----        -------
   Directory creation:      12036.610      11091.880      11452.978        318.829
   Directory stat:          25871.734      24232.310      24935.661        574.996
   Directory removal:       12698.769      12239.685      12491.008        149.149
   File creation:           11722.036      11673.961      11692.157         15.966
   File stat:               62304.540      58237.124      60282.003       1479.103
   File read:               24204.811      23889.091      24048.577        110.245
   File removal:             9412.930       9111.828       9217.546        120.894
   Tree creation:            3515.536       3195.627       3442.609        123.792
   Tree removal:              433.917        418.935        428.038          5.545

After patch:

SUMMARY rate: (of 5 iterations)
   Operation                      Max            Min           Mean        Std Dev
   ---------                      ---            ---           ----        -------
   Directory creation:      11873.308        303.626       9371.860       4539.539
   Directory stat:          31116.512      30190.574      30568.091        335.545
   Directory removal:       13082.121      12645.228      12943.239        157.695
   File creation:           12607.135      12293.319      12466.647        138.307
   File stat:              124419.347     105240.996     116919.977       7847.165
   File read:               39707.270      36295.477      38266.011       1328.857
   File removal:             9614.333       9273.931       9477.299        140.201
   Tree creation:            3572.602       3017.580       3339.547        207.061
   Tree removal:              487.687          0.004        282.188        230.659

Change-Id: I618dc2e2da003c240a887126f600e7eac5df951c
Signed-off-by: Mr NeilBrown <neilb@suse.de>
Reviewed-on: https://review.whamcloud.com/36707
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lu_object.h
lustre/llite/vvp_dev.c
lustre/lod/lod_dev.c
lustre/lov/lovsub_dev.c
lustre/mgs/mgs_handler.c
lustre/obdclass/lu_object.c
lustre/ofd/ofd_dev.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_handler.c
lustre/osp/lwp_dev.c
lustre/osp/osp_dev.c

index 668dfdc..187ecdd 100644 (file)
@@ -38,6 +38,7 @@
 #include <uapi/linux/lustre/lustre_idl.h>
 #include <lu_ref.h>
 #include <linux/percpu_counter.h>
+#include <linux/rhashtable.h>
 #include <linux/ctype.h>
 #include <obd_target.h>
 
@@ -499,11 +500,6 @@ enum lu_object_header_flags {
         * intialized yet, the object allocator will initialize it.
         */
        LU_OBJECT_INITED        = 2,
-       /**
-        * Object is being purged, so mustn't be returned by
-        * htable_lookup()
-        */
-       LU_OBJECT_PURGING       = 3,
 };
 
 enum lu_object_header_attr {
@@ -527,6 +523,8 @@ enum lu_object_header_attr {
  * it is created for things like not-yet-existing child created by mkdir or
  * create calls. lu_object_operations::loo_exists() can be used to check
  * whether object is backed by persistent storage entity.
+ * Any object containing this structure which might be placed in an
+ * rhashtable via loh_hash MUST be freed using call_rcu() or kfree_rcu().
  */
 struct lu_object_header {
        /**
@@ -548,9 +546,9 @@ struct lu_object_header {
         */
        __u32                   loh_attr;
        /**
-        * Linkage into per-site hash table. Protected by lu_site::ls_guard.
+        * Linkage into per-site hash table.
         */
-       struct hlist_node       loh_hash;
+       struct rhash_head       loh_hash;
        /**
         * Linkage into per-site LRU list. Protected by lu_site::ls_guard.
         */
@@ -596,7 +594,7 @@ struct lu_site {
         /**
          * objects hash table
          */
-       struct cfs_hash         *ls_obj_hash;
+       struct rhashtable       ls_obj_hash;
        /*
         * buckets for summary data
         */
@@ -676,7 +674,8 @@ int  lu_object_init       (struct lu_object *o,
 void lu_object_fini       (struct lu_object *o);
 void lu_object_add_top    (struct lu_object_header *h, struct lu_object *o);
 void lu_object_add        (struct lu_object *before, struct lu_object *o);
-
+struct lu_object *lu_object_get_first(struct lu_object_header *h,
+                                     struct lu_device *dev);
 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d);
 void lu_dev_del_linkage(struct lu_site *s, struct lu_device *d);
 
@@ -734,8 +733,8 @@ static inline int lu_site_purge(const struct lu_env *env, struct lu_site *s,
        return lu_site_purge_objects(env, s, nr, 1);
 }
 
-void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
-                   lu_printer_t printer);
+void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
+                  int msg_flags, lu_printer_t printer);
 struct lu_object *lu_object_find(const struct lu_env *env,
                                  struct lu_device *dev, const struct lu_fid *f,
                                  const struct lu_object_conf *conf);
index 58efc98..b9daf52 100644 (file)
@@ -374,21 +374,13 @@ int cl_sb_fini(struct super_block *sb)
  *
  ****************************************************************************/
 
-struct vvp_pgcache_id {
-        unsigned                 vpi_bucket;
-        unsigned                 vpi_depth;
-        uint32_t                 vpi_index;
-
-        unsigned                 vpi_curdep;
-        struct lu_object_header *vpi_obj;
-};
-
 struct vvp_seq_private {
        struct ll_sb_info       *vsp_sbi;
        struct lu_env           *vsp_env;
        u16                     vsp_refcheck;
        struct cl_object        *vsp_clob;
-       struct vvp_pgcache_id   vvp_id;
+       struct rhashtable_iter  vsp_iter;
+       u32                     vsp_page_index;
        /*
         * prev_pos is the 'pos' of the last object returned
         * by ->start of ->next.
@@ -396,80 +388,43 @@ struct vvp_seq_private {
        loff_t                  vvp_prev_pos;
 };
 
-static int vvp_pgcache_obj_get(struct cfs_hash *hs, struct cfs_hash_bd *bd,
-                              struct hlist_node *hnode, void *data)
-{
-        struct vvp_pgcache_id   *id  = data;
-        struct lu_object_header *hdr = cfs_hash_object(hs, hnode);
-
-       if (lu_object_is_dying(hdr))
-               return 0;
-
-        if (id->vpi_curdep-- > 0)
-                return 0; /* continue */
-
-        cfs_hash_get(hs, hnode);
-        id->vpi_obj = hdr;
-        return 1;
-}
-
-static struct cl_object *vvp_pgcache_obj(const struct lu_env *env,
-                                        struct lu_device *dev,
-                                        struct vvp_pgcache_id *id)
-{
-       LASSERT(lu_device_is_cl(dev));
-
-       id->vpi_obj = NULL;
-       id->vpi_curdep = id->vpi_depth;
-
-       cfs_hash_hlist_for_each(dev->ld_site->ls_obj_hash, id->vpi_bucket,
-                               vvp_pgcache_obj_get, id);
-       if (id->vpi_obj != NULL) {
-               struct lu_object *lu_obj;
-
-               lu_obj = lu_object_locate(id->vpi_obj, dev->ld_type);
-               if (lu_obj != NULL) {
-                       lu_object_ref_add(lu_obj, "dump", current);
-                       return lu2cl(lu_obj);
-               }
-               lu_object_put(env, lu_object_top(id->vpi_obj));
-       }
-       return NULL;
-}
-
 static struct page *vvp_pgcache_current(struct vvp_seq_private *priv)
 {
        struct lu_device *dev = &priv->vsp_sbi->ll_cl->cd_lu_dev;
+       struct lu_object_header *h;
+       struct page *vmpage = NULL;
 
-       while (1) {
+       rhashtable_walk_start(&priv->vsp_iter);
+       while ((h = rhashtable_walk_next(&priv->vsp_iter)) != NULL) {
                struct inode *inode;
-               struct page *vmpage;
                int nr;
 
                if (!priv->vsp_clob) {
-                       struct cl_object *clob;
-
-                       while ((clob = vvp_pgcache_obj(priv->vsp_env, dev, &priv->vvp_id)) == NULL &&
-                              ++(priv->vvp_id.vpi_bucket) < CFS_HASH_NHLIST(dev->ld_site->ls_obj_hash))
-                               priv->vvp_id.vpi_depth = 0;
-                       if (!clob)
-                               return NULL;
-                       priv->vsp_clob = clob;
-                       priv->vvp_id.vpi_index = 0;
+                       struct lu_object *lu_obj;
+
+                       lu_obj = lu_object_get_first(h, dev);
+                       if (!lu_obj)
+                               continue;
+
+                       priv->vsp_clob = lu2cl(lu_obj);
+                       lu_object_ref_add(lu_obj, "dump", current);
+                       priv->vsp_page_index = 0;
                }
 
                inode = vvp_object_inode(priv->vsp_clob);
-               nr = find_get_pages_contig(inode->i_mapping, priv->vvp_id.vpi_index, 1, &vmpage);
+               nr = find_get_pages_contig(inode->i_mapping,
+                                          priv->vsp_page_index, 1, &vmpage);
                if (nr > 0) {
-                       priv->vvp_id.vpi_index = vmpage->index;
-                       return vmpage;
+                       priv->vsp_page_index = vmpage->index;
+                       break;
                }
                lu_object_ref_del(&priv->vsp_clob->co_lu, "dump", current);
                cl_object_put(priv->vsp_env, priv->vsp_clob);
                priv->vsp_clob = NULL;
-               priv->vvp_id.vpi_index = 0;
-               priv->vvp_id.vpi_depth++;
+               priv->vsp_page_index = 0;
        }
+       rhashtable_walk_stop(&priv->vsp_iter);
+       return vmpage;
 }
 
 #define seq_page_flag(seq, page, flag, has_flags) do {                  \
@@ -534,7 +489,10 @@ static int vvp_pgcache_show(struct seq_file *f, void *v)
 static void vvp_pgcache_rewind(struct vvp_seq_private *priv)
 {
        if (priv->vvp_prev_pos) {
-               memset(&priv->vvp_id, 0, sizeof(priv->vvp_id));
+               struct lu_site *s = priv->vsp_sbi->ll_cl->cd_lu_dev.ld_site;
+
+               rhashtable_walk_exit(&priv->vsp_iter);
+               rhashtable_walk_enter(&s->ls_obj_hash, &priv->vsp_iter);
                priv->vvp_prev_pos = 0;
                if (priv->vsp_clob) {
                        lu_object_ref_del(&priv->vsp_clob->co_lu, "dump",
@@ -547,7 +505,7 @@ static void vvp_pgcache_rewind(struct vvp_seq_private *priv)
 
 static struct page *vvp_pgcache_next_page(struct vvp_seq_private *priv)
 {
-       priv->vvp_id.vpi_index += 1;
+       priv->vsp_page_index += 1;
        return vvp_pgcache_current(priv);
 }
 
@@ -561,7 +519,7 @@ static void *vvp_pgcache_start(struct seq_file *f, loff_t *pos)
                /* Return the current item */;
        } else {
                WARN_ON(*pos != priv->vvp_prev_pos + 1);
-               priv->vvp_id.vpi_index += 1;
+               priv->vsp_page_index += 1;
        }
 
        priv->vvp_prev_pos = *pos;
@@ -593,6 +551,7 @@ static struct seq_operations vvp_pgcache_ops = {
 static int vvp_dump_pgcache_seq_open(struct inode *inode, struct file *filp)
 {
        struct vvp_seq_private *priv;
+       struct lu_site *s;
 
        priv = __seq_open_private(filp, &vvp_pgcache_ops, sizeof(*priv));
        if (!priv)
@@ -601,7 +560,6 @@ static int vvp_dump_pgcache_seq_open(struct inode *inode, struct file *filp)
        priv->vsp_sbi = inode->i_private;
        priv->vsp_env = cl_env_get(&priv->vsp_refcheck);
        priv->vsp_clob = NULL;
-       memset(&priv->vvp_id, 0, sizeof(priv->vvp_id));
        if (IS_ERR(priv->vsp_env)) {
                int err = PTR_ERR(priv->vsp_env);
 
@@ -609,6 +567,9 @@ static int vvp_dump_pgcache_seq_open(struct inode *inode, struct file *filp)
                return err;
        }
 
+       s = priv->vsp_sbi->ll_cl->cd_lu_dev.ld_site;
+       rhashtable_walk_enter(&s->ls_obj_hash, &priv->vsp_iter);
+
        return 0;
 }
 
@@ -621,8 +582,8 @@ static int vvp_dump_pgcache_seq_release(struct inode *inode, struct file *file)
                lu_object_ref_del(&priv->vsp_clob->co_lu, "dump", current);
                cl_object_put(priv->vsp_env, priv->vsp_clob);
        }
-
        cl_env_put(priv->vsp_env, &priv->vsp_refcheck);
+       rhashtable_walk_exit(&priv->vsp_iter);
        return seq_release_private(inode, file);
 }
 
index 6ec7826..86cb84e 100644 (file)
@@ -1841,10 +1841,9 @@ static struct lu_device *lod_device_free(const struct lu_env *env,
 
        ENTRY;
 
-       if (atomic_read(&lu->ld_ref) > 0 &&
-           !cfs_hash_is_empty(lu->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
+       if (atomic_read(&lu->ld_site->ls_obj_hash.nelems)) {
+               lu_site_print(env, lu->ld_site, &lu->ld_ref, D_ERROR,
+                             lu_cdebug_printer);
        }
        LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
        dt_device_fini(&lod->lod_dt_dev);
index 90a11e7..c694c66 100644 (file)
@@ -90,10 +90,7 @@ static struct lu_device *lovsub_device_free(const struct lu_env *env,
        struct lovsub_device *lsd = lu2lovsub_dev(d);
        struct lu_device *next = cl2lu_dev(lsd->acid_next);
 
-       if (atomic_read(&d->ld_ref) && d->ld_site) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, d->ld_site, &d->ld_ref, D_ERROR, lu_cdebug_printer);
        cl_device_fini(lu2cl_dev(d));
        OBD_FREE_PTR(lsd);
        return next;
index e314fa9..9da8271 100644 (file)
@@ -1405,11 +1405,9 @@ err_ns:
        obd->obd_namespace = NULL;
 err_ops:
        lu_site_purge(env, mgs2lu_dev(mgs)->ld_site, ~0);
-       if (!cfs_hash_is_empty(mgs2lu_dev(mgs)->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
-               lu_site_print(env, mgs2lu_dev(mgs)->ld_site, &msgdata,
-                               lu_cdebug_printer);
-       }
+       lu_site_print(env, mgs2lu_dev(mgs)->ld_site,
+                     &mgs2lu_dev(mgs)->ld_site->ls_obj_hash.nelems,
+                     D_OTHER, lu_cdebug_printer);
        obd_disconnect(mgs->mgs_bottom_exp);
 err_lmi:
        if (lmi)
@@ -1582,11 +1580,8 @@ static struct lu_device *mgs_device_fini(const struct lu_env *env,
        obd->obd_namespace = NULL;
 
        lu_site_purge(env, d->ld_site, ~0);
-       if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
-
+       lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+                     D_OTHER, lu_cdebug_printer);
        LASSERT(mgs->mgs_bottom_exp);
        obd_disconnect(mgs->mgs_bottom_exp);
 
index 440767d..5da1aa6 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
+#include <linux/delay.h>
 #include <linux/module.h>
 #include <linux/list.h>
 #include <linux/processor.h>
 #include <linux/random.h>
 
 #include <libcfs/libcfs.h>
-#include <libcfs/libcfs_hash.h> /* hash_long() */
 #include <libcfs/linux/linux-mem.h>
 #include <obd_class.h>
 #include <obd_support.h>
@@ -84,13 +84,12 @@ enum {
 #define        LU_CACHE_NR_MAX_ADJUST          512
 #define        LU_CACHE_NR_UNLIMITED           -1
 #define        LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
-#define        LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
 #define        LU_CACHE_NR_ZFS_LIMIT           10240
 
-#define LU_SITE_BITS_MIN    12
-#define LU_SITE_BITS_MAX    24
-#define LU_SITE_BITS_MAX_CL 19
+#define        LU_CACHE_NR_MIN                 4096
+#define        LU_CACHE_NR_MAX                 0x80000000UL
+
 /**
  * Max 256 buckets, we don't want too many buckets because:
  * - consume too much memory (currently max 16K)
@@ -100,7 +99,6 @@ enum {
  */
 #define LU_SITE_BKT_BITS    8
 
-
 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
 module_param(lu_cache_percent, int, 0644);
 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
@@ -112,7 +110,7 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
-static u32 lu_fid_hash(const void *data, u32 seed)
+static u32 lu_fid_hash(const void *data, u32 len, u32 seed)
 {
        const struct lu_fid *fid = data;
 
@@ -121,9 +119,17 @@ static u32 lu_fid_hash(const void *data, u32 seed)
        return seed;
 }
 
+static const struct rhashtable_params obj_hash_params = {
+       .key_len        = sizeof(struct lu_fid),
+       .key_offset     = offsetof(struct lu_object_header, loh_fid),
+       .head_offset    = offsetof(struct lu_object_header, loh_hash),
+       .hashfn         = lu_fid_hash,
+       .automatic_shrinking = true,
+};
+
 static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
 {
-       return lu_fid_hash(fid, s->ls_bkt_seed) &
+       return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
               (s->ls_bkt_cnt - 1);
 }
 
@@ -148,9 +154,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        struct lu_object_header *top = o->lo_header;
        struct lu_site *site = o->lo_dev->ld_site;
        struct lu_object *orig = o;
-       struct cfs_hash_bd bd;
        const struct lu_fid *fid = lu_object_fid(o);
-       bool is_dying;
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
@@ -158,8 +162,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
         * so we should not remove it from the site.
         */
        if (fid_is_zero(fid)) {
-               LASSERT(top->loh_hash.next == NULL
-                       && top->loh_hash.pprev == NULL);
                LASSERT(list_empty(&top->loh_lru));
                if (!atomic_dec_and_test(&top->loh_ref))
                        return;
@@ -171,40 +173,45 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-
-       is_dying = lu_object_is_dying(top);
-       if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-               /* at this point the object reference is dropped and lock is
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       if (atomic_add_unless(&top->loh_ref, -1, 1)) {
+still_active:
+               /*
+                * At this point the object reference is dropped and lock is
                 * not taken, so lu_object should not be touched because it
-                * can be freed by concurrent thread. Use local variable for
-                * check.
+                * can be freed by concurrent thread.
+                *
+                * Somebody may be waiting for this, currently only used for
+                * cl_object, see cl_object_put_last().
                 */
-               if (is_dying) {
-                       /*
-                        * somebody may be waiting for this, currently only
-                        * used for cl_object, see cl_object_put_last().
-                        */
-                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-                       wake_up_all(&bkt->lsb_waitq);
-               }
+               wake_up(&bkt->lsb_waitq);
+
                return;
        }
 
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (!atomic_dec_and_test(&top->loh_ref)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               goto still_active;
+       }
+
+       /*
+        * Refcount is zero, and cannot be incremented without taking the bkt
+        * lock, so object is stable.
+        */
+
        /*
-        * When last reference is released, iterate over object
-        * layers, and notify them that object is no longer busy.
+        * When last reference is released, iterate over object layers, and
+        * notify them that object is no longer busy.
         */
        list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }
 
-       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-       spin_lock(&bkt->lsb_waitq.lock);
-
-       /* don't use local 'is_dying' here because if was taken without lock
-        * but here we need the latest actual value of it so check lu_object
+       /*
+        * Don't use local 'is_dying' here because it was taken without lock but
+        * here we need the latest actual value of it so check lu_object
         * directly here.
         */
        if (!lu_object_is_dying(top) &&
@@ -213,26 +220,26 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
                spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_inc(&site->ls_lru_len_counter);
-               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
-                      orig, top, site->ls_obj_hash, bkt);
-               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
+                      orig, top, bkt);
                return;
        }
 
        /*
-        * If object is dying (will not be cached) then remove it
-        * from hash table (it is already not on the LRU).
+        * If object is dying (will not be cached) then remove it from hash
+        * table (it is already not on the LRU).
         *
-        * This is done with hash table lists locked. As the only
-        * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find())
-        * which is done under hash-table, no race with concurrent
-        * object lookup is possible and we can safely destroy object below.
+        * This is done with bucket lock held.  As the only way to acquire first
+        * reference to previously unreferenced object is through hash-table
+        * lookup (lu_object_find()) which takes the lock for first reference,
+        * no race with concurrent object lookup is possible and we can safely
+        * destroy object below.
         */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
-               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+               rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
+                                      obj_hash_params);
+
        spin_unlock(&bkt->lsb_waitq.lock);
-       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
        /* Object was already removed from hash above, can kill it. */
        lu_object_free(env, orig);
 }
@@ -261,21 +268,19 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
        set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
                struct lu_site *site = o->lo_dev->ld_site;
-               struct cfs_hash *obj_hash = site->ls_obj_hash;
-               struct cfs_hash_bd bd;
+               struct rhashtable *obj_hash = &site->ls_obj_hash;
+               struct lu_site_bkt_data *bkt;
 
-               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
                if (!list_empty(&top->loh_lru)) {
-                       struct lu_site_bkt_data *bkt;
-
-                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-                       spin_lock(&bkt->lsb_waitq.lock);
                        list_del_init(&top->loh_lru);
-                       spin_unlock(&bkt->lsb_waitq.lock);
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
-               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
-               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
+
+               rhashtable_remove_fast(obj_hash, &top->loh_hash,
+                                      obj_hash_params);
        }
 }
 EXPORT_SYMBOL(lu_object_unhash);
@@ -419,39 +424,39 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                          int nr, int canblock)
 {
-        struct lu_object_header *h;
-        struct lu_object_header *temp;
-        struct lu_site_bkt_data *bkt;
+       struct lu_object_header *h;
+       struct lu_object_header *temp;
+       struct lu_site_bkt_data *bkt;
        LIST_HEAD(dispose);
        int                      did_sth;
        unsigned int             start = 0;
-        int                      count;
-        int                      bnr;
+       int                      count;
+       int                      bnr;
        unsigned int             i;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);
 
-        /*
-         * Under LRU list lock, scan LRU list and move unreferenced objects to
-         * the dispose list, removing them from LRU and hash table.
-         */
+       /*
+        * Under LRU list lock, scan LRU list and move unreferenced objects to
+        * the dispose list, removing them from LRU and hash table.
+        */
        if (nr != ~0)
                start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
- again:
+again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
-        * only bring troubles to us. See LU-5331.
+        * only bring troubles to us.  See LU-5331.
         */
        if (canblock != 0)
                mutex_lock(&s->ls_purge_mutex);
        else if (mutex_trylock(&s->ls_purge_mutex) == 0)
                goto out;
 
-        did_sth = 0;
+       did_sth = 0;
        for (i = start; i < s->ls_bkt_cnt ; i++) {
-                count = bnr;
+               count = bnr;
                bkt = &s->ls_bkts[i];
                spin_lock(&bkt->lsb_waitq.lock);
 
@@ -460,21 +465,19 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
 
                        LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                       /* Cannot remove from hash under current spinlock,
-                        * so set flag to stop object from being found
-                        * by htable_lookup().
-                        */
-                       set_bit(LU_OBJECT_PURGING, &h->loh_flags);
+                       set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
                        list_move(&h->loh_lru, &dispose);
                        percpu_counter_dec(&s->ls_lru_len_counter);
-                        if (did_sth == 0)
-                                did_sth = 1;
+                       if (did_sth == 0)
+                               did_sth = 1;
 
-                        if (nr != ~0 && --nr == 0)
-                                break;
+                       if (nr != ~0 && --nr == 0)
+                               break;
 
-                        if (count > 0 && --count == 0)
-                                break;
+                       if (count > 0 && --count == 0)
+                               break;
 
                }
                spin_unlock(&bkt->lsb_waitq.lock);
@@ -486,25 +489,24 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                while ((h = list_first_entry_or_null(&dispose,
                                                     struct lu_object_header,
                                                     loh_lru)) != NULL) {
-                       cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash);
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }
 
-                if (nr == 0)
-                        break;
-        }
+               if (nr == 0)
+                       break;
+       }
        mutex_unlock(&s->ls_purge_mutex);
 
-        if (nr != 0 && did_sth && start != 0) {
-                start = 0; /* restart from the first bucket */
-                goto again;
-        }
-        /* race on s->ls_purge_start, but nobody cares */
+       if (nr != 0 && did_sth && start != 0) {
+               start = 0; /* restart from the first bucket */
+               goto again;
+       }
+       /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
 out:
-        return nr;
+       return nr;
 }
 EXPORT_SYMBOL(lu_site_purge_objects);
 
@@ -598,9 +600,9 @@ void lu_object_header_print(const struct lu_env *env, void *cookie,
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
-                  hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
-                  list_empty((struct list_head *)&hdr->loh_lru) ? \
-                  "" : " lru",
+                  test_bit(LU_OBJECT_UNHASHED,
+                           &hdr->loh_flags) ? "" : " hash",
+                  list_empty(&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
 }
 EXPORT_SYMBOL(lu_object_header_print);
@@ -652,50 +654,96 @@ int lu_object_invariant(const struct lu_object *o)
         return 1;
 }
 
-static struct lu_object *htable_lookup(struct lu_site *s,
-                                      struct cfs_hash_bd *bd,
+/*
+ * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because the
+ * calculation for the number of objects to reclaim is not covered by a lock,
+ * the maximum number of objects is capped by LU_CACHE_NR_MAX_ADJUST.  This
+ * ensures that concurrent threads will not accidentally purge the entire cache.
+ */
+static void lu_object_limit(const struct lu_env *env,
+                           struct lu_device *dev)
+{
+       u64 size, nr;
+
+       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
+               return;
+
+       size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
+       nr = (u64)lu_cache_nr;
+       if (size <= nr)
+               return;
+
+       lu_site_purge_objects(env, dev->ld_site,
+                             min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
+                             0);
+}
+
+static struct lu_object *htable_lookup(const struct lu_env *env,
+                                      struct lu_device *dev,
+                                      struct lu_site_bkt_data *bkt,
                                       const struct lu_fid *f,
-                                      __u64 *version)
+                                      struct lu_object_header *new)
 {
+       struct lu_site *s = dev->ld_site;
        struct lu_object_header *h;
-       struct hlist_node *hnode;
-       __u64 ver = cfs_hash_bd_version_get(bd);
 
-       if (*version == ver)
+try_again:
+       rcu_read_lock();
+       if (new)
+               h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
+                                                     &new->loh_hash,
+                                                     obj_hash_params);
+       else
+               h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
+
+       if (IS_ERR_OR_NULL(h)) {
+               /* Not found */
+               if (!new)
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+               rcu_read_unlock();
+               if (PTR_ERR(h) == -ENOMEM) {
+                       msleep(20);
+                       goto try_again;
+               }
+               lu_object_limit(env, dev);
+               if (PTR_ERR(h) == -E2BIG)
+                       goto try_again;
+
                return ERR_PTR(-ENOENT);
+       }
 
-       *version = ver;
-       /* cfs_hash_bd_peek_locked is a somehow "internal" function
-        * of cfs_hash, it doesn't add refcount on object. */
-       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-       if (!hnode) {
+       if (atomic_inc_not_zero(&h->loh_ref)) {
+               rcu_read_unlock();
+               return lu_object_top(h);
+       }
+
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (lu_object_is_dying(h) ||
+           test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               rcu_read_unlock();
+               if (new) {
+                       /*
+                        * Old object might have already been removed, or will
+                        * be soon.  We need to insert our new object, so
+                        * remove the old one just in case it is still there.
+                        */
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
+                       goto try_again;
+               }
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return ERR_PTR(-ENOENT);
        }
+       /* Now protected by spinlock */
+       rcu_read_unlock();
 
-       h = container_of(hnode, struct lu_object_header, loh_hash);
        if (!list_empty(&h->loh_lru)) {
-               struct lu_site_bkt_data *bkt;
-
-               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
-               spin_lock(&bkt->lsb_waitq.lock);
-               /* Might have just been moved to the dispose list, in which
-                * case LU_OBJECT_PURGING will be set.  In that case,
-                * delete it from the hash table immediately.
-                * When lu_site_purge_objects() tried, it will find it
-                * isn't there, which is harmless.
-                */
-               if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) {
-                       spin_unlock(&bkt->lsb_waitq.lock);
-                       cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode);
-                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-                       return ERR_PTR(-ENOENT);
-               }
                list_del_init(&h->loh_lru);
-               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
-       cfs_hash_get(s->ls_obj_hash, hnode);
+       atomic_inc(&h->loh_ref);
+       spin_unlock(&bkt->lsb_waitq.lock);
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        return lu_object_top(h);
 }
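
The zero-refcount handling in htable_lookup() above can be sketched in userspace terms: a refcount of zero does not mean the object is dead, only that it may be parked on the LRU, and the per-bucket lock plus the flag bits decide whether the reference can be revived.  The following is a minimal sketch under that assumption; `struct obj`, `obj_get()`, and the flag names are illustrative stand-ins for the kernel's lu_object_header, atomic_inc_not_zero() fast path, and lsb_waitq.lock slow path, not real Lustre interfaces.

```c
#include <stdatomic.h>
#include <pthread.h>
#include <stdbool.h>
#include <assert.h>   /* for the self-test only */

#define OBJ_DYING    (1UL << 0)  /* stands in for LU_OBJECT_HEARD_BANSHEE */
#define OBJ_UNHASHED (1UL << 1)  /* stands in for LU_OBJECT_UNHASHED */

/* Hypothetical userspace analogue of a cached object header: the
 * refcount is atomic, while the flags are protected by the per-bucket
 * lock (lsb_waitq.lock in the patch). */
struct obj {
	atomic_int ref;
	unsigned long flags;
	pthread_mutex_t bkt_lock;
};

/* Try to take a reference on an object found in the hash table. */
static bool obj_get(struct obj *o)
{
	int cur = atomic_load(&o->ref);

	/* Fast path: refcount still positive, bump it without locking. */
	while (cur > 0)
		if (atomic_compare_exchange_weak(&o->ref, &cur, cur + 1))
			return true;

	/* Slow path: refcount hit zero.  Take the bucket lock and
	 * re-check the flags; if the object is being torn down, fail
	 * so the caller treats this as a cache miss. */
	pthread_mutex_lock(&o->bkt_lock);
	if (o->flags & (OBJ_DYING | OBJ_UNHASHED)) {
		pthread_mutex_unlock(&o->bkt_lock);
		return false;
	}
	atomic_fetch_add(&o->ref, 1);
	pthread_mutex_unlock(&o->bkt_lock);
	return true;
}
```

The key design point mirrored here is that the flags are only ever tested or set under the bucket lock, so a zero-refcount object cannot be revived and condemned concurrently.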
@@ -714,29 +762,37 @@ struct lu_object *lu_object_find(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find);
 
 /*
- * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
- * the calculation for the number of objects to reclaim is not covered by
- * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST.
- * This ensures that many concurrent threads will not accidentally purge
- * the entire cache.
+ * Get a 'first' reference to an object that was found while looking through the
+ * hash table.
  */
-static void lu_object_limit(const struct lu_env *env,
-                           struct lu_device *dev)
+struct lu_object *lu_object_get_first(struct lu_object_header *h,
+                                     struct lu_device *dev)
 {
-       __u64 size, nr;
+       struct lu_site *s = dev->ld_site;
+       struct lu_object *ret;
 
-       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
-               return;
+       if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
+               return NULL;
 
-       size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
-       nr = (__u64)lu_cache_nr;
-       if (size <= nr)
-               return;
+       ret = lu_object_locate(h, dev->ld_type);
+       if (!ret)
+               return ret;
 
-       lu_site_purge_objects(env, dev->ld_site,
-                             min_t(__u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
-                             0);
+       if (!atomic_inc_not_zero(&h->loh_ref)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               if (!lu_object_is_dying(h) &&
+                   !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
+                       atomic_inc(&h->loh_ref);
+               else
+                       ret = NULL;
+               spin_unlock(&bkt->lsb_waitq.lock);
+       }
+       return ret;
 }
+EXPORT_SYMBOL(lu_object_get_first);
 
 /**
  * Core logic of lu_object_find*() functions.
@@ -753,10 +809,8 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_object *o;
        struct lu_object *shadow;
        struct lu_site *s;
-       struct cfs_hash *hs;
-       struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
-       __u64 version = 0;
+       struct rhashtable *hs;
        int rc;
 
        ENTRY;
@@ -780,17 +834,14 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         *
         */
        s  = dev->ld_site;
-       hs = s->ls_obj_hash;
+       hs = &s->ls_obj_hash;
 
        if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
                lu_site_purge(env, s, -1);
 
        bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
-       cfs_hash_bd_get(hs, f, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
-               cfs_hash_bd_lock(hs, &bd, 1);
-               o = htable_lookup(s, &bd, f, &version);
-               cfs_hash_bd_unlock(hs, &bd, 1);
+               o = htable_lookup(env, dev, bkt, f, NULL);
 
                if (!IS_ERR(o)) {
                        if (likely(lu_object_is_inited(o->lo_header)))
@@ -826,17 +877,21 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 
        CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
 
-       cfs_hash_bd_lock(hs, &bd, 1);
-
-       if (conf && conf->loc_flags & LOC_F_NEW)
-               shadow = ERR_PTR(-ENOENT);
-       else
-               shadow = htable_lookup(s, &bd, f, &version);
+       if (conf && conf->loc_flags & LOC_F_NEW) {
+               int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
+                                                   obj_hash_params);
+               if (status)
+                       /* Strange error - go the slow way */
+                       shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+               else
+                       shadow = ERR_PTR(-ENOENT);
+       } else {
+               shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+       }
        if (likely(PTR_ERR(shadow) == -ENOENT)) {
-               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-               cfs_hash_bd_unlock(hs, &bd, 1);
-
                /*
+                * The new object has been successfully inserted.
+                *
                 * This may result in rather complicated operations, including
                 * fld queries, inode loading, etc.
                 */
@@ -846,7 +901,7 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                        RETURN(ERR_PTR(rc));
                }
 
-               wake_up_all(&bkt->lsb_waitq);
+               wake_up(&bkt->lsb_waitq);
 
                lu_object_limit(env, dev);
 
@@ -854,10 +909,10 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-       cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
 
        if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !IS_ERR(shadow) &&
            !lu_object_is_inited(shadow->lo_header)) {
                wait_event_idle(bkt->lsb_waitq,
                                lu_object_is_inited(shadow->lo_header) ||
@@ -935,14 +990,9 @@ struct lu_site_print_arg {
         lu_printer_t     lsp_printer;
 };
 
-static int
-lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
-                 struct hlist_node *hnode, void *data)
+static void
+lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
 {
-       struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
-       struct lu_object_header  *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!list_empty(&h->loh_layers)) {
                const struct lu_object *o;
 
@@ -953,33 +1003,45 @@ lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
-       return 0;
 }
 
 /**
  * Print all objects in \a s.
  */
-void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
-                   lu_printer_t printer)
+void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
+                  int msg_flag, lu_printer_t printer)
 {
-        struct lu_site_print_arg arg = {
-                .lsp_env     = (struct lu_env *)env,
-                .lsp_cookie  = cookie,
-                .lsp_printer = printer,
-        };
+       struct lu_site_print_arg arg = {
+               .lsp_env     = (struct lu_env *)env,
+               .lsp_printer = printer,
+       };
+       struct rhashtable_iter iter;
+       struct lu_object_header *h;
+       LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
+
+       if (!s || !atomic_read(ref))
+               return;
 
-        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
+       arg.lsp_cookie = (void *)&msgdata;
+
+       rhashtable_walk_enter(&s->ls_obj_hash, &iter);
+       rhashtable_walk_start(&iter);
+       while ((h = rhashtable_walk_next(&iter)) != NULL) {
+               if (IS_ERR(h))
+                       continue;
+               lu_site_obj_print(h, &arg);
+       }
+       rhashtable_walk_stop(&iter);
+       rhashtable_walk_exit(&iter);
 }
 EXPORT_SYMBOL(lu_site_print);
 
 /**
  * Return desired hash table order.
  */
-static unsigned long lu_htable_order(struct lu_device *top)
+static void lu_htable_limits(struct lu_device *top)
 {
        unsigned long cache_size;
-       unsigned long bits;
-       unsigned long bits_max = LU_SITE_BITS_MAX;
 
        /*
         * For ZFS based OSDs the cache should be disabled by default.  This
@@ -988,110 +1050,40 @@ static unsigned long lu_htable_order(struct lu_device *top)
         * always stay cached it must maintain a hold on them.
         */
        if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
-               lu_cache_percent = 1;
                lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
-               return LU_SITE_BITS_MIN;
+               return;
        }
 
-       if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0)
-               bits_max = LU_SITE_BITS_MAX_CL;
-
-        /*
-         * Calculate hash table size, assuming that we want reasonable
-         * performance when 20% of total memory is occupied by cache of
-         * lu_objects.
-         *
-         * Size of lu_object is (arbitrary) taken as 1K (together with inode).
-         */
+       /*
+        * Calculate hash table size, assuming that we want reasonable
+        * performance when 20% of total memory is occupied by cache of
+        * lu_objects.
+        *
+        * Size of lu_object is (arbitrary) taken as 1K (together with inode).
+        */
        cache_size = cfs_totalram_pages();
 
 #if BITS_PER_LONG == 32
-        /* limit hashtable size for lowmem systems to low RAM */
+       /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - PAGE_SHIFT))
                cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
 #endif
 
-        /* clear off unreasonable cache setting. */
-        if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
-                CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
-                      " the range of (0, %u]. Will use default value: %u.\n",
-                      lu_cache_percent, LU_CACHE_PERCENT_MAX,
-                      LU_CACHE_PERCENT_DEFAULT);
+       /* clear off unreasonable cache setting. */
+       if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
+               CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
+                     lu_cache_percent, LU_CACHE_PERCENT_MAX,
+                     LU_CACHE_PERCENT_DEFAULT);
 
-                lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-        }
-        cache_size = cache_size / 100 * lu_cache_percent *
+               lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+       }
+       cache_size = cache_size / 100 * lu_cache_percent *
                (PAGE_SIZE / 1024);
 
-        for (bits = 1; (1 << bits) < cache_size; ++bits) {
-                ;
-        }
-
-       return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
-}
-
-static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
-                               const void *key, unsigned mask)
-{
-       struct lu_fid  *fid = (struct lu_fid *)key;
-       __u32           hash;
-
-       hash = fid_flatten32(fid);
-       hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
-       hash = hash_long(hash, hs->hs_bkt_bits);
-
-       /* give me another random factor */
-       hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
-
-       hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
-       hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
-
-       return hash & mask;
-}
-
-static void *lu_obj_hop_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct lu_object_header, loh_hash);
-}
-
-static void *lu_obj_hop_key(struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return &h->loh_fid;
-}
-
-static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
-}
-
-static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       atomic_inc(&h->loh_ref);
-}
-
-static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-        LBUG(); /* we should never called it */
+       lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
+                             LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
 }
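
The arithmetic in lu_htable_limits() above can be re-derived in isolation: take a percentage of total RAM, cost each cached object at roughly 1K, and clamp the result.  This is a sketch only; the constants below are illustrative stand-ins, not the kernel's actual LU_CACHE_* values.

```c
#include <assert.h>

/* Illustrative stand-ins for the kernel constants. */
#define PAGE_SIZE_B          4096UL
#define LU_CACHE_PERCENT_MAX 50U
#define LU_CACHE_PERCENT_DEF 20U
#define LU_CACHE_NR_MIN      4096UL
#define LU_CACHE_NR_MAX      (1UL << 30)

static unsigned long clampul(unsigned long v, unsigned long lo,
			     unsigned long hi)
{
	return v < lo ? lo : (v > hi ? hi : v);
}

/* total_pages: system RAM in pages; percent: share devoted to the cache.
 * Each cached object is (arbitrarily) costed at 1K, so the page count
 * is scaled by PAGE_SIZE/1024 to turn pages into an object count. */
static unsigned long cache_limit(unsigned long total_pages, unsigned percent)
{
	if (percent == 0 || percent > LU_CACHE_PERCENT_MAX)
		percent = LU_CACHE_PERCENT_DEF;	/* reject unreasonable settings */
	unsigned long nr = total_pages / 100 * percent * (PAGE_SIZE_B / 1024);
	return clampul(nr, LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
}
```

For example, 4 GiB of 4K pages at the 20% default works out to roughly 840K cacheable objects before the clamp even matters.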
 
-static struct cfs_hash_ops lu_site_hash_ops = {
-        .hs_hash        = lu_obj_hop_hash,
-        .hs_key         = lu_obj_hop_key,
-        .hs_keycmp      = lu_obj_hop_keycmp,
-        .hs_object      = lu_obj_hop_object,
-        .hs_get         = lu_obj_hop_get,
-        .hs_put_locked  = lu_obj_hop_put_locked,
-};
-
 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
 {
        spin_lock(&s->ls_ld_lock);
@@ -1115,14 +1107,13 @@ EXPORT_SYMBOL(lu_dev_del_linkage);
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
        struct lu_site_bkt_data *bkt;
-       char name[16];
-       unsigned long bits;
        unsigned int i;
        int rc;
        ENTRY;
 
        memset(s, 0, sizeof *s);
        mutex_init(&s->ls_purge_mutex);
+       lu_htable_limits(top);
 
 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
        rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
@@ -1132,24 +1123,8 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        if (rc)
                return -ENOMEM;
 
-       snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
-       for (bits = lu_htable_order(top);
-            bits >= LU_SITE_BITS_MIN; bits--) {
-               s->ls_obj_hash = cfs_hash_create(name, bits, bits,
-                                                bits - LU_SITE_BKT_BITS,
-                                                0, 0, 0,
-                                                &lu_site_hash_ops,
-                                                CFS_HASH_SPIN_BKTLOCK |
-                                                CFS_HASH_NO_ITEMREF |
-                                                CFS_HASH_DEPTH |
-                                                CFS_HASH_ASSERT_EMPTY |
-                                                CFS_HASH_COUNTER);
-               if (s->ls_obj_hash != NULL)
-                       break;
-       }
-
-       if (s->ls_obj_hash == NULL) {
-               CERROR("failed to create lu_site hash with bits: %lu\n", bits);
+       if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
+               CERROR("failed to create lu_site hash\n");
                return -ENOMEM;
        }
 
@@ -1159,8 +1134,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
        OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
        if (!s->ls_bkts) {
-               cfs_hash_putref(s->ls_obj_hash);
-               s->ls_obj_hash = NULL;
+               rhashtable_destroy(&s->ls_obj_hash);
                s->ls_bkts = NULL;
                return -ENOMEM;
        }
@@ -1174,9 +1148,8 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        if (s->ls_stats == NULL) {
                OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
-               cfs_hash_putref(s->ls_obj_hash);
-               s->ls_obj_hash = NULL;
                s->ls_bkts = NULL;
+               rhashtable_destroy(&s->ls_obj_hash);
                return -ENOMEM;
        }
 
@@ -1219,12 +1192,11 @@ void lu_site_fini(struct lu_site *s)
 
        percpu_counter_destroy(&s->ls_lru_len_counter);
 
-        if (s->ls_obj_hash != NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
-                s->ls_obj_hash = NULL;
-        }
-
-       OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+       if (s->ls_bkts) {
+               rhashtable_destroy(&s->ls_obj_hash);
+               OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+               s->ls_bkts = NULL;
+       }
 
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
@@ -1380,7 +1352,6 @@ int lu_object_header_init(struct lu_object_header *h)
 {
         memset(h, 0, sizeof *h);
        atomic_set(&h->loh_ref, 1);
-       INIT_HLIST_NODE(&h->loh_hash);
        INIT_LIST_HEAD(&h->loh_lru);
        INIT_LIST_HEAD(&h->loh_layers);
         lu_ref_init(&h->loh_reference);
@@ -1395,7 +1366,6 @@ void lu_object_header_fini(struct lu_object_header *h)
 {
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
-       LASSERT(hlist_unhashed(&h->loh_hash));
         lu_ref_fini(&h->loh_reference);
 }
 EXPORT_SYMBOL(lu_object_header_fini);
@@ -2113,7 +2083,7 @@ typedef struct lu_site_stats{
 static void lu_site_stats_get(const struct lu_site *s,
                              lu_site_stats_t *stats)
 {
-       int cnt = cfs_hash_size_get(s->ls_obj_hash);
+       int cnt = atomic_read(&s->ls_obj_hash.nelems);
        /*
         * percpu_counter_sum_positive() won't accept a const pointer
         * as it does modify the struct by taking a spinlock
@@ -2379,16 +2349,23 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
  */
 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
 {
+       const struct bucket_table *tbl;
        lu_site_stats_t stats;
+       unsigned int chains;
 
        memset(&stats, 0, sizeof(stats));
        lu_site_stats_get(s, &stats);
 
-       seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
+       rcu_read_lock();
+       tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
+                                 &((struct lu_site *)s)->ls_obj_hash);
+       chains = tbl->size;
+       rcu_read_unlock();
+       seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
                   stats.lss_busy,
                   stats.lss_total,
                   stats.lss_populated,
-                  CFS_HASH_NHLIST(s->ls_obj_hash),
+                  chains,
                   stats.lss_max_search,
                   ls_stats_read(s->ls_stats, LU_SS_CREATED),
                   ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
@@ -2447,27 +2424,27 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct cfs_hash         *hs;
-       struct cfs_hash_bd       bd;
+       int rc;
 
        LASSERT(fid_is_zero(old));
-
+       *old = *fid;
+try_again:
+       rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
+                                          &o->lo_header->loh_hash,
+                                          obj_hash_params);
        /* supposed to be unique */
-       hs = s->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
-#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
-       {
-               __u64 version = 0;
-               struct lu_object *shadow;
-
-               shadow = htable_lookup(s, &bd, fid, &version);
-               /* supposed to be unique */
-               LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+       LASSERT(rc != -EEXIST);
+       /* handle hash table resizing */
+       if (rc == -ENOMEM) {
+               msleep(20);
+               goto try_again;
        }
-#endif
-       *old = *fid;
-       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       cfs_hash_bd_unlock(hs, &bd, 1);
+       /* trim the hash if it is growing too big */
+       lu_object_limit(env, o->lo_dev);
+       if (rc == -E2BIG)
+               goto try_again;
+
+       LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
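
The retry policy in lu_object_assign_fid() above can be sketched as a loop: rhashtable insertion may fail transiently with -ENOMEM (table being resized) or -E2BIG (table at capacity), and both are handled by retrying, after a short sleep for -ENOMEM and after trimming the cache for -E2BIG.  This userspace sketch assumes hypothetical `insert_fn`/`purge_fn` callbacks standing in for the rhashtable insert and lu_object_limit(); they are not real kernel interfaces.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical sketch of the insertion retry policy: insert_fn stands
 * in for rhashtable_lookup_insert_fast(), purge_fn for the cache trim. */
static int insert_with_retry(int (*insert_fn)(void *), void *obj,
			     void (*purge_fn)(void), int max_tries)
{
	for (int i = 0; i < max_tries; i++) {
		int rc = insert_fn(obj);
		if (rc == -ENOMEM)
			continue;	/* resize in progress: back off and
					 * retry (msleep(20) in the kernel,
					 * omitted here) */
		purge_fn();		/* trim the cache if it grew too big */
		if (rc == -E2BIG)
			continue;	/* room was made, try again */
		return rc;		/* 0, or a hard error such as -EEXIST */
	}
	return -EBUSY;
}

/* -- tiny self-test scaffolding (assumptions of this sketch) -- */
static int fail_e2big_left;
static int purges;

static int mock_insert(void *obj)
{
	(void)obj;
	if (fail_e2big_left > 0) {
		fail_e2big_left--;
		return -E2BIG;
	}
	return 0;
}

static void mock_purge(void)
{
	purges++;
}
```

Note that, as in the patch, the purge runs on every non-ENOMEM pass, including the successful one, so the cache is trimmed even when insertion succeeds first try.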
 
index beb1900..8b2a720 100644
@@ -257,11 +257,8 @@ static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
        }
 
        lu_site_purge(env, top->ld_site, ~0);
-       if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
-               lu_site_print(env, top->ld_site, &msgdata, lu_cdebug_printer);
-       }
-
+       lu_site_print(env, top->ld_site, &top->ld_site->ls_obj_hash.nelems,
+                     D_OTHER, lu_cdebug_printer);
        LASSERT(m->ofd_osd_exp);
        obd_disconnect(m->ofd_osd_exp);
 
index 84df699..ef29623 100644
@@ -7974,10 +7974,8 @@ static struct lu_device *osd_device_free(const struct lu_env *env,
        /* XXX: make osd top device in order to release reference */
        d->ld_site->ls_top_dev = d;
        lu_site_purge(env, d->ld_site, -1);
-       if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+                     D_ERROR, lu_cdebug_printer);
        lu_site_fini(&o->od_site);
        dt_device_fini(&o->od_dt_dev);
        OBD_FREE_PTR(o);
index 92355fc..cb08b0c 100644
@@ -97,8 +97,6 @@ static void arc_prune_func(int64_t bytes, void *private)
        struct lu_env      env;
        int rc;
 
-       LASSERT(site->ls_obj_hash);
-
        rc = lu_env_init(&env, LCT_SHRINKER);
        if (rc) {
                CERROR("%s: can't initialize shrinker env: rc = %d\n",
@@ -1341,10 +1339,8 @@ static struct lu_device *osd_device_free(const struct lu_env *env,
        /* XXX: make osd top device in order to release reference */
        d->ld_site->ls_top_dev = d;
        lu_site_purge(env, d->ld_site, -1);
-       if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+                     D_ERROR, lu_cdebug_printer);
        lu_site_fini(&o->od_site);
        dt_device_fini(&o->od_dt_dev);
        OBD_FREE_PTR(o);
index d8c83aa..188aca0 100644
@@ -307,10 +307,8 @@ static struct lu_device *lwp_device_free(const struct lu_env *env,
        struct lwp_device *m = lu2lwp_dev(lu);
        ENTRY;
 
-       if (atomic_read(&lu->ld_ref) && lu->ld_site) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, lu->ld_site, &lu->ld_ref, D_ERROR,
+                     lu_cdebug_printer);
        lu_device_fini(&m->lpd_dev);
        OBD_FREE_PTR(m);
        RETURN(NULL);
index e154556..6d2a7f2 100644
@@ -1294,10 +1294,8 @@ static struct lu_device *osp_device_free(const struct lu_env *env,
 {
        struct osp_device *osp = lu2osp_dev(lu);
 
-       if (atomic_read(&lu->ld_ref) && lu->ld_site) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, lu->ld_site, &lu->ld_ref, D_ERROR,
+                     lu_cdebug_printer);
        dt_device_fini(&osp->opd_dt_dev);
        OBD_FREE_PTR(osp);