Whamcloud - gitweb
LU-11814 obdcalss: ensure LCT_QUIESCENT take sync
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index 63729fe..dce91d5 100644 (file)
 
 #define DEBUG_SUBSYSTEM S_CLASS
 
+#include <linux/delay.h>
 #include <linux/module.h>
 #include <linux/list.h>
+#include <linux/processor.h>
+#include <linux/random.h>
+
 #include <libcfs/libcfs.h>
-#include <libcfs/libcfs_hash.h> /* hash_long() */
 #include <libcfs/linux/linux-mem.h>
 #include <obd_class.h>
 #include <obd_support.h>
 struct lu_site_bkt_data {
        /**
         * LRU list, updated on each access to object. Protected by
-        * bucket lock of lu_site::ls_obj_hash.
+        * lsb_waitq.lock.
         *
         * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
-        * moved to the lu_site::ls_lru.prev (this is due to the non-existence
-        * of list_for_each_entry_safe_reverse()).
+        * moved to the lu_site::ls_lru.prev
         */
        struct list_head                lsb_lru;
        /**
         * Wait-queue signaled when an object in this site is ultimately
-        * destroyed (lu_object_free()). It is used by lu_object_find() to
-        * wait before re-trying when object in the process of destruction is
-        * found in the hash table.
+        * destroyed (lu_object_free()) or initialized (lu_object_start()).
+        * It is used by lu_object_find() to wait before re-trying when
+        * object in the process of destruction is found in the hash table;
+        * or wait object to be initialized by the allocator.
         *
         * \see htable_lookup().
         */
-       wait_queue_head_t               lsb_marche_funebre;
+       wait_queue_head_t               lsb_waitq;
 };
 
 enum {
@@ -81,21 +84,21 @@ enum {
 #define        LU_CACHE_NR_MAX_ADJUST          512
 #define        LU_CACHE_NR_UNLIMITED           -1
 #define        LU_CACHE_NR_DEFAULT             LU_CACHE_NR_UNLIMITED
-#define        LU_CACHE_NR_LDISKFS_LIMIT       LU_CACHE_NR_UNLIMITED
 /** This is set to roughly (20 * OSS_NTHRS_MAX) to prevent thrashing */
 #define        LU_CACHE_NR_ZFS_LIMIT           10240
 
-#define LU_SITE_BITS_MIN    12
-#define LU_SITE_BITS_MAX    24
-#define LU_SITE_BITS_MAX_CL 19
+#define        LU_CACHE_NR_MIN                 4096
+#define        LU_CACHE_NR_MAX                 0x80000000UL
+
 /**
- * total 256 buckets, we don't want too many buckets because:
- * - consume too much memory
+ * Max 256 buckets, we don't want too many buckets because:
+ * - consume too much memory (currently max 16K)
  * - avoid unbalanced LRU list
+ * With few cpus there is little gain from extra buckets, so
+ * we treat this as a maximum in lu_site_init().
  */
 #define LU_SITE_BKT_BITS    8
 
-
 static unsigned int lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
 module_param(lu_cache_percent, int, 0644);
 MODULE_PARM_DESC(lu_cache_percent, "Percentage of memory to be used as lu_object cache");
@@ -107,15 +110,36 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
+static u32 lu_fid_hash(const void *data, u32 len, u32 seed)
+{
+       const struct lu_fid *fid = data;
+
+       seed = cfs_hash_32(seed ^ fid->f_oid, 32);
+       seed ^= cfs_hash_64(fid->f_seq, 32);
+       return seed;
+}
+
+static const struct rhashtable_params obj_hash_params = {
+       .key_len        = sizeof(struct lu_fid),
+       .key_offset     = offsetof(struct lu_object_header, loh_fid),
+       .head_offset    = offsetof(struct lu_object_header, loh_hash),
+       .hashfn         = lu_fid_hash,
+       .automatic_shrinking = true,
+};
+
+static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
+{
+       return lu_fid_hash(fid, sizeof(*fid), s->ls_bkt_seed) &
+              (s->ls_bkt_cnt - 1);
+}
+
 wait_queue_head_t *
 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
 {
-       struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
 
-       cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-       return &bkt->lsb_marche_funebre;
+       bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
+       return &bkt->lsb_waitq;
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
 
@@ -127,25 +151,17 @@ EXPORT_SYMBOL(lu_site_wq_from_fid);
 void lu_object_put(const struct lu_env *env, struct lu_object *o)
 {
        struct lu_site_bkt_data *bkt;
-       struct lu_object_header *top;
-       struct lu_site *site;
-       struct lu_object *orig;
-       struct cfs_hash_bd bd;
-       const struct lu_fid *fid;
-
-       top  = o->lo_header;
-       site = o->lo_dev->ld_site;
-       orig = o;
+       struct lu_object_header *top = o->lo_header;
+       struct lu_site *site = o->lo_dev->ld_site;
+       struct lu_object *orig = o;
+       const struct lu_fid *fid = lu_object_fid(o);
 
        /*
         * till we have full fids-on-OST implemented anonymous objects
         * are possible in OSP. such an object isn't listed in the site
         * so we should not remove it from the site.
         */
-       fid = lu_object_fid(o);
        if (fid_is_zero(fid)) {
-               LASSERT(top->loh_hash.next == NULL
-                       && top->loh_hash.pprev == NULL);
                LASSERT(list_empty(&top->loh_lru));
                if (!atomic_dec_and_test(&top->loh_ref))
                        return;
@@ -157,58 +173,74 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                return;
        }
 
-       cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       if (atomic_add_unless(&top->loh_ref, -1, 1)) {
+still_active:
+               /*
+                * At this point the object reference is dropped and lock is
+                * not taken, so lu_object should not be touched because it
+                * can be freed by concurrent thread.
+                *
+                * Somebody may be waiting for this, currently only used for
+                * cl_object, see cl_object_put_last().
+                */
+               wake_up(&bkt->lsb_waitq);
 
-       if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
-               if (lu_object_is_dying(top)) {
-                       /*
-                        * somebody may be waiting for this, currently only
-                        * used for cl_object, see cl_object_put_last().
-                        */
-                       wake_up_all(&bkt->lsb_marche_funebre);
-               }
                return;
        }
 
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (!atomic_dec_and_test(&top->loh_ref)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               goto still_active;
+       }
+
+       /*
+        * Refcount is zero, and cannot be incremented without taking the bkt
+        * lock, so object is stable.
+        */
+
        /*
-        * When last reference is released, iterate over object
-        * layers, and notify them that object is no longer busy.
+        * When last reference is released, iterate over object layers, and
+        * notify them that object is no longer busy.
         */
        list_for_each_entry_reverse(o, &top->loh_layers, lo_linkage) {
                if (o->lo_ops->loo_object_release != NULL)
                        o->lo_ops->loo_object_release(env, o);
        }
 
+       /*
+        * Don't use local 'is_dying' here because if was taken without lock but
+        * here we need the latest actual value of it so check lu_object
+        * directly here.
+        */
        if (!lu_object_is_dying(top) &&
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_inc(&site->ls_lru_len_counter);
-               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
-                      orig, top, site->ls_obj_hash, bkt);
-               cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. bkt: %p\n",
+                      orig, top, bkt);
                return;
        }
 
        /*
-        * If object is dying (will not be cached) then remove it
-        * from hash table and LRU.
+        * If object is dying (will not be cached) then remove it from hash
+        * table (it is already not on the LRU).
         *
-        * This is done with hash table and LRU lists locked. As the only
-        * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find()),
-        * or LRU scanning (lu_site_purge()), that are done under hash-table
-        * and LRU lock, no race with concurrent object lookup is possible
-        * and we can safely destroy object below.
+        * This is done with bucket lock held.  As the only way to acquire first
+        * reference to previously unreferenced object is through hash-table
+        * lookup (lu_object_find()) which takes the lock for first reference,
+        * no race with concurrent object lookup is possible and we can safely
+        * destroy object below.
         */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
-               cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
-       cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-       /*
-        * Object was already removed from hash and lru above, can
-        * kill it.
-        */
+               rhashtable_remove_fast(&site->ls_obj_hash, &top->loh_hash,
+                                      obj_hash_params);
+
+       spin_unlock(&bkt->lsb_waitq.lock);
+       /* Object was already removed from hash above, can kill it. */
        lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
@@ -236,19 +268,19 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
        set_bit(LU_OBJECT_HEARD_BANSHEE, &top->loh_flags);
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags)) {
                struct lu_site *site = o->lo_dev->ld_site;
-               struct cfs_hash *obj_hash = site->ls_obj_hash;
-               struct cfs_hash_bd bd;
+               struct rhashtable *obj_hash = &site->ls_obj_hash;
+               struct lu_site_bkt_data *bkt;
 
-               cfs_hash_bd_get_and_lock(obj_hash, &top->loh_fid, &bd, 1);
+               bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
                if (!list_empty(&top->loh_lru)) {
-                       struct lu_site_bkt_data *bkt;
-
                        list_del_init(&top->loh_lru);
-                       bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
-               cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
-               cfs_hash_bd_unlock(obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
+
+               rhashtable_remove_fast(obj_hash, &top->loh_hash,
+                                      obj_hash_params);
        }
 }
 EXPORT_SYMBOL(lu_object_unhash);
@@ -261,17 +293,9 @@ EXPORT_SYMBOL(lu_object_unhash);
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
-                                        const struct lu_fid *f,
-                                        const struct lu_object_conf *conf)
+                                        const struct lu_fid *f)
 {
-       struct lu_object *scan;
        struct lu_object *top;
-       struct list_head *layers;
-       unsigned int init_mask = 0;
-       unsigned int init_flag;
-       int clean;
-       int result;
-       ENTRY;
 
        /*
         * Create top-level object slice. This will also create
@@ -279,15 +303,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
+               return ERR_PTR(-ENOMEM);
        if (IS_ERR(top))
-               RETURN(top);
-        /*
-         * This is the only place where object fid is assigned. It's constant
-         * after this point.
-         */
-        top->lo_header->loh_fid = *f;
-        layers = &top->lo_header->loh_layers;
+               return top;
+       /*
+        * This is the only place where object fid is assigned. It's constant
+        * after this point.
+        */
+       top->lo_header->loh_fid = *f;
+
+       return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+                          struct lu_object *top,
+                          const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct list_head *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+
+       layers = &top->lo_header->loh_layers;
 
        do {
                /*
@@ -302,10 +347,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                       if (result != 0) {
-                               lu_object_free(env, top);
-                               RETURN(ERR_PTR(result));
-                       }
+                       if (result)
+                               return result;
+
                        init_mask |= init_flag;
 next:
                        init_flag <<= 1;
@@ -313,17 +357,18 @@ next:
        } while (!clean);
 
        list_for_each_entry_reverse(scan, layers, lo_linkage) {
-                if (scan->lo_ops->loo_object_start != NULL) {
-                        result = scan->lo_ops->loo_object_start(env, scan);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                }
-        }
+               if (scan->lo_ops->loo_object_start != NULL) {
+                       result = scan->lo_ops->loo_object_start(env, scan);
+                       if (result)
+                               return result;
+               }
+       }
+
+       lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
 
-        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
-        RETURN(top);
+       set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+       return 0;
 }
 
 /**
@@ -332,10 +377,10 @@ next:
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
        wait_queue_head_t *wq;
-       struct lu_site          *site;
-       struct lu_object        *scan;
-       struct list_head        *layers;
-       struct list_head         splice;
+       struct lu_site *site;
+       struct lu_object *scan;
+       struct list_head *layers;
+       LIST_HEAD(splice);
 
        site = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
@@ -354,7 +399,6 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
          * necessary, because lu_object_header is freed together with the
          * top-level slice.
          */
-       INIT_LIST_HEAD(&splice);
        list_splice_init(layers, &splice);
        while (!list_empty(&splice)) {
                /*
@@ -362,7 +406,7 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
                 * lives as long as possible and ->loo_object_free() methods
                 * can look at its contents.
                 */
-               o = container_of0(splice.prev, struct lu_object, lo_linkage);
+               o = container_of(splice.prev, struct lu_object, lo_linkage);
                list_del_init(&o->lo_linkage);
                LASSERT(o->lo_ops->loo_object_free != NULL);
                o->lo_ops->loo_object_free(env, o);
@@ -380,95 +424,89 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                          int nr, int canblock)
 {
-        struct lu_object_header *h;
-        struct lu_object_header *temp;
-        struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd            bd;
-       struct cfs_hash_bd            bd2;
-       struct list_head         dispose;
+       struct lu_object_header *h;
+       struct lu_object_header *temp;
+       struct lu_site_bkt_data *bkt;
+       LIST_HEAD(dispose);
        int                      did_sth;
        unsigned int             start = 0;
-        int                      count;
-        int                      bnr;
+       int                      count;
+       int                      bnr;
        unsigned int             i;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);
 
-       INIT_LIST_HEAD(&dispose);
-        /*
-         * Under LRU list lock, scan LRU list and move unreferenced objects to
-         * the dispose list, removing them from LRU and hash table.
-         */
+       /*
+        * Under LRU list lock, scan LRU list and move unreferenced objects to
+        * the dispose list, removing them from LRU and hash table.
+        */
        if (nr != ~0)
                start = s->ls_purge_start;
-       bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
- again:
+       bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
+again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
-        * only bring troubles to us. See LU-5331.
+        * only bring troubles to us.  See LU-5331.
         */
        if (canblock != 0)
                mutex_lock(&s->ls_purge_mutex);
        else if (mutex_trylock(&s->ls_purge_mutex) == 0)
                goto out;
 
-        did_sth = 0;
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                if (i < start)
-                        continue;
-                count = bnr;
-                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+       did_sth = 0;
+       for (i = start; i < s->ls_bkt_cnt ; i++) {
+               count = bnr;
+               bkt = &s->ls_bkts[i];
+               spin_lock(&bkt->lsb_waitq.lock);
 
                list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        LASSERT(atomic_read(&h->loh_ref) == 0);
 
-                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
-                        LASSERT(bd.bd_bucket == bd2.bd_bucket);
+                       LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                        cfs_hash_bd_del_locked(s->ls_obj_hash,
-                                               &bd2, &h->loh_hash);
+                       set_bit(LU_OBJECT_UNHASHED, &h->loh_flags);
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
                        list_move(&h->loh_lru, &dispose);
                        percpu_counter_dec(&s->ls_lru_len_counter);
-                        if (did_sth == 0)
-                                did_sth = 1;
+                       if (did_sth == 0)
+                               did_sth = 1;
 
-                        if (nr != ~0 && --nr == 0)
-                                break;
+                       if (nr != ~0 && --nr == 0)
+                               break;
 
-                        if (count > 0 && --count == 0)
-                                break;
+                       if (count > 0 && --count == 0)
+                               break;
 
                }
-               cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
                cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
-               while (!list_empty(&dispose)) {
-                       h = container_of0(dispose.next,
-                                         struct lu_object_header, loh_lru);
+               while ((h = list_first_entry_or_null(&dispose,
+                                                    struct lu_object_header,
+                                                    loh_lru)) != NULL) {
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
                }
 
-                if (nr == 0)
-                        break;
-        }
+               if (nr == 0)
+                       break;
+       }
        mutex_unlock(&s->ls_purge_mutex);
 
-        if (nr != 0 && did_sth && start != 0) {
-                start = 0; /* restart from the first bucket */
-                goto again;
-        }
-        /* race on s->ls_purge_start, but nobody cares */
-        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
-
+       if (nr != 0 && did_sth && start != 0) {
+               start = 0; /* restart from the first bucket */
+               goto again;
+       }
+       /* race on s->ls_purge_start, but nobody cares */
+       s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
 out:
-        return nr;
+       return nr;
 }
 EXPORT_SYMBOL(lu_site_purge_objects);
 
@@ -562,9 +600,9 @@ void lu_object_header_print(const struct lu_env *env, void *cookie,
        (*printer)(env, cookie, "header@%p[%#lx, %d, "DFID"%s%s%s]",
                   hdr, hdr->loh_flags, atomic_read(&hdr->loh_ref),
                   PFID(&hdr->loh_fid),
-                  hlist_unhashed(&hdr->loh_hash) ? "" : " hash",
-                  list_empty((struct list_head *)&hdr->loh_lru) ? \
-                  "" : " lru",
+                  test_bit(LU_OBJECT_UNHASHED,
+                           &hdr->loh_flags) ? "" : " hash",
+                  list_empty(&hdr->loh_lru) ? "" : " lru",
                   hdr->loh_attr & LOHA_EXISTS ? " exist" : "");
 }
 EXPORT_SYMBOL(lu_object_header_print);
@@ -616,36 +654,97 @@ int lu_object_invariant(const struct lu_object *o)
         return 1;
 }
 
-static struct lu_object *htable_lookup(struct lu_site *s,
-                                      struct cfs_hash_bd *bd,
+/*
+ * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because the
+ * calculation for the number of objects to reclaim is not covered by a lock the
+ * maximum number of objects is capped by LU_CACHE_MAX_ADJUST.  This ensures
+ * that many concurrent threads will not accidentally purge the entire cache.
+ */
+static void lu_object_limit(const struct lu_env *env,
+                           struct lu_device *dev)
+{
+       u64 size, nr;
+
+       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
+               return;
+
+       size = atomic_read(&dev->ld_site->ls_obj_hash.nelems);
+       nr = (u64)lu_cache_nr;
+       if (size <= nr)
+               return;
+
+       lu_site_purge_objects(env, dev->ld_site,
+                             min_t(u64, size - nr, LU_CACHE_NR_MAX_ADJUST),
+                             0);
+}
+
+static struct lu_object *htable_lookup(const struct lu_env *env,
+                                      struct lu_device *dev,
+                                      struct lu_site_bkt_data *bkt,
                                       const struct lu_fid *f,
-                                      __u64 *version)
+                                      struct lu_object_header *new)
 {
-       struct lu_site_bkt_data *bkt;
+       struct lu_site *s = dev->ld_site;
        struct lu_object_header *h;
-       struct hlist_node *hnode;
-       __u64 ver = cfs_hash_bd_version_get(bd);
 
-       if (*version == ver)
+try_again:
+       rcu_read_lock();
+       if (new)
+               h = rhashtable_lookup_get_insert_fast(&s->ls_obj_hash,
+                                                     &new->loh_hash,
+                                                     obj_hash_params);
+       else
+               h = rhashtable_lookup(&s->ls_obj_hash, f, obj_hash_params);
+
+       if (IS_ERR_OR_NULL(h)) {
+               /* Not found */
+               if (!new)
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+               rcu_read_unlock();
+               if (PTR_ERR(h) == -ENOMEM) {
+                       msleep(20);
+                       goto try_again;
+               }
+               lu_object_limit(env, dev);
+               if (PTR_ERR(h) == -E2BIG)
+                       goto try_again;
+
                return ERR_PTR(-ENOENT);
+       }
+
+       if (atomic_inc_not_zero(&h->loh_ref)) {
+               rcu_read_unlock();
+               return lu_object_top(h);
+       }
 
-       *version = ver;
-       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
-       /* cfs_hash_bd_peek_locked is a somehow "internal" function
-        * of cfs_hash, it doesn't add refcount on object. */
-       hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
-       if (!hnode) {
+       spin_lock(&bkt->lsb_waitq.lock);
+       if (lu_object_is_dying(h) ||
+           test_bit(LU_OBJECT_UNHASHED, &h->loh_flags)) {
+               spin_unlock(&bkt->lsb_waitq.lock);
+               rcu_read_unlock();
+               if (new) {
+                       /*
+                        * Old object might have already been removed, or will
+                        * be soon.  We need to insert our new object, so
+                        * remove the old one just in case it is still there.
+                        */
+                       rhashtable_remove_fast(&s->ls_obj_hash, &h->loh_hash,
+                                              obj_hash_params);
+                       goto try_again;
+               }
                lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
                return ERR_PTR(-ENOENT);
        }
+       /* Now protected by spinlock */
+       rcu_read_unlock();
 
-       h = container_of0(hnode, struct lu_object_header, loh_hash);
-       cfs_hash_get(s->ls_obj_hash, hnode);
-       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        if (!list_empty(&h->loh_lru)) {
                list_del_init(&h->loh_lru);
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
+       atomic_inc(&h->loh_ref);
+       spin_unlock(&bkt->lsb_waitq.lock);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        return lu_object_top(h);
 }
 
@@ -663,28 +762,37 @@ struct lu_object *lu_object_find(const struct lu_env *env,
 EXPORT_SYMBOL(lu_object_find);
 
 /*
- * Limit the lu_object cache to a maximum of lu_cache_nr objects.  Because
- * the calculation for the number of objects to reclaim is not covered by
- * a lock the maximum number of objects is capped by LU_CACHE_MAX_ADJUST.
- * This ensures that many concurrent threads will not accidentally purge
- * the entire cache.
+ * Get a 'first' reference to an object that was found while looking through the
+ * hash table.
  */
-static void lu_object_limit(const struct lu_env *env,
-                           struct lu_device *dev)
+struct lu_object *lu_object_get_first(struct lu_object_header *h,
+                                     struct lu_device *dev)
 {
-       __u64 size, nr;
+       struct lu_site *s = dev->ld_site;
+       struct lu_object *ret;
 
-       if (lu_cache_nr == LU_CACHE_NR_UNLIMITED)
-               return;
+       if (IS_ERR_OR_NULL(h) || lu_object_is_dying(h))
+               return NULL;
 
-       size = cfs_hash_size_get(dev->ld_site->ls_obj_hash);
-       nr = (__u64)lu_cache_nr;
-       if (size <= nr)
-               return;
+       ret = lu_object_locate(h, dev->ld_type);
+       if (!ret)
+               return ret;
 
-       lu_site_purge_objects(env, dev->ld_site,
-                             MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0);
+       if (!atomic_inc_not_zero(&h->loh_ref)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               if (!lu_object_is_dying(h) &&
+                   !test_bit(LU_OBJECT_UNHASHED, &h->loh_flags))
+                       atomic_inc(&h->loh_ref);
+               else
+                       ret = NULL;
+               spin_unlock(&bkt->lsb_waitq.lock);
+       }
+       return ret;
 }
+EXPORT_SYMBOL(lu_object_get_first);
 
 /**
  * Core logic of lu_object_find*() functions.
@@ -701,9 +809,11 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_object *o;
        struct lu_object *shadow;
        struct lu_site *s;
-       struct cfs_hash *hs;
-       struct cfs_hash_bd bd;
-       __u64 version = 0;
+       struct lu_site_bkt_data *bkt;
+       struct rhashtable *hs;
+       int rc;
+
+       ENTRY;
 
        /*
         * This uses standard index maintenance protocol:
@@ -724,45 +834,98 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         *
         */
        s  = dev->ld_site;
-       hs = s->ls_obj_hash;
-       cfs_hash_bd_get(hs, f, &bd);
+       hs = &s->ls_obj_hash;
+
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+               lu_site_purge(env, s, -1);
+
+       bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
-               cfs_hash_bd_lock(hs, &bd, 1);
-               o = htable_lookup(s, &bd, f, &version);
-               cfs_hash_bd_unlock(hs, &bd, 1);
+               o = htable_lookup(env, dev, bkt, f, NULL);
 
-               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                       return o;
+               if (!IS_ERR(o)) {
+                       if (likely(lu_object_is_inited(o->lo_header)))
+                               RETURN(o);
+
+                       wait_event_idle(bkt->lsb_waitq,
+                                       lu_object_is_inited(o->lo_header) ||
+                                       lu_object_is_dying(o->lo_header));
+
+                       if (lu_object_is_dying(o->lo_header)) {
+                               lu_object_put(env, o);
+
+                               RETURN(ERR_PTR(-ENOENT));
+                       }
+
+                       RETURN(o);
+               }
+
+               if (PTR_ERR(o) != -ENOENT)
+                       RETURN(o);
        }
+
        /*
-        * Allocate new object. This may result in rather complicated
-        * operations, including fld queries, inode loading, etc.
+        * Allocate new object, NB, object is unitialized in case object
+        * is changed between allocation and hash insertion, thus the object
+        * with stale attributes is returned.
         */
-       o = lu_object_alloc(env, dev, f, conf);
+       o = lu_object_alloc(env, dev, f);
        if (IS_ERR(o))
-               return o;
+               RETURN(o);
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
-       cfs_hash_bd_lock(hs, &bd, 1);
+       CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
 
-       if (conf && conf->loc_flags & LOC_F_NEW)
-               shadow = ERR_PTR(-ENOENT);
-       else
-               shadow = htable_lookup(s, &bd, f, &version);
+       if (conf && conf->loc_flags & LOC_F_NEW) {
+               int status = rhashtable_insert_fast(hs, &o->lo_header->loh_hash,
+                                                   obj_hash_params);
+               if (status)
+                       /* Strange error - go the slow way */
+                       shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+               else
+                       shadow = ERR_PTR(-ENOENT);
+       } else {
+               shadow = htable_lookup(env, dev, bkt, f, o->lo_header);
+       }
        if (likely(PTR_ERR(shadow) == -ENOENT)) {
-               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-               cfs_hash_bd_unlock(hs, &bd, 1);
+               /*
+                * The new object has been successfully inserted.
+                *
+                * This may result in rather complicated operations, including
+                * fld queries, inode loading, etc.
+                */
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_put_nocache(env, o);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               wake_up(&bkt->lsb_waitq);
 
                lu_object_limit(env, dev);
 
-               return o;
+               RETURN(o);
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
-       cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
-       return shadow;
+
+       if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !IS_ERR(shadow) &&
+           !lu_object_is_inited(shadow->lo_header)) {
+               wait_event_idle(bkt->lsb_waitq,
+                               lu_object_is_inited(shadow->lo_header) ||
+                               lu_object_is_dying(shadow->lo_header));
+
+               if (lu_object_is_dying(shadow->lo_header)) {
+                       lu_object_put(env, shadow);
+
+                       RETURN(ERR_PTR(-ENOENT));
+               }
+       }
+
+       RETURN(shadow);
 }
 EXPORT_SYMBOL(lu_object_find_at);
 
@@ -827,14 +990,9 @@ struct lu_site_print_arg {
         lu_printer_t     lsp_printer;
 };
 
-static int
-lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
-                 struct hlist_node *hnode, void *data)
+static void
+lu_site_obj_print(struct lu_object_header *h, struct lu_site_print_arg *arg)
 {
-       struct lu_site_print_arg *arg = (struct lu_site_print_arg *)data;
-       struct lu_object_header  *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
        if (!list_empty(&h->loh_layers)) {
                const struct lu_object *o;
 
@@ -845,33 +1003,45 @@ lu_site_obj_print(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                lu_object_header_print(arg->lsp_env, arg->lsp_cookie,
                                       arg->lsp_printer, h);
        }
-       return 0;
 }
 
 /**
  * Print all objects in \a s.
  */
-void lu_site_print(const struct lu_env *env, struct lu_site *s, void *cookie,
-                   lu_printer_t printer)
+void lu_site_print(const struct lu_env *env, struct lu_site *s, atomic_t *ref,
+                  int msg_flag, lu_printer_t printer)
 {
-        struct lu_site_print_arg arg = {
-                .lsp_env     = (struct lu_env *)env,
-                .lsp_cookie  = cookie,
-                .lsp_printer = printer,
-        };
+       struct lu_site_print_arg arg = {
+               .lsp_env     = (struct lu_env *)env,
+               .lsp_printer = printer,
+       };
+       struct rhashtable_iter iter;
+       struct lu_object_header *h;
+       LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, msg_flag, NULL);
 
-        cfs_hash_for_each(s->ls_obj_hash, lu_site_obj_print, &arg);
+       if (!s || !atomic_read(ref))
+               return;
+
+       arg.lsp_cookie = (void *)&msgdata;
+
+       rhashtable_walk_enter(&s->ls_obj_hash, &iter);
+       rhashtable_walk_start(&iter);
+       while ((h = rhashtable_walk_next(&iter)) != NULL) {
+               if (IS_ERR(h))
+                       continue;
+               lu_site_obj_print(h, &arg);
+       }
+       rhashtable_walk_stop(&iter);
+       rhashtable_walk_exit(&iter);
 }
 EXPORT_SYMBOL(lu_site_print);
 
 /**
  * Return desired hash table order.
  */
-static unsigned long lu_htable_order(struct lu_device *top)
+static void lu_htable_limits(struct lu_device *top)
 {
        unsigned long cache_size;
-       unsigned long bits;
-       unsigned long bits_max = LU_SITE_BITS_MAX;
 
        /*
         * For ZFS based OSDs the cache should be disabled by default.  This
@@ -880,110 +1050,40 @@ static unsigned long lu_htable_order(struct lu_device *top)
         * always stay cached it must maintain a hold on them.
         */
        if (strcmp(top->ld_type->ldt_name, LUSTRE_OSD_ZFS_NAME) == 0) {
-               lu_cache_percent = 1;
                lu_cache_nr = LU_CACHE_NR_ZFS_LIMIT;
-               return LU_SITE_BITS_MIN;
+               return;
        }
 
-       if (strcmp(top->ld_type->ldt_name, LUSTRE_VVP_NAME) == 0)
-               bits_max = LU_SITE_BITS_MAX_CL;
-
-        /*
-         * Calculate hash table size, assuming that we want reasonable
-         * performance when 20% of total memory is occupied by cache of
-         * lu_objects.
-         *
-         * Size of lu_object is (arbitrary) taken as 1K (together with inode).
-         */
-       cache_size = totalram_pages;
+       /*
+        * Calculate hash table size, assuming that we want reasonable
+        * performance when 20% of total memory is occupied by cache of
+        * lu_objects.
+        *
+        * Size of lu_object is (arbitrary) taken as 1K (together with inode).
+        */
+       cache_size = cfs_totalram_pages();
 
 #if BITS_PER_LONG == 32
-        /* limit hashtable size for lowmem systems to low RAM */
+       /* limit hashtable size for lowmem systems to low RAM */
        if (cache_size > 1 << (30 - PAGE_SHIFT))
                cache_size = 1 << (30 - PAGE_SHIFT) * 3 / 4;
 #endif
 
-        /* clear off unreasonable cache setting. */
-        if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
-                CWARN("obdclass: invalid lu_cache_percent: %u, it must be in"
-                      " the range of (0, %u]. Will use default value: %u.\n",
-                      lu_cache_percent, LU_CACHE_PERCENT_MAX,
-                      LU_CACHE_PERCENT_DEFAULT);
+       /* clear off unreasonable cache setting. */
+       if (lu_cache_percent == 0 || lu_cache_percent > LU_CACHE_PERCENT_MAX) {
+               CWARN("obdclass: invalid lu_cache_percent: %u, it must be in the range of (0, %u]. Will use default value: %u.\n",
+                     lu_cache_percent, LU_CACHE_PERCENT_MAX,
+                     LU_CACHE_PERCENT_DEFAULT);
 
-                lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
-        }
-        cache_size = cache_size / 100 * lu_cache_percent *
+               lu_cache_percent = LU_CACHE_PERCENT_DEFAULT;
+       }
+       cache_size = cache_size / 100 * lu_cache_percent *
                (PAGE_SIZE / 1024);
 
-        for (bits = 1; (1 << bits) < cache_size; ++bits) {
-                ;
-        }
-
-       return clamp_t(typeof(bits), bits, LU_SITE_BITS_MIN, bits_max);
-}
-
-static unsigned lu_obj_hop_hash(struct cfs_hash *hs,
-                               const void *key, unsigned mask)
-{
-       struct lu_fid  *fid = (struct lu_fid *)key;
-       __u32           hash;
-
-       hash = fid_flatten32(fid);
-       hash += (hash >> 4) + (hash << 12); /* mixing oid and seq */
-       hash = hash_long(hash, hs->hs_bkt_bits);
-
-       /* give me another random factor */
-       hash -= hash_long((unsigned long)hs, fid_oid(fid) % 11 + 3);
-
-       hash <<= hs->hs_cur_bits - hs->hs_bkt_bits;
-       hash |= (fid_seq(fid) + fid_oid(fid)) & (CFS_HASH_NBKT(hs) - 1);
-
-       return hash & mask;
-}
-
-static void *lu_obj_hop_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct lu_object_header, loh_hash);
-}
-
-static void *lu_obj_hop_key(struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return &h->loh_fid;
-}
-
-static int lu_obj_hop_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       return lu_fid_eq(&h->loh_fid, (struct lu_fid *)key);
-}
-
-static void lu_obj_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct lu_object_header *h;
-
-       h = hlist_entry(hnode, struct lu_object_header, loh_hash);
-       atomic_inc(&h->loh_ref);
-}
-
-static void lu_obj_hop_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-        LBUG(); /* we should never called it */
+       lu_cache_nr = clamp_t(typeof(cache_size), cache_size,
+                             LU_CACHE_NR_MIN, LU_CACHE_NR_MAX);
 }
 
-static struct cfs_hash_ops lu_site_hash_ops = {
-        .hs_hash        = lu_obj_hop_hash,
-        .hs_key         = lu_obj_hop_key,
-        .hs_keycmp      = lu_obj_hop_keycmp,
-        .hs_object      = lu_obj_hop_object,
-        .hs_get         = lu_obj_hop_get,
-        .hs_put_locked  = lu_obj_hop_put_locked,
-};
-
 void lu_dev_add_linkage(struct lu_site *s, struct lu_device *d)
 {
        spin_lock(&s->ls_ld_lock);
@@ -1007,15 +1107,13 @@ EXPORT_SYMBOL(lu_dev_del_linkage);
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
        struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd bd;
-       char name[16];
-       unsigned long bits;
        unsigned int i;
        int rc;
        ENTRY;
 
        memset(s, 0, sizeof *s);
        mutex_init(&s->ls_purge_mutex);
+       lu_htable_limits(top);
 
 #ifdef HAVE_PERCPU_COUNTER_INIT_GFP_FLAG
        rc = percpu_counter_init(&s->ls_lru_len_counter, 0, GFP_NOFS);
@@ -1025,39 +1123,35 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
        if (rc)
                return -ENOMEM;
 
-       snprintf(name, sizeof(name), "lu_site_%s", top->ld_type->ldt_name);
-       for (bits = lu_htable_order(top);
-            bits >= LU_SITE_BITS_MIN; bits--) {
-               s->ls_obj_hash = cfs_hash_create(name, bits, bits,
-                                                bits - LU_SITE_BKT_BITS,
-                                                sizeof(*bkt), 0, 0,
-                                                &lu_site_hash_ops,
-                                                CFS_HASH_SPIN_BKTLOCK |
-                                                CFS_HASH_NO_ITEMREF |
-                                                CFS_HASH_DEPTH |
-                                                CFS_HASH_ASSERT_EMPTY |
-                                                CFS_HASH_COUNTER);
-               if (s->ls_obj_hash != NULL)
-                       break;
+       if (rhashtable_init(&s->ls_obj_hash, &obj_hash_params) != 0) {
+               CERROR("failed to create lu_site hash\n");
+               return -ENOMEM;
        }
 
-       if (s->ls_obj_hash == NULL) {
-               CERROR("failed to create lu_site hash with bits: %lu\n", bits);
+       s->ls_bkt_seed = prandom_u32();
+       s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS,
+                             2 * num_possible_cpus());
+       s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
+       OBD_ALLOC_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+       if (!s->ls_bkts) {
+               rhashtable_destroy(&s->ls_obj_hash);
+               s->ls_bkts = NULL;
                return -ENOMEM;
        }
 
-       cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-               bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+       for (i = 0; i < s->ls_bkt_cnt; i++) {
+               bkt = &s->ls_bkts[i];
                INIT_LIST_HEAD(&bkt->lsb_lru);
-               init_waitqueue_head(&bkt->lsb_marche_funebre);
+               init_waitqueue_head(&bkt->lsb_waitq);
        }
 
-        s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
-        if (s->ls_stats == NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
-                s->ls_obj_hash = NULL;
-                return -ENOMEM;
-        }
+       s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
+       if (s->ls_stats == NULL) {
+               OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+               s->ls_bkts = NULL;
+               rhashtable_destroy(&s->ls_obj_hash);
+               return -ENOMEM;
+       }
 
         lprocfs_counter_init(s->ls_stats, LU_SS_CREATED,
                              0, "created", "created");
@@ -1098,10 +1192,11 @@ void lu_site_fini(struct lu_site *s)
 
        percpu_counter_destroy(&s->ls_lru_len_counter);
 
-        if (s->ls_obj_hash != NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
-                s->ls_obj_hash = NULL;
-        }
+       if (s->ls_bkts) {
+               rhashtable_destroy(&s->ls_obj_hash);
+               OBD_FREE_PTR_ARRAY_LARGE(s->ls_bkts, s->ls_bkt_cnt);
+               s->ls_bkts = NULL;
+       }
 
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
@@ -1149,14 +1244,25 @@ void lu_device_put(struct lu_device *d)
 }
 EXPORT_SYMBOL(lu_device_put);
 
+enum { /* Maximal number of tld slots. */
+       LU_CONTEXT_KEY_NR = 40
+};
+static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
+static DECLARE_RWSEM(lu_key_initing);
+
 /**
  * Initialize device \a d of type \a t.
  */
 int lu_device_init(struct lu_device *d, struct lu_device_type *t)
 {
-       if (atomic_inc_return(&t->ldt_device_nr) == 1 &&
-           t->ldt_ops->ldto_start != NULL)
-               t->ldt_ops->ldto_start(t);
+       if (atomic_add_unless(&t->ldt_device_nr, 1, 0) == 0) {
+               down_write(&lu_key_initing);
+               if (t->ldt_ops->ldto_start &&
+                   atomic_read(&t->ldt_device_nr) == 0)
+                       t->ldt_ops->ldto_start(t);
+               atomic_inc(&t->ldt_device_nr);
+               up_write(&lu_key_initing);
+       }
 
        memset(d, 0, sizeof *d);
        d->ld_type = t;
@@ -1257,7 +1363,6 @@ int lu_object_header_init(struct lu_object_header *h)
 {
         memset(h, 0, sizeof *h);
        atomic_set(&h->loh_ref, 1);
-       INIT_HLIST_NODE(&h->loh_hash);
        INIT_LIST_HEAD(&h->loh_lru);
        INIT_LIST_HEAD(&h->loh_layers);
         lu_ref_init(&h->loh_reference);
@@ -1272,7 +1377,6 @@ void lu_object_header_fini(struct lu_object_header *h)
 {
        LASSERT(list_empty(&h->loh_layers));
        LASSERT(list_empty(&h->loh_lru));
-       LASSERT(hlist_unhashed(&h->loh_hash));
         lu_ref_fini(&h->loh_reference);
 }
 EXPORT_SYMBOL(lu_object_header_fini);
@@ -1319,29 +1423,11 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
 
         for (scan = top; scan != NULL; scan = next) {
                 const struct lu_device_type *ldt = scan->ld_type;
-                struct obd_type             *type;
 
                 next = ldt->ldt_ops->ldto_device_free(env, scan);
-                type = ldt->ldt_obd_type;
-                if (type != NULL) {
-                        type->typ_refcnt--;
-                        class_put_type(type);
-                }
         }
 }
 
-enum {
-        /**
-         * Maximal number of tld slots.
-         */
-        LU_CONTEXT_KEY_NR = 40
-};
-
-static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
-
-DEFINE_RWLOCK(lu_keys_guard);
-static DECLARE_RWSEM(lu_key_initing);
-
 /**
  * Global counter incremented whenever key is registered, unregistered,
  * revived or quiesced. This is used to void unnecessary calls to
@@ -1364,19 +1450,27 @@ int lu_context_key_register(struct lu_context_key *key)
         LASSERT(key->lct_owner != NULL);
 
         result = -ENFILE;
-       write_lock(&lu_keys_guard);
+       atomic_set(&key->lct_used, 1);
+       lu_ref_init(&key->lct_reference);
         for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                if (lu_keys[i] == NULL) {
-                        key->lct_index = i;
-                       atomic_set(&key->lct_used, 1);
-                        lu_keys[i] = key;
-                        lu_ref_init(&key->lct_reference);
-                        result = 0;
-                       atomic_inc(&key_set_version);
-                        break;
-                }
+               if (lu_keys[i])
+                       continue;
+               key->lct_index = i;
+
+               if (strncmp("osd_", module_name(key->lct_owner), 4) == 0)
+                       CFS_RACE_WAIT(OBD_FAIL_OBD_SETUP);
+
+               if (cmpxchg(&lu_keys[i], NULL, key) != NULL)
+                       continue;
+
+               result = 0;
+               atomic_inc(&key_set_version);
+               break;
         }
-       write_unlock(&lu_keys_guard);
+       if (result) {
+               lu_ref_fini(&key->lct_reference);
+               atomic_set(&key->lct_used, 0);
+       }
        return result;
 }
 EXPORT_SYMBOL(lu_context_key_register);
@@ -1389,11 +1483,12 @@ static void key_fini(struct lu_context *ctx, int index)
                 key = lu_keys[index];
                 LASSERT(key != NULL);
                 LASSERT(key->lct_fini != NULL);
-               LASSERT(atomic_read(&key->lct_used) > 1);
+               LASSERT(atomic_read(&key->lct_used) > 0);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
-               atomic_dec(&key->lct_used);
+               if (atomic_dec_and_test(&key->lct_used))
+                       wake_up_var(&key->lct_used);
 
                LASSERT(key->lct_owner != NULL);
                if ((ctx->lc_tags & LCT_NOREF) == 0) {
@@ -1412,32 +1507,21 @@ void lu_context_key_degister(struct lu_context_key *key)
        LASSERT(atomic_read(&key->lct_used) >= 1);
        LINVRNT(0 <= key->lct_index && key->lct_index < ARRAY_SIZE(lu_keys));
 
-       lu_context_key_quiesce(key);
+       lu_context_key_quiesce(NULL, key);
 
-       write_lock(&lu_keys_guard);
        key_fini(&lu_shrink_env.le_ctx, key->lct_index);
 
        /**
         * Wait until all transient contexts referencing this key have
         * run lu_context_key::lct_fini() method.
         */
-       while (atomic_read(&key->lct_used) > 1) {
-               write_unlock(&lu_keys_guard);
-               CDEBUG(D_INFO, "lu_context_key_degister: \"%s\" %p, %d\n",
-                      key->lct_owner ? key->lct_owner->name : "", key,
-                      atomic_read(&key->lct_used));
-               schedule();
-               write_lock(&lu_keys_guard);
-       }
-       if (lu_keys[key->lct_index]) {
-               lu_keys[key->lct_index] = NULL;
+       atomic_dec(&key->lct_used);
+       wait_var_event(&key->lct_used, atomic_read(&key->lct_used) == 0);
+
+       if (!WARN_ON(lu_keys[key->lct_index] == NULL))
                lu_ref_fini(&key->lct_reference);
-       }
-       write_unlock(&lu_keys_guard);
 
-       LASSERTF(atomic_read(&key->lct_used) == 1,
-                "key has instances: %d\n",
-                atomic_read(&key->lct_used));
+       smp_store_release(&lu_keys[key->lct_index], NULL);
 }
 EXPORT_SYMBOL(lu_context_key_degister);
 
@@ -1509,16 +1593,17 @@ EXPORT_SYMBOL(lu_context_key_revive_many);
 /**
  * Quiescent a number of keys.
  */
-void lu_context_key_quiesce_many(struct lu_context_key *k, ...)
+void lu_context_key_quiesce_many(struct lu_device_type *t,
+                                struct lu_context_key *k, ...)
 {
-        va_list args;
+       va_list args;
 
-        va_start(args, k);
-        do {
-                lu_context_key_quiesce(k);
-                k = va_arg(args, struct lu_context_key*);
-        } while (k != NULL);
-        va_end(args);
+       va_start(args, k);
+       do {
+               lu_context_key_quiesce(t, k);
+               k = va_arg(args, struct lu_context_key*);
+       } while (k != NULL);
+       va_end(args);
 }
 EXPORT_SYMBOL(lu_context_key_quiesce_many);
 
@@ -1539,31 +1624,41 @@ EXPORT_SYMBOL(lu_context_key_get);
  * List of remembered contexts. XXX document me.
  */
 static LIST_HEAD(lu_context_remembered);
+static DEFINE_SPINLOCK(lu_context_remembered_guard);
 
 /**
  * Destroy \a key in all remembered contexts. This is used to destroy key
  * values in "shared" contexts (like service threads), when a module owning
  * the key is about to be unloaded.
  */
-void lu_context_key_quiesce(struct lu_context_key *key)
+void lu_context_key_quiesce(struct lu_device_type *t,
+                           struct lu_context_key *key)
 {
        struct lu_context *ctx;
 
+       if (key->lct_tags & LCT_QUIESCENT)
+               return;
+       /*
+        * The write-lock on lu_key_initing will ensure that any
+        * keys_fill() which didn't see LCT_QUIESCENT will have
+        * finished before we call key_fini().
+        */
+       down_write(&lu_key_initing);
        if (!(key->lct_tags & LCT_QUIESCENT)) {
-                /*
-                * The write-lock on lu_key_initing will ensure that any
-                * keys_fill() which didn't see LCT_QUIESCENT will have
-                * finished before we call key_fini().
-                 */
-               down_write(&lu_key_initing);
-               key->lct_tags |= LCT_QUIESCENT;
+               if (t == NULL || atomic_read(&t->ldt_device_nr) == 0)
+                       key->lct_tags |= LCT_QUIESCENT;
                up_write(&lu_key_initing);
 
-               write_lock(&lu_keys_guard);
-               list_for_each_entry(ctx, &lu_context_remembered, lc_remember)
+               spin_lock(&lu_context_remembered_guard);
+               list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
+                       spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
-               write_unlock(&lu_keys_guard);
+               }
+               spin_unlock(&lu_context_remembered_guard);
+
+               return;
        }
+       up_write(&lu_key_initing);
 }
 
 void lu_context_key_revive(struct lu_context_key *key)
@@ -1582,7 +1677,7 @@ static void keys_fini(struct lu_context *ctx)
        for (i = 0; i < ARRAY_SIZE(lu_keys); ++i)
                key_fini(ctx, i);
 
-       OBD_FREE(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       OBD_FREE_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
        ctx->lc_value = NULL;
 }
 
@@ -1650,7 +1745,7 @@ static int keys_fill(struct lu_context *ctx)
 
 static int keys_init(struct lu_context *ctx)
 {
-       OBD_ALLOC(ctx->lc_value, ARRAY_SIZE(lu_keys) * sizeof ctx->lc_value[0]);
+       OBD_ALLOC_PTR_ARRAY(ctx->lc_value, ARRAY_SIZE(lu_keys));
        if (likely(ctx->lc_value != NULL))
                return keys_fill(ctx);
 
@@ -1668,9 +1763,9 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
-               write_lock(&lu_keys_guard);
+               spin_lock(&lu_context_remembered_guard);
                list_add(&ctx->lc_remember, &lu_context_remembered);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        } else {
                INIT_LIST_HEAD(&ctx->lc_remember);
        }
@@ -1693,14 +1788,13 @@ void lu_context_fini(struct lu_context *ctx)
 
        if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
                LASSERT(list_empty(&ctx->lc_remember));
-               keys_fini(ctx);
-
-       } else { /* could race with key degister */
-               write_lock(&lu_keys_guard);
-               keys_fini(ctx);
+       } else {
+               /* could race with key degister */
+               spin_lock(&lu_context_remembered_guard);
                list_del_init(&ctx->lc_remember);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        }
+       keys_fini(ctx);
 }
 EXPORT_SYMBOL(lu_context_fini);
 
@@ -1721,28 +1815,35 @@ void lu_context_exit(struct lu_context *ctx)
 {
        unsigned int i;
 
-        LINVRNT(ctx->lc_state == LCS_ENTERED);
-        ctx->lc_state = LCS_LEFT;
-        if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value != NULL) {
-               /* could race with key quiescency */
-               if (ctx->lc_tags & LCT_REMEMBER)
-                       read_lock(&lu_keys_guard);
-
+       LINVRNT(ctx->lc_state == LCS_ENTERED);
+       /*
+        * Disable preempt to ensure we get a warning if
+        * any lct_exit ever tries to sleep.  That would hurt
+        * lu_context_key_quiesce() which spins waiting for us.
+        * This also ensure we aren't preempted while the state
+        * is LCS_LEAVING, as that too would cause problems for
+        * lu_context_key_quiesce().
+        */
+       preempt_disable();
+       /*
+        * Ensure lu_context_key_quiesce() sees LCS_LEAVING
+        * or we see LCT_QUIESCENT
+        */
+       smp_store_mb(ctx->lc_state, LCS_LEAVING);
+       if (ctx->lc_tags & LCT_HAS_EXIT && ctx->lc_value) {
                 for (i = 0; i < ARRAY_SIZE(lu_keys); ++i) {
-                       if (ctx->lc_value[i] != NULL) {
-                               struct lu_context_key *key;
-
-                               key = lu_keys[i];
-                               LASSERT(key != NULL);
-                               if (key->lct_exit != NULL)
-                                       key->lct_exit(ctx,
-                                                     key, ctx->lc_value[i]);
-                       }
-                }
+                       struct lu_context_key *key;
 
-               if (ctx->lc_tags & LCT_REMEMBER)
-                       read_unlock(&lu_keys_guard);
+                       key = lu_keys[i];
+                       if (ctx->lc_value[i] &&
+                           !(key->lct_tags & LCT_QUIESCENT) &&
+                           key->lct_exit)
+                               key->lct_exit(ctx, key, ctx->lc_value[i]);
+               }
         }
+
+       smp_store_release(&ctx->lc_state, LCS_LEFT);
+       preempt_enable();
 }
 EXPORT_SYMBOL(lu_context_exit);
 
@@ -1766,46 +1867,44 @@ int lu_context_refill(struct lu_context *ctx)
  * predefined when the lu_device type are registered, during the module probe
  * phase.
  */
-u32 lu_context_tags_default;
-u32 lu_session_tags_default;
+u32 lu_context_tags_default = LCT_CL_THREAD;
+u32 lu_session_tags_default = LCT_SESSION;
 
-#ifdef HAVE_SERVER_SUPPORT
 void lu_context_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default |= tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_update);
 
 void lu_context_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default &= ~tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_clear);
 
 void lu_session_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default |= tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_update);
 
 void lu_session_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default &= ~tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_clear);
-#endif /* HAVE_SERVER_SUPPORT */
 
 int lu_env_init(struct lu_env *env, __u32 tags)
 {
@@ -1866,6 +1965,119 @@ int lu_env_refill_by_tags(struct lu_env *env, __u32 ctags,
 }
 EXPORT_SYMBOL(lu_env_refill_by_tags);
 
+
+struct lu_env_item {
+       struct task_struct *lei_task;   /* rhashtable key */
+       struct rhash_head lei_linkage;
+       struct lu_env *lei_env;
+       struct rcu_head lei_rcu_head;
+};
+
+static const struct rhashtable_params lu_env_rhash_params = {
+       .key_len     = sizeof(struct task_struct *),
+       .key_offset  = offsetof(struct lu_env_item, lei_task),
+       .head_offset = offsetof(struct lu_env_item, lei_linkage),
+    };
+
+struct rhashtable lu_env_rhash;
+
+struct lu_env_percpu {
+       struct task_struct *lep_task;
+       struct lu_env *lep_env ____cacheline_aligned_in_smp;
+};
+
+static struct lu_env_percpu lu_env_percpu[NR_CPUS];
+
+int lu_env_add_task(struct lu_env *env, struct task_struct *task)
+{
+       struct lu_env_item *lei, *old;
+
+       LASSERT(env);
+
+       OBD_ALLOC_PTR(lei);
+       if (!lei)
+               return -ENOMEM;
+
+       lei->lei_task = task;
+       lei->lei_env = env;
+
+       old = rhashtable_lookup_get_insert_fast(&lu_env_rhash,
+                                               &lei->lei_linkage,
+                                               lu_env_rhash_params);
+       LASSERT(!old);
+
+       return 0;
+}
+EXPORT_SYMBOL(lu_env_add_task);
+
+int lu_env_add(struct lu_env *env)
+{
+       return lu_env_add_task(env, current);
+}
+EXPORT_SYMBOL(lu_env_add);
+
+static void lu_env_item_free(struct rcu_head *head)
+{
+       struct lu_env_item *lei;
+
+       lei = container_of(head, struct lu_env_item, lei_rcu_head);
+       OBD_FREE_PTR(lei);
+}
+
+void lu_env_remove(struct lu_env *env)
+{
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i;
+
+       for_each_possible_cpu(i) {
+               if (lu_env_percpu[i].lep_env == env) {
+                       LASSERT(lu_env_percpu[i].lep_task == task);
+                       lu_env_percpu[i].lep_task = NULL;
+                       lu_env_percpu[i].lep_env = NULL;
+               }
+       }
+
+       /* The rcu_lock is not taking in this case since the key
+        * used is the actual task_struct. This implies that each
+        * object is only removed by the owning thread, so there
+        * can never be a race on a particular object.
+        */
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
+                                         lu_env_rhash_params) == 0)
+               call_rcu(&lei->lei_rcu_head, lu_env_item_free);
+}
+EXPORT_SYMBOL(lu_env_remove);
+
+struct lu_env *lu_env_find(void)
+{
+       struct lu_env *env = NULL;
+       struct lu_env_item *lei;
+       const void *task = current;
+       int i = get_cpu();
+
+       if (lu_env_percpu[i].lep_task == current) {
+               env = lu_env_percpu[i].lep_env;
+               put_cpu();
+               LASSERT(env);
+               return env;
+       }
+
+       lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
+                                    lu_env_rhash_params);
+       if (lei) {
+               env = lei->lei_env;
+               lu_env_percpu[i].lep_task = current;
+               lu_env_percpu[i].lep_env = env;
+       }
+       put_cpu();
+
+       return env;
+}
+EXPORT_SYMBOL(lu_env_find);
+
 static struct shrinker *lu_site_shrinker;
 
 typedef struct lu_site_stats{
@@ -1876,37 +2088,21 @@ typedef struct lu_site_stats{
 } lu_site_stats_t;
 
 static void lu_site_stats_get(const struct lu_site *s,
-                              lu_site_stats_t *stats, int populated)
+                             lu_site_stats_t *stats)
 {
-       struct cfs_hash *hs = s->ls_obj_hash;
-       struct cfs_hash_bd bd;
-       unsigned int i;
+       int cnt = atomic_read(&s->ls_obj_hash.nelems);
        /*
         * percpu_counter_sum_positive() won't accept a const pointer
         * as it does modify the struct by taking a spinlock
         */
        struct lu_site *s2 = (struct lu_site *)s;
 
-       stats->lss_busy += cfs_hash_size_get(hs) -
+       stats->lss_busy += cnt -
                percpu_counter_sum_positive(&s2->ls_lru_len_counter);
-        cfs_hash_for_each_bucket(hs, &bd, i) {
-               struct hlist_head *hhead;
-
-                cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_total += cfs_hash_bd_count_get(&bd);
-                stats->lss_max_search = max((int)stats->lss_max_search,
-                                            cfs_hash_bd_depmax_get(&bd));
-                if (!populated) {
-                        cfs_hash_bd_unlock(hs, &bd, 1);
-                        continue;
-                }
 
-                cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
-                       if (!hlist_empty(hhead))
-                                stats->lss_populated++;
-                }
-                cfs_hash_bd_unlock(hs, &bd, 1);
-        }
+       stats->lss_total += cnt;
+       stats->lss_max_search = 0;
+       stats->lss_populated = 0;
 }
 
 
@@ -2011,10 +2207,6 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
                 .nr_to_scan = shrink_param(sc, nr_to_scan),
                 .gfp_mask   = shrink_param(sc, gfp_mask)
        };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
-       struct shrinker* shrinker = NULL;
-#endif
-
 
        CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
 
@@ -2081,7 +2273,7 @@ void lu_context_keys_dump(void)
  */
 int lu_global_init(void)
 {
-        int result;
+       int result;
        DEF_SHRINKER_VAR(shvar, lu_cache_shrink,
                         lu_cache_shrink_count, lu_cache_shrink_scan);
 
@@ -2116,6 +2308,8 @@ int lu_global_init(void)
         if (lu_site_shrinker == NULL)
                 return -ENOMEM;
 
+       result = rhashtable_init(&lu_env_rhash, &lu_env_rhash_params);
+
         return result;
 }
 
@@ -2139,6 +2333,8 @@ void lu_global_fini(void)
         lu_env_fini(&lu_shrink_env);
        up_write(&lu_sites_guard);
 
+       rhashtable_destroy(&lu_env_rhash);
+
         lu_ref_global_fini();
 }
 
@@ -2160,16 +2356,23 @@ static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
  */
 int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
 {
+       const struct bucket_table *tbl;
        lu_site_stats_t stats;
+       unsigned int chains;
 
        memset(&stats, 0, sizeof(stats));
-       lu_site_stats_get(s, &stats, 1);
-
-       seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
+       lu_site_stats_get(s, &stats);
+
+       rcu_read_lock();
+       tbl = rht_dereference_rcu(s->ls_obj_hash.tbl,
+                                 &((struct lu_site *)s)->ls_obj_hash);
+       chains = tbl->size;
+       rcu_read_unlock();
+       seq_printf(m, "%d/%d %d/%u %d %d %d %d %d %d %d\n",
                   stats.lss_busy,
                   stats.lss_total,
                   stats.lss_populated,
-                  CFS_HASH_NHLIST(s->ls_obj_hash),
+                  chains,
                   stats.lss_max_search,
                   ls_stats_read(s->ls_stats, LU_SS_CREATED),
                   ls_stats_read(s->ls_stats, LU_SS_CACHE_HIT),
@@ -2228,27 +2431,27 @@ void lu_object_assign_fid(const struct lu_env *env, struct lu_object *o,
 {
        struct lu_site          *s = o->lo_dev->ld_site;
        struct lu_fid           *old = &o->lo_header->loh_fid;
-       struct cfs_hash         *hs;
-       struct cfs_hash_bd       bd;
+       int rc;
 
        LASSERT(fid_is_zero(old));
-
+       *old = *fid;
+try_again:
+       rc = rhashtable_lookup_insert_fast(&s->ls_obj_hash,
+                                          &o->lo_header->loh_hash,
+                                          obj_hash_params);
        /* supposed to be unique */
-       hs = s->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)fid, &bd, 1);
-#ifdef CONFIG_LUSTRE_DEBUG_EXPENSIVE_CHECK
-       {
-               __u64 version = 0;
-               struct lu_object *shadow;
-
-               shadow = htable_lookup(s, &bd, fid, &version);
-               /* supposed to be unique */
-               LASSERT(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT);
+       LASSERT(rc != -EEXIST);
+       /* handle hash table resizing */
+       if (rc == -ENOMEM) {
+               msleep(20);
+               goto try_again;
        }
-#endif
-       *old = *fid;
-       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       cfs_hash_bd_unlock(hs, &bd, 1);
+       /* trim the hash if its growing to big */
+       lu_object_limit(env, o->lo_dev);
+       if (rc == -E2BIG)
+               goto try_again;
+
+       LASSERTF(rc == 0, "failed hashtable insertion: rc = %d\n", rc);
 }
 EXPORT_SYMBOL(lu_object_assign_fid);
 
@@ -2261,11 +2464,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
-       struct lu_fid     fid;
+       struct lu_fid fid;
        struct lu_object *o;
+       int rc;
 
        fid_zero(&fid);
-       o = lu_object_alloc(env, dev, &fid, conf);
+       o = lu_object_alloc(env, dev, &fid);
+       if (!IS_ERR(o)) {
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_free(env, o);
+                       return ERR_PTR(rc);
+               }
+       }
 
        return o;
 }