LU-8130 lu_object: convert lu_object cache to rhashtable
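
This patch replaces the cfs_hash table that indexes lu_objects by FID in
struct lu_site with a Linux rhashtable, drops the per-site hash sizing
heuristics (LU_SITE_BITS_*), and lets lu_cache_nr bound the cache instead.
The sketch below is illustrative only and is not part of the patch; the
demo_* names are made up.  It shows the fixed-key rhashtable pattern the
new code relies on: a const struct rhashtable_params giving the key and
linkage offsets plus a custom hash function, and lookup-or-insert through
rhashtable_lookup_get_insert_fast(), which returns NULL when the new entry
was inserted, the already-hashed object on a key conflict, or an ERR_PTR()
on failure.

    #include <linux/rhashtable.h>
    #include <linux/jhash.h>

    /* Illustrative object: the key lives inside the struct, like loh_fid. */
    struct demo_obj {
            u64                     key;
            struct rhash_head       node;   /* hash linkage, like loh_hash */
    };

    static u32 demo_hashfn(const void *data, u32 len, u32 seed)
    {
            /* Any stable hash of the key works; the patch hashes the FID. */
            return jhash(data, len, seed);
    }

    static const struct rhashtable_params demo_params = {
            .key_len        = sizeof(u64),
            .key_offset     = offsetof(struct demo_obj, key),
            .head_offset    = offsetof(struct demo_obj, node),
            .hashfn         = demo_hashfn,
            .automatic_shrinking = true,
    };

    /*
     * Lookup-or-insert: returns NULL if @new was inserted, the existing
     * object if one with the same key is already hashed, or an ERR_PTR()
     * (e.g. -ENOMEM during a resize, -E2BIG when the table is full).
     */
    static struct demo_obj *demo_find_or_insert(struct rhashtable *ht,
                                                struct demo_obj *new)
    {
            struct demo_obj *old;

            rcu_read_lock();
            old = rhashtable_lookup_get_insert_fast(ht, &new->node, demo_params);
            /*
             * A real user takes a reference on 'old' before dropping RCU,
             * as htable_lookup() does with loh_ref in the patch below.
             */
            rcu_read_unlock();
            return old;
    }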
[fs/lustre-release.git] lustre/obdclass/lu_object.c
index 440767d..5da1aa6 100644
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
+#include <linux/delay.h>
 #include <linux/module.h>
 #include <linux/list.h>
 #include <linux/processor.h>
 #include <linux/random.h>
 
 #include <libcfs/libcfs.h>
-#include <libcfs/libcfs_hash.h> /* hash_long() */
 #include <libcfs/linux/linux-mem.h>
 #include <obd_class.h>
 #include <obd_support.h>
@@ -84,13 +84,12 @@ enum {
 #define        LU_CACHE_NR_MAX_ADJUST          512
 #define        LU_CACHE_NR_UNLIMITED           -1
 #define        LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
-#define        LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
 #define        LU_CACHE_NR_ZFS_LIMIT           10240
 
-#define LU_SITE_BITS_MIN    12
-#define LU_SITE_BITS_MAX    24
-#define LU_SITE_BITS_MAX_CL 19
+#define        LU_CACHE_NR_MIN                 4096
+#define        LU_CACHE_NR_MAX                 0x80000000UL
+
 /**
  * Max 256 buckets, we don't want too many buckets because:
  * - consume too much memory (currently max 16K)
@@ -100,7 +99,6 @@ enum {
  */
 #define LU_SITE_BKT_BITS    8
 
-
 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
 module_param(lu_cache_percent, int, 0644);
 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
@@ -112,7 +110,7 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
-static u32 lu_fid_hash(const void *data, u32 seed)
+static u32 lu_fid_hash(const void *data, u32 len, u32 seed)
 {
        const struct lu_fid *fid = data;
 
@@ -121,9 +119,17 @@ static u32 lu_fid_hash(const void *data, u32 seed)
        return seed;
 }
 
+static const struct rhashtable_params obj_hash_params = {
+       .key_len        = sizeof(struct lu_fid),
+       .key_offset     = offsetof(struct lu_object_header, loh_fid),
+       .head_offset    = offsetof(struct lu_object_header, loh_hash),
+       .hashfn         = lu_fid_hash,
+       .automatic_shrinking = true,
+};
+
 static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
 {
-       return lu_fid_hash(fid, s->ls_bkt_seed) &
+       return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
               (s->ls_bkt_cnt - 1);
 }
 
@@ -148,9 +154,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        struct lu_object_header *top = o->lo_header;
        struct lu_site *site = o->lo_dev->ld_site;
        struct lu_object *orig = o;
-       struct cfs_hash_bd bd;
        const struct lu_fid *fid = lu_object_fid(o);
-       bool is_dying;
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
@@ -158,8 +162,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
         * so we should not remove it from the site.
         */
        if (fid_is_zero(fid)) {
-               LASSERT(top->loh_hash.next == NULL
-                       && top->loh_hash.pprev == NULL);
                LASSERT(list_empty(&top->loh_lru));
                if (!atomic_dec_and_test(&top->loh_ref))
                        return;
@@ -171,40 +173,45 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-
-       is_dying = lu_object_is_dying(top);
-       if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-               /* at this point the object reference is dropped and lock is
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       if (atomic_add_unless(&top->loh_ref, -1, 1)) {
+still_active:
+               /*
+                * At this point the object reference is dropped and the lock is
                 * not taken, so lu_object should not be touched because it
-                * can be freed by concurrent thread. Use local variable for
-                * check.
+                * can be freed by a concurrent thread.
+                *
+                * Somebody may be waiting for this, currently only used for
+                * cl_object, see cl_object_put_last().
                 */
-               if (is_dying) {
-                       /*
-                        * somebody may be waiting for this, currently only
-                        * used for cl_object, see cl_object_put_last().
-                        */
-                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-                       wake_up_all(&bkt->lsb_waitq);
-               }
+               wake_up(&bkt->lsb_waitq);
+
                return;
        }
 
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (!atomic_dec_and_test(&top->loh_ref)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               goto still_active;
+       }
+
+       /*
+        * Refcount is zero, and cannot be incremented without taking the bkt
+        * lock, so the object is stable.
+        */
+
        /*
-        * When last reference is released, iterate over object
-        * layers, and notify them that object is no longer busy.
+        * When last reference is released, iterate over object layers, and
+        * notify them that object is no longer busy.
         */
        list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }
 
-       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-       spin_lock(&bkt->lsb_waitq.lock);
-
-       /* don't use local 'is_dying' here because if was taken without lock
-        * but here we need the latest actual value of it so check lu_object
+       /*
+        * Don't use a local 'is_dying' here because it would have been read
+        * without the lock; we need the latest actual value, so check lu_object
         * directly here.
         */
        if (!lu_object_is_dying(top) &&
@@ -213,26 +220,26 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
                spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_inc(&site->ls_lru_len_counter);
-               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
-                      orig, top, site->ls_obj_hash, bkt);
-               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
+                      orig, top, bkt);
                return;
        }
 
        /*
-        * If object is dying (will not be cached) then remove it
-        * from hash table (it is already not on the LRU).
+        * If object is dying (will not be cached) then remove it from hash
+        * table (it is already not on the LRU).
         *
-        * This is done with hash table lists locked. As the only
-        * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find())
-        * which is done under hash-table, no race with concurrent
-        * object lookup is possible and we can safely destroy object below.
+        * This is done with the bucket lock held.  As the only way to acquire
+        * the first reference to a previously unreferenced object is through
+        * hash-table lookup (lu_object_find()), which also takes the bucket
+        * lock for that first reference, no race with a concurrent lookup is
+        * possible and we can safely destroy the object below.
         */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
-               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+               rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
+                                      obj_hash_params);
+
        spin_unlock(&bkt->lsb_waitq.lock);
-       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
        /* Object was already removed from hash above, can kill it. */
        lu_object_free(env, orig);
 }
@@ -261,21 +268,19 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
        set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
                struct lu_site *site = o->lo_dev->ld_site;
-               struct cfs_hash *obj_hash = site->ls_obj_hash;
-               struct cfs_hash_bd bd;
+               struct rhashtable *obj_hash = &site->ls_obj_hash;
+               struct lu_site_bkt_data *bkt;
 
-               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
                if (!list_empty(&top->loh_lru)) {
-                       struct lu_site_bkt_data *bkt;
-
-                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
-                       spin_lock(&bkt->lsb_waitq.lock);
                        list_del_init(&top->loh_lru);
-                       spin_unlock(&bkt->lsb_waitq.lock);
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
-               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
-               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
+
+               rhashtable_remove_fast(obj_hash, &top->loh_hash,
+                                      obj_hash_params);
        }
 }
 EXPORT_SYMBOL(lu_object_unhash);
@@ -419,39 +424,39 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                          int nr, int canblock)
 {
-        struct lu_object_header *h;
-        struct lu_object_header *temp;
-        struct lu_site_bkt_data *bkt;
+       struct lu_object_header *h;
+       struct lu_object_header *temp;
+       struct lu_site_bkt_data *bkt;
        LIST_HEAD(dispose);
        int                      did_sth;
        unsigned int             start = 0;
-        int                      count;
-        int                      bnr;
+       int                      count;
+       int                      bnr;
        unsigned int             i;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);
 
-        /*
-         * Under LRU list lock, scan LRU list and move unreferenced objects to
-         * the dispose list, removing them from LRU and hash table.
-         */
+       /*
+        * Under LRU list lock, scan LRU list and move unreferenced objects to
+        * the dispose list, removing them from LRU and hash table.
+        */
        if (nr != ~0)
                start = s->ls_purge_start;
        bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
- again:
+again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
-        * only bring troubles to us. See LU-5331.
+        * only bring troubles to us.  See LU-5331.
         */
        if (canblock != 0)
                mutex_lock(&s->ls_purge_mutex);
        else if (mutex_trylock(&s->ls_purge_mutex) == 0)
                goto out;
 
-        did_sth = 0;
+       did_sth = 0;
        for (i = start; i < s->ls_bkt_cnt ; i++) {
-                count = bnr;
+               count = bnr;
                bkt = &s->ls_bkts[i];
                spin_lock(&bkt->lsb_waitq.lock);
 
@@ -460,21 +465,19 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
 
                        LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                       /* Cannot remove from hash under current spinlock,
-                        * so set flag to stop object from being found
-                        * by htable_lookup().
-                        */
-                       set_bit(LU_OBJECT_PURGING, &h->loh_flags);
+                       set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
                        list_move(&h->loh_lru, &dispose);
                        percpu_counter_dec(&s->ls_lru_len_counter);
-                        if (did_sth == 0)
-                                did_sth = 1;
+                       if (did_sth == 0)
+                               did_sth = 1;
 
-                        if (nr != ~0 && --nr == 0)
-                                break;
+                       if (nr != ~0 && --nr == 0)
+                               break;
 
-                        if (count > 0 && --count == 0)
-                                break;
+                       if (count > 0 && --count == 0)
+                               break;
 
                }
                spin_unlock(&bkt->lsb_waitq.lock);
@@ -486,25 +489,24 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                while ((h = list_first_entry_or_null(&dispose,
                                                     struct lu_object_header,
                                                     loh_lru)) != NULL) {
-                       cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash);
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }
 
-                if (nr == 0)
-                        break;
-        }
+               if (nr == 0)
+                       break;
+       }
        mutex_unlock(&s->ls_purge_mutex);
 
-        if (nr != 0 && did_sth && start != 0) {
-                start = 0; /* restart from the first bucket */
-                goto again;
-        }
-        /* race on s->ls_purge_start, but nobody cares */
+       if (nr != 0 && did_sth && start != 0) {
+               start = 0; /* restart from the first bucket */
+               goto again;
+       }
+       /* race on s->ls_purge_start, but nobody cares */
        s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
 out:
-        return nr;
+       return nr;
 }
 EXPORT_SYMBOL(lu_site_purge_objects);
 
@@ -598,9 +600,9 @@ void lu_object_header_print(const struct lu_env *env, void *cookie,
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
-                  hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
-                  list_empty((struct list_head *)&hdr->loh_lru) ? \
-                  "" : " lru",
+                  test_bit(LU_OBJECT_UNHASHED,
+                           &hdr->loh_flags) ? "" : " hash",
+                  list_empty(&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
 }
 EXPORT_SYMBOL(lu_object_header_print);
@@ -652,50 +654,96 @@ int lu_object_invariant(const struct lu_object *o)
         return 1;
 }
 
-static struct lu_object *htable_lookup(struct lu_site *s,
-                                      struct cfs_hash_bd *bd,
+/*
+ * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because the
+ * calculation for the number of objects to reclaim is not covered by a lock,
+ * the number purged in one call is capped by LU_CACHE_NR_MAX_ADJUST.  This
+ * ensures that many concurrent threads will not accidentally purge the entire
+ * cache.
+ */
+static void lu_object_limit(const struct lu_env *env,
+                           struct lu_device *dev)
+{
+       u64 size, nr;
+
+       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
+               return;
+
+       size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
+       nr = (u64)lu_cache_nr;
+       if (size <= nr)
+               return;
+
+       lu_site_purge_objects(env, dev->ld_site,
+                             min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
+                             0);
+}
+
+static struct lu_object *htable_lookup(const struct lu_env *env,
+                                      struct lu_device *dev,
+                                      struct lu_site_bkt_data *bkt,
                                       const struct lu_fid *f,
-                                      __u64 *version)
+                                      struct lu_object_header *new)
 {
+       struct lu_site *s = dev->ld_site;
        struct lu_object_header *h;
-       struct hlist_node *hnode;
-       __u64 ver = cfs_hash_bd_version_get(bd);
 
-       if (*version == ver)
+try_again:
+       rcu_read_lock();
+       if (new)
+               h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
+                                                     &new->loh_hash,
+                                                     obj_hash_params);
+       else
+               h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
+
+       if (IS_ERR_OR_NULL(h)) {
+               /* Not found */
+               if (!new)
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+               rcu_read_unlock();
+               if (PTR_ERR(h) == -ENOMEM) {
+                       msleep(20);
+                       goto try_again;
+               }
+               lu_object_limit(env, dev);
+               if (PTR_ERR(h) == -E2BIG)
+                       goto try_again;
+
                return ERR_PTR(-ENOENT);
+       }
 
-       *version = ver;
-       /* cfs_hash_bd_peek_locked is a somehow "internal" function
-        * of cfs_hash, it doesn't add refcount on object. */
-       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-       if (!hnode) {
+       if (atomic_inc_not_zero(&h->loh_ref)) {
+               rcu_read_unlock();
+               return lu_object_top(h);
+       }
+
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (lu_object_is_dying(h) ||
+           test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               rcu_read_unlock();
+               if (new) {
+                       /*
+                        * Old object might have already been removed, or will
+                        * be soon.  We need to insert our new object, so
+                        * remove the old one just in case it is still there.
+                        */
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
+                       goto try_again;
+               }
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return ERR_PTR(-ENOENT);
        }
+       /* Now protected by spinlock */
+       rcu_read_unlock();
 
-       h = container_of(hnode, struct lu_object_header, loh_hash);
        if (!list_empty(&h->loh_lru)) {
-               struct lu_site_bkt_data *bkt;
-
-               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
-               spin_lock(&bkt->lsb_waitq.lock);
-               /* Might have just been moved to the dispose list, in which
-                * case LU_OBJECT_PURGING will be set.  In that case,
-                * delete it from the hash table immediately.
-                * When lu_site_purge_objects() tried, it will find it
-                * isn't there, which is harmless.
-                */
-               if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) {
-                       spin_unlock(&bkt->lsb_waitq.lock);
-                       cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode);
-                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
-                       return ERR_PTR(-ENOENT);
-               }
                list_del_init(&h->loh_lru);
-               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
-       cfs_hash_get(s->ls_obj_hash, hnode);
+       atomic_inc(&h->loh_ref);
+       spin_unlock(&bkt->lsb_waitq.lock);
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        return lu_object_top(h);
 }
@@ -714,29 +762,37 @@ struct lu_object *lu_object_find(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find);
 
 /*
- * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
- * the calculation for the number of objects to reclaim is not covered by
- * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST.
- * This ensures that many concurrent threads will not accidentally purge
- * the entire cache.
+ * Get a 'first' reference to an object that was found while looking through the
+ * hash table.
  */
-static void lu_object_limit(const struct lu_env *env,
-                           struct lu_device *dev)
+struct lu_object *lu_object_get_first(struct lu_object_header *h,
+                                     struct lu_device *dev)
 {
-       __u64 size, nr;
+       struct lu_site *s = dev->ld_site;
+       struct lu_object *ret;
 
-       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
-               return;
+       if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
+               return NULL;
 
-       size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
-       nr = (__u64)lu_cache_nr;
-       if (size <= nr)
-               return;
+       ret = lu_object_locate(h, dev->ld_type);
+       if (!ret)
+               return ret;
 
-       lu_site_purge_objects(env, dev->ld_site,
-                             min_t(__u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
-                             0);
+       if (!atomic_inc_not_zero(&h->loh_ref)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               if (!lu_object_is_dying(h) &&
+                   !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
+                       atomic_inc(&h->loh_ref);
+               else
+                       ret = NULL;
+               spin_unlock(&bkt->lsb_waitq.lock);
+       }
+       return ret;
 }
+EXPORT_SYMBOL(lu_object_get_first);
 
 /**
  * Core logic of lu_object_find*() functions.
@@ -753,10 +809,8 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_object *o;
        struct lu_object *shadow;
        struct lu_site *s;
-       struct cfs_hash *hs;
-       struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
-       __u64 version = 0;
+       struct rhashtable *hs;
        int rc;
 
        ENTRY;
@@ -780,17 +834,14 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         *
         */
        s  = dev->ld_site;
-       hs = s->ls_obj_hash;
+       hs = &s->ls_obj_hash;
 
        if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
                lu_site_purge(env, s, -1);
 
        bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
-       cfs_hash_bd_get(hs, f, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
-               cfs_hash_bd_lock(hs, &bd, 1);
-               o = htable_lookup(s, &bd, f, &version);
-               cfs_hash_bd_unlock(hs, &bd, 1);
+               o = htable_lookup(env, dev, bkt, f, NULL);
 
                if (!IS_ERR(o)) {
                        if (likely(lu_object_is_inited(o->lo_header)))
@@ -826,17 +877,21 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 
        CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
 
-       cfs_hash_bd_lock(hs, &bd, 1);
-
-       if (conf && conf->loc_flags & LOC_F_NEW)
-               shadow = ERR_PTR(-ENOENT);
-       else
-               shadow = htable_lookup(s, &bd, f, &version);
+       if (conf && conf->loc_flags & LOC_F_NEW) {
+               int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
+                                                   obj_hash_params);
+               if (status)
+                       /* Strange error - go the slow way */
+                       shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+               else
+                       shadow = ERR_PTR(-ENOENT);
+       } else {
+               shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+       }
        if (likely(PTR_ERR(shadow) == -ENOENT)) {
-               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-               cfs_hash_bd_unlock(hs, &bd, 1);
-
                /*
+                * The new object has been successfully inserted.
+                *
                 * This may result in rather complicated operations, including
                 * fld queries, inode loading, etc.
                 */
@@ -846,7 +901,7 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                        RETURN(ERR_PTR(rc));
                }
 
-               wake_up_all(&bkt->lsb_waitq);
+               wake_up(&bkt->lsb_waitq);
 
                lu_object_limit(env, dev);
 
@@ -854,10 +909,10 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-       cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
 
        if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !IS_ERR(shadow) &&
            !lu_object_is_inited(shadow->lo_header)) {
                wait_event_idle(bkt->lsb_waitq,
                                lu_object_is_inited(shadow->lo_header) ||
@@ -935,14 +990,9 @@ struct lu_site_print_arg {
         lu_printer_t     lsp_printer;
 };
 
-static int
-lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
-                 struct hlist_node *hnode, void *data)
+static void
+lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
 {
-       struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
-       struct lu_object_header  *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!list_empty(&h->loh_layers)) {
                const struct lu_object *o;
 
@@ -953,33 +1003,45 @@ lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
-       return 0;
 }
 
 /**
  * Print all objects in \a s.
  */
-void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
-                   lu_printer_t printer)
+void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
+                  int msg_flag, lu_printer_t printer)
 {
-        struct lu_site_print_arg arg = {
-                .lsp_env     = (struct lu_env *)env,
-                .lsp_cookie  = cookie,
-                .lsp_printer = printer,
-        };
+       struct lu_site_print_arg arg = {
+               .lsp_env     = (struct lu_env *)env,
+               .lsp_printer = printer,
+       };
+       struct rhashtable_iter iter;
+       struct lu_object_header *h;
+       LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
+
+       if (!s || !atomic_read(ref))
+               return;
 
-        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
+       arg.lsp_cookie = (void *)&msgdata;
+
+       rhashtable_walk_enter(&s->ls_obj_hash, &iter);
+       rhashtable_walk_start(&iter);
+       while ((h = rhashtable_walk_next(&iter)) != NULL) {
+               if (IS_ERR(h))
+                       continue;
+               lu_site_obj_print(h, &arg);
+       }
+       rhashtable_walk_stop(&iter);
+       rhashtable_walk_exit(&iter);
 }
 EXPORT_SYMBOL(lu_site_print);
 
 /**
- * Return desired hash table order.
+ * Set the lu_object cache limit (lu_cache_nr) based on the device type and
+ * available memory.
  */
-static unsigned long lu_htable_order(struct lu_device *top)
+static void lu_htable_limits(struct lu_device *top)
 {
        unsigned long cache_size;
-       unsigned long bits;
-       unsigned long bits_max = LU_SITE_BITS_MAX;
 
        /*
         * For ZFS based OSDs the cache should be disabled by default.  This
@@ -988,110 +1050,40 @@ static unsigned long lu_htable_order(struct lu_device *top)
         * always stay cached it must maintain a hold on them.
         */
        if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
-               lu_cache_percent = 1;
                lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
-               return LU_SITE_BITS_MIN;
+               return;
        }
 
-       if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0)
-               bits_max = LU_SITE_BITS_MAX_CL;
-
-        /*
-         * Calculate hash table size, assuming that we want reasonable
-         * performance when 20% of total memory is occupied by cache of
-         * lu_objects.
-         *
-         * Size of lu_object is (arbitrary) taken as 1K (together with inode).
-         */
+       /*
+        * Calculate hash table size, assuming that we want reasonable
+        * performance when 20% of total memory is occupied by cache of
+        * lu_objects.
+        *
+        * Size of lu_object is (arbitrary) taken as 1K (together with inode).
+        */
        cache_size = cfs_totalram_pages();
 
 #if BITS_PER_LONG == 32
-        /* limit hashtable size for lowmem systems to low RAM */
+       /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - PAGE_SHIFT))
                cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
 #endif
 
-        /* clear off unreasonable cache setting. */
-        if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
-                CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
-                      " the range of (0, %u]. Will use default value: %u.\n",
-                      lu_cache_percent, LU_CACHE_PERCENT_MAX,
-                      LU_CACHE_PERCENT_DEFAULT);
+       /* clear off unreasonable cache setting. */
+       if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
+               CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
+                     lu_cache_percent, LU_CACHE_PERCENT_MAX,
+                     LU_CACHE_PERCENT_DEFAULT);
 
-                lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-        }
-        cache_size = cache_size / 100 * lu_cache_percent *
+               lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+       }
+       cache_size = cache_size / 100 * lu_cache_percent *
                (PAGE_SIZE / 1024);
 
-        for (bits = 1; (1 << bits) < cache_size; ++bits) {
-                ;
-        }
-
-       return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
-}
-
-static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
-                               const void *key, unsigned mask)
-{
-       struct lu_fid  *fid = (struct lu_fid *)key;
-       __u32           hash;
-
-       hash = fid_flatten32(fid);
-       hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
-       hash = hash_long(hash, hs->hs_bkt_bits);
-
-       /* give me another random factor */
-       hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
-
-       hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
-       hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
-
-       return hash & mask;
-}
-
-static void *lu_obj_hop_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct lu_object_header, loh_hash);
-}
-
-static void *lu_obj_hop_key(struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return &h->loh_fid;
-}
-
-static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
-}
-
-static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       atomic_inc(&h->loh_ref);
-}
-
-static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-        LBUG(); /* we should never called it */
+       lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
+                             LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
 }
 
-static struct cfs_hash_ops lu_site_hash_ops = {
-        .hs_hash        = lu_obj_hop_hash,
-        .hs_key         = lu_obj_hop_key,
-        .hs_keycmp      = lu_obj_hop_keycmp,
-        .hs_object      = lu_obj_hop_object,
-        .hs_get         = lu_obj_hop_get,
-        .hs_put_locked  = lu_obj_hop_put_locked,
-};
-
 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
 {
        spin_lock(&s->ls_ld_lock);
@@ -1115,14 +1107,13 @@ EXPORT_SYMBOL(lu_dev_del_linkage);
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
        struct lu_site_bkt_data *bkt;
-       char name[16];
-       unsigned long bits;
        unsigned int i;
        int rc;
        ENTRY;
 
        memset(s, 0, sizeof *s);
        mutex_init(&s->ls_purge_mutex);
+       lu_htable_limits(top);
 
 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
        rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
@@ -1132,24 +1123,8 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        if (rc)
                return -ENOMEM;
 
-       snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
-       for (bits = lu_htable_order(top);
-            bits >= LU_SITE_BITS_MIN; bits--) {
-               s->ls_obj_hash = cfs_hash_create(name, bits, bits,
-                                                bits - LU_SITE_BKT_BITS,
-                                                0, 0, 0,
-                                                &lu_site_hash_ops,
-                                                CFS_HASH_SPIN_BKTLOCK |
-                                                CFS_HASH_NO_ITEMREF |
-                                                CFS_HASH_DEPTH |
-                                                CFS_HASH_ASSERT_EMPTY |
-                                                CFS_HASH_COUNTER);
-               if (s->ls_obj_hash != NULL)
-                       break;
-       }
-
-       if (s->ls_obj_hash == NULL) {
-               CERROR("failed to create lu_site hash with bits: %lu\n", bits);
+       if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
+               CERROR("failed to create lu_site hash\n");
                return -ENOMEM;
        }
 
@@ -1159,8 +1134,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
        OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
        if (!s->ls_bkts) {
-               cfs_hash_putref(s->ls_obj_hash);
-               s->ls_obj_hash = NULL;
+               rhashtable_destroy(&s->ls_obj_hash);
                s->ls_bkts = NULL;
                return -ENOMEM;
        }
@@ -1174,9 +1148,8 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
        if (s->ls_stats == NULL) {
                OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
-               cfs_hash_putref(s->ls_obj_hash);
-               s->ls_obj_hash = NULL;
                s->ls_bkts = NULL;
+               rhashtable_destroy(&s->ls_obj_hash);
                return -ENOMEM;
        }
 
@@ -1219,12 +1192,11 @@ void lu_site_fini(struct lu_site *s)
 
        percpu_counter_destroy(&s->ls_lru_len_counter);
 
-        if (s->ls_obj_hash != NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
-                s->ls_obj_hash = NULL;
-        }
-
-       OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+       if (s->ls_bkts) {
+               rhashtable_destroy(&s->ls_obj_hash);
+               OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+               s->ls_bkts = NULL;
+       }
 
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
@@ -1380,7 +1352,6 @@ int lu_object_header_init(struct lu_object_header *h)
 {
         memset(h, 0, sizeof *h);
        atomic_set(&h->loh_ref, 1);
-       INIT_HLIST_NODE(&h->loh_hash);
        INIT_LIST_HEAD(&h->loh_lru);
        INIT_LIST_HEAD(&h->loh_layers);
         lu_ref_init(&h->loh_reference);
@@ -1395,7 +1366,6 @@ void lu_object_header_fini(struct lu_object_header *h)
 {
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
-       LASSERT(hlist_unhashed(&h->loh_hash));
         lu_ref_fini(&h->loh_reference);
 }
 EXPORT_SYMBOL(lu_object_header_fini);
@@ -2113,7 +2083,7 @@ typedef struct lu_site_stats{
 static void lu_site_stats_get(const struct lu_site *s,
                              lu_site_stats_t *stats)
 {
-       int cnt = cfs_hash_size_get(s->ls_obj_hash);
+       int cnt = atomic_read(&s->ls_obj_hash.nelems);
        /*
         * percpu_counter_sum_positive() won't accept a const pointer
         * as it does modify the struct by taking a spinlock
@@ -2379,16 +2349,23 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
  */
 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
 {
+       const struct bucket_table *tbl;
        lu_site_stats_t stats;
+       unsigned int chains;
 
        memset(&stats, 0, sizeof(stats));
        lu_site_stats_get(s, &stats);
 
-       seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
+       rcu_read_lock();
+       tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
+                                 &((struct lu_site *)s)->ls_obj_hash);
+       chains = tbl->size;
+       rcu_read_unlock();
+       seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
                   stats.lss_busy,
                   stats.lss_total,
                   stats.lss_populated,
-                  CFS_HASH_NHLIST(s->ls_obj_hash),
+                  chains,
                   stats.lss_max_search,
                   ls_stats_read(s->ls_stats, LU_SS_CREATED),
                   ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
@@ -2447,27 +2424,27 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct cfs_hash         *hs;
-       struct cfs_hash_bd       bd;
+       int rc;
 
        LASSERT(fid_is_zero(old));
-
+       *old = *fid;
+try_again:
+       rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
+                                          &o->lo_header->loh_hash,
+                                          obj_hash_params);
        /* supposed to be unique */
-       hs = s->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
-#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
-       {
-               __u64 version = 0;
-               struct lu_object *shadow;
-
-               shadow = htable_lookup(s, &bd, fid, &version);
-               /* supposed to be unique */
-               LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+       LASSERT(rc != -EEXIST);
+       /* handle hash table resizing */
+       if (rc == -ENOMEM) {
+               msleep(20);
+               goto try_again;
        }
-#endif
-       *old = *fid;
-       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       cfs_hash_bd_unlock(hs, &bd, 1);
+       /* trim the hash if it is growing too big */
+       lu_object_limit(env, o->lo_dev);
+       if (rc == -E2BIG)
+               goto try_again;
+
+       LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
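
A usage note on the error handling above (not part of the patch): rhashtable
insertion can fail transiently with -ENOMEM while the table is being resized
and with -E2BIG once it has grown past its allowed size, which is why both
lu_object_assign_fid() and htable_lookup() retry after a short sleep or after
trimming the cache via lu_object_limit().  A minimal retry loop in the same
spirit might look like the sketch below; demo_insert_retry and demo_trim are
illustrative names, not Lustre functions.

    #include <linux/rhashtable.h>
    #include <linux/delay.h>

    /* Caller-supplied hook that frees some entries, e.g. an LRU purge. */
    typedef void (*demo_trim_t)(void);

    static int demo_insert_retry(struct rhashtable *ht, struct rhash_head *node,
                                 const struct rhashtable_params params,
                                 demo_trim_t demo_trim)
    {
            int rc;

            do {
                    rc = rhashtable_insert_fast(ht, node, params);
                    if (rc == -ENOMEM)
                            msleep(20);     /* a resize is in flight */
                    else if (rc == -E2BIG)
                            demo_trim();    /* make room, then retry */
            } while (rc == -ENOMEM || rc == -E2BIG);

            return rc;
    }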