LU-9679 lustre: use LIST_HEAD() for local lists.
[fs/lustre-release.git] lustre/obdclass/lu_object.c
index a236b61..9f97180 100644
@@ -47,6 +47,7 @@
 #else
 #include <libcfs/linux/processor.h>
 #endif
+#include <linux/random.h>
 
 #include <libcfs/libcfs.h>
 #include <libcfs/libcfs_hash.h> /* hash_long() */
 struct lu_site_bkt_data {
        /**
         * LRU list, updated on each access to object. Protected by
-        * bucket lock of lu_site::ls_obj_hash.
+        * lsb_waitq.lock.
         *
         * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
-        * moved to the lu_site::ls_lru.prev (this is due to the non-existence
-        * of list_for_each_entry_safe_reverse()).
+        * moved to the lu_site::ls_lru.prev.
         */
        struct list_head                lsb_lru;
        /**
         * Wait-queue signaled when an object in this site is ultimately
-        * destroyed (lu_object_free()). It is used by lu_object_find() to
-        * wait before re-trying when object in the process of destruction is
-        * found in the hash table.
+        * destroyed (lu_object_free()) or initialized (lu_object_start()).
+        * It is used by lu_object_find() to wait before re-trying when an
+        * object in the process of destruction is found in the hash table,
+        * or to wait for an object to be initialized by the allocator.
         *
         * \see htable_lookup().
         */
-       wait_queue_head_t               lsb_marche_funebre;
+       wait_queue_head_t               lsb_waitq;
 };
 
 enum {
@@ -95,9 +96,11 @@ enum {
 #define LU_SITE_BITS_MAX    24
 #define LU_SITE_BITS_MAX_CL 19
 /**
- * total 256 buckets, we don't want too many buckets because:
- * - consume too much memory
+ * Max 256 buckets, we don't want too many buckets because:
+ * - consume too much memory (currently max 16K)
  * - avoid unbalanced LRU list
+ * With few cpus there is little gain from extra buckets, so
+ * we treat this as a maximum in lu_site_init().
  */
 #define LU_SITE_BKT_BITS    8
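
The sizing rule this comment describes (twice the possible CPUs, rounded up to a power of two, capped at 1 << LU_SITE_BKT_BITS) is applied in lu_site_init() further down. A minimal userspace model of the rule, for reference only; the helper below is an illustrative stand-in for the kernel's roundup_pow_of_two():

#include <stdio.h>

/* illustrative stand-in for the kernel's roundup_pow_of_two() */
static unsigned long roundup_pow2(unsigned long n)
{
        unsigned long p = 1;

        while (p < n)
                p <<= 1;
        return p;
}

int main(void)
{
        const long bkt_max = 1 << 8;    /* LU_SITE_BKT_BITS */
        long ncpus;

        for (ncpus = 1; ncpus <= 1024; ncpus <<= 2) {
                long cnt = 2 * ncpus;

                if (cnt > bkt_max)
                        cnt = bkt_max;  /* 256 is treated as a maximum */
                printf("%5ld cpus -> %4ld buckets\n", ncpus,
                       roundup_pow2(cnt));
        }
        return 0;
}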
 
@@ -113,15 +116,28 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
+static u32 lu_fid_hash(const void *data, u32 seed)
+{
+       const struct lu_fid *fid = data;
+
+       seed = cfs_hash_32(seed ^ fid->f_oid, 32);
+       seed ^= cfs_hash_64(fid->f_seq, 32);
+       return seed;
+}
+
+static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
+{
+       return lu_fid_hash(fid, s->ls_bkt_seed) &
+              (s->ls_bkt_cnt - 1);
+}
+
 wait_queue_head_t *
 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
 {
-       struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
 
-       cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-       return &bkt->lsb_marche_funebre;
+       bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
+       return &bkt->lsb_waitq;
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
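
lu_bkt_hash() can reduce the 32-bit hash with a simple mask because ls_bkt_cnt is forced to a power of two in lu_site_init(). A self-contained sketch of the same mixing scheme, using multiplicative hashes in the style of linux/hash.h (which the libcfs cfs_hash_32()/cfs_hash_64() helpers correspond to); the struct and values below are illustrative, not from the patch:

#include <stdint.h>
#include <stdio.h>

/* same constants as GOLDEN_RATIO_32/GOLDEN_RATIO_64 in linux/hash.h */
static uint32_t hash_32(uint32_t val, unsigned int bits)
{
        return (val * 0x61C88647u) >> (32 - bits);
}

static uint32_t hash_64(uint64_t val, unsigned int bits)
{
        return (uint32_t)((val * 0x61C8864680B583EBull) >> (64 - bits));
}

struct fid { uint64_t f_seq; uint32_t f_oid; };

static uint32_t fid_hash(const struct fid *fid, uint32_t seed)
{
        seed = hash_32(seed ^ fid->f_oid, 32);
        seed ^= hash_64(fid->f_seq, 32);
        return seed;
}

int main(void)
{
        struct fid f = { .f_seq = 0x200000401ULL, .f_oid = 42 };
        uint32_t bkt_cnt = 256;         /* always a power of two */

        printf("bucket %u\n", fid_hash(&f, 0xdeadbeefu) & (bkt_cnt - 1));
        return 0;
}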
 
@@ -160,7 +176,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        }
 
        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
        is_dying = lu_object_is_dying(top);
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
@@ -174,7 +189,8 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
-                       wake_up_all(&bkt->lsb_marche_funebre);
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+                       wake_up_all(&bkt->lsb_waitq);
                }
                return;
        }
@@ -188,6 +204,9 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                        o->lo_ops->loo_object_release(env, o);
        }
 
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       spin_lock(&bkt->lsb_waitq.lock);
+
       /* don't use local 'is_dying' here because it was taken without lock
         * but here we need the latest actual value of it so check lu_object
         * directly here.
@@ -196,6 +215,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_inc(&site->ls_lru_len_counter);
                CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
                       orig, top, site->ls_obj_hash, bkt);
@@ -205,22 +225,19 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 
        /*
         * If object is dying (will not be cached) then remove it
-        * from hash table and LRU.
+        * from hash table (it is already not on the LRU).
         *
-        * This is done with hash table and LRU lists locked. As the only
+        * This is done with the hash table locked. As the only
         * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find()),
-        * or LRU scanning (lu_site_purge()), that are done under hash-table
-        * and LRU lock, no race with concurrent object lookup is possible
-        * and we can safely destroy object below.
+        * object is through hash-table lookup (lu_object_find()), which
+        * is done under the hash-table lock, no race with a concurrent
+        * object lookup is possible and we can safely destroy the object below.
         */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
                cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+       spin_unlock(&bkt->lsb_waitq.lock);
        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-       /*
-        * Object was already removed from hash and lru above, can
-        * kill it.
-        */
+       /* Object was already removed from hash above, can kill it. */
        lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
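
The per-bucket LRU list is now serialized by the wait queue's own internal spinlock (lsb_waitq.lock) rather than a cfs_hash bucket lock, saving a separate lock field. A kernel-style sketch of that pattern, not taken from the patch:

#include <linux/list.h>
#include <linux/wait.h>

struct bucket {
        struct list_head        lru;
        wait_queue_head_t       waitq;  /* waitq.lock also guards lru */
};

static void bucket_lru_add(struct bucket *bkt, struct list_head *item)
{
        spin_lock(&bkt->waitq.lock);
        list_add_tail(item, &bkt->lru);
        spin_unlock(&bkt->waitq.lock);
}

static void bucket_lru_del(struct bucket *bkt, struct list_head *item)
{
        spin_lock(&bkt->waitq.lock);
        list_del_init(item);
        spin_unlock(&bkt->waitq.lock);
        wake_up_all(&bkt->waitq);       /* let waiters recheck object state */
}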
@@ -255,8 +272,10 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
                if (!list_empty(&top->loh_lru)) {
                        struct lu_site_bkt_data *bkt;
 
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+                       spin_lock(&bkt->lsb_waitq.lock);
                        list_del_init(&top->loh_lru);
-                       bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
+                       spin_unlock(&bkt->lsb_waitq.lock);
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
@@ -273,17 +292,9 @@ EXPORT_SYMBOL(lu_object_unhash);
  */
 static struct lu_object *lu_object_alloc(const struct lu_env *env,
                                         struct lu_device *dev,
-                                        const struct lu_fid *f,
-                                        const struct lu_object_conf *conf)
+                                        const struct lu_fid *f)
 {
-       struct lu_object *scan;
        struct lu_object *top;
-       struct list_head *layers;
-       unsigned int init_mask = 0;
-       unsigned int init_flag;
-       int clean;
-       int result;
-       ENTRY;
 
        /*
         * Create top-level object slice. This will also create
@@ -291,15 +302,36 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
         */
        top = dev->ld_ops->ldo_object_alloc(env, NULL, dev);
        if (top == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
+               return ERR_PTR(-ENOMEM);
        if (IS_ERR(top))
-               RETURN(top);
-        /*
-         * This is the only place where object fid is assigned. It's constant
-         * after this point.
-         */
-        top->lo_header->loh_fid = *f;
-        layers = &top->lo_header->loh_layers;
+               return top;
+       /*
+        * This is the only place where object fid is assigned. It's constant
+        * after this point.
+        */
+       top->lo_header->loh_fid = *f;
+
+       return top;
+}
+
+/**
+ * Initialize object.
+ *
+ * This is called after object hash insertion to avoid returning an object with
+ * stale attributes.
+ */
+static int lu_object_start(const struct lu_env *env, struct lu_device *dev,
+                          struct lu_object *top,
+                          const struct lu_object_conf *conf)
+{
+       struct lu_object *scan;
+       struct list_head *layers;
+       unsigned int init_mask = 0;
+       unsigned int init_flag;
+       int clean;
+       int result;
+
+       layers = &top->lo_header->loh_layers;
 
        do {
                /*
@@ -314,10 +346,9 @@ static struct lu_object *lu_object_alloc(const struct lu_env *env,
                        clean = 0;
                        scan->lo_header = top->lo_header;
                        result = scan->lo_ops->loo_object_init(env, scan, conf);
-                       if (result != 0) {
-                               lu_object_free(env, top);
-                               RETURN(ERR_PTR(result));
-                       }
+                       if (result)
+                               return result;
+
                        init_mask |= init_flag;
 next:
                        init_flag <<= 1;
@@ -325,17 +356,18 @@ next:
        } while (!clean);
 
        list_for_each_entry_reverse(scan, layers, lo_linkage) {
-                if (scan->lo_ops->loo_object_start != NULL) {
-                        result = scan->lo_ops->loo_object_start(env, scan);
-                        if (result != 0) {
-                                lu_object_free(env, top);
-                                RETURN(ERR_PTR(result));
-                        }
-                }
-        }
+               if (scan->lo_ops->loo_object_start != NULL) {
+                       result = scan->lo_ops->loo_object_start(env, scan);
+                       if (result)
+                               return result;
+               }
+       }
+
+       lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
 
-        lprocfs_counter_incr(dev->ld_site->ls_stats, LU_SS_CREATED);
-        RETURN(top);
+       set_bit(LU_OBJECT_INITED, &top->lo_header->loh_flags);
+
+       return 0;
 }
 
 /**
@@ -344,10 +376,10 @@ next:
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
        wait_queue_head_t *wq;
-       struct lu_site          *site;
-       struct lu_object        *scan;
-       struct list_head        *layers;
-       struct list_head         splice;
+       struct lu_site *site;
+       struct lu_object *scan;
+       struct list_head *layers;
+       LIST_HEAD(splice);
 
        site = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
@@ -366,7 +398,6 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
          * necessary, because lu_object_header is freed together with the
          * top-level slice.
          */
-       INIT_LIST_HEAD(&splice);
        list_splice_init(layers, &splice);
        while (!list_empty(&splice)) {
                /*
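
The LIST_HEAD() conversion named in the commit subject is visible here: a local list declared with LIST_HEAD() is fully initialized at declaration, so the later INIT_LIST_HEAD() call can be dropped. Side by side, as a sketch (helper names are illustrative):

#include <linux/list.h>

static void drain_before(struct list_head *src)
{
        struct list_head splice;        /* two-step: declare... */

        INIT_LIST_HEAD(&splice);        /* ...then initialize */
        list_splice_init(src, &splice);
}

static void drain_after(struct list_head *src)
{
        LIST_HEAD(splice);              /* one step, same result */

        list_splice_init(src, &splice);
}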
@@ -395,9 +426,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
         struct lu_object_header *h;
         struct lu_object_header *temp;
         struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd            bd;
-       struct cfs_hash_bd            bd2;
-       struct list_head         dispose;
+       LIST_HEAD(dispose);
        int                      did_sth;
        unsigned int             start = 0;
         int                      count;
@@ -407,14 +436,13 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);
 
-       INIT_LIST_HEAD(&dispose);
         /*
          * Under LRU list lock, scan LRU list and move unreferenced objects to
          * the dispose list, removing them from LRU and hash table.
          */
        if (nr != ~0)
                start = s->ls_purge_start;
-       bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
+       bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
  again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
@@ -426,21 +454,21 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                goto out;
 
         did_sth = 0;
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                if (i < start)
-                        continue;
+       for (i = start; i < s->ls_bkt_cnt; i++) {
                 count = bnr;
-                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+               bkt = &s->ls_bkts[i];
+               spin_lock(&bkt->lsb_waitq.lock);
 
                list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        LASSERT(atomic_read(&h->loh_ref) == 0);
 
-                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
-                        LASSERT(bd.bd_bucket == bd2.bd_bucket);
+                       LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                        cfs_hash_bd_del_locked(s->ls_obj_hash,
-                                               &bd2, &h->loh_hash);
+                       /* Cannot remove from hash under current spinlock,
+                        * so set flag to stop object from being found
+                        * by htable_lookup().
+                        */
+                       set_bit(LU_OBJECT_PURGING, &h->loh_flags);
                        list_move(&h->loh_lru, &dispose);
                        percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
@@ -453,15 +481,16 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                                 break;
 
                }
-               cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
                cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
-               while (!list_empty(&dispose)) {
-                       h = container_of0(dispose.next,
-                                         struct lu_object_header, loh_lru);
+               while ((h = list_first_entry_or_null(&dispose,
+                                                    struct lu_object_header,
+                                                    loh_lru)) != NULL) {
+                       cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash);
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
@@ -477,8 +506,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                 goto again;
         }
         /* race on s->ls_purge_start, but nobody cares */
-        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
-
+       s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
 out:
         return nr;
 }
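
Two idioms from this hunk, restated as a sketch (types and helpers are illustrative, not from the patch): objects are only flagged LU_OBJECT_PURGING under the bucket spinlock and unhashed later, and the dispose list is drained with list_first_entry_or_null() instead of open-coding container_of0() on dispose.next:

#include <linux/list.h>

struct obj {
        struct list_head lru;
};

static void obj_unhash_and_free(struct obj *o) { /* hash del + free */ }

static void drain_dispose(struct list_head *dispose)
{
        struct obj *o;

        while ((o = list_first_entry_or_null(dispose, struct obj,
                                             lru)) != NULL) {
                list_del_init(&o->lru);
                obj_unhash_and_free(o);
        }
}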
@@ -633,7 +661,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                                       const struct lu_fid *f,
                                       __u64 *version)
 {
-       struct lu_site_bkt_data *bkt;
        struct lu_object_header *h;
        struct hlist_node *hnode;
        __u64 ver = cfs_hash_bd_version_get(bd);
@@ -642,7 +669,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
                return ERR_PTR(-ENOENT);
 
        *version = ver;
-       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
        /* cfs_hash_bd_peek_locked is a somewhat "internal" function
         * of cfs_hash; it doesn't add a refcount on the object. */
        hnode = cfs_hash_bd_peek_locked(s->ls_obj_hash, bd, (void *)f);
@@ -652,12 +678,29 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        }
 
        h = container_of0(hnode, struct lu_object_header, loh_hash);
-       cfs_hash_get(s->ls_obj_hash, hnode);
-       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        if (!list_empty(&h->loh_lru)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               /* Might have just been moved to the dispose list, in which
+                * case LU_OBJECT_PURGING will be set.  In that case,
+                * delete it from the hash table immediately.
+                * When lu_site_purge_objects() later tries to remove it,
+                * it will find it is already gone, which is harmless.
+                */
+               if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) {
+                       spin_unlock(&bkt->lsb_waitq.lock);
+                       cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode);
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+                       return ERR_PTR(-ENOENT);
+               }
                list_del_init(&h->loh_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        return lu_object_top(h);
 }
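
The LU_OBJECT_PURGING test above resolves a lock-ordering problem: the purger holds the bucket spinlock and cannot take the hash lock, so it only marks the object, and whichever side reaches the hash table first performs the actual removal. A minimal sketch of both halves (flag and struct names are illustrative):

#include <linux/bitops.h>
#include <linux/list.h>
#include <linux/types.h>

#define OBJ_PURGING     0

struct hdr {
        unsigned long           flags;
        struct list_head        lru;
};

/* purger side, bucket spinlock held: flag it, unhash later */
static void purge_mark(struct hdr *h, struct list_head *dispose)
{
        set_bit(OBJ_PURGING, &h->flags);
        list_move(&h->lru, dispose);
}

/* lookup side, bucket spinlock held: if flagged, the caller drops the
 * spinlock, deletes the object from the hash itself and returns -ENOENT */
static bool lookup_sees_purging(struct hdr *h)
{
        return test_bit(OBJ_PURGING, &h->flags);
}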
 
@@ -715,7 +758,11 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct lu_site *s;
        struct cfs_hash *hs;
        struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
        __u64 version = 0;
+       int rc;
+
+       ENTRY;
 
        /*
         * This uses standard index maintenance protocol:
@@ -737,25 +784,51 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         */
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
+
+       if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
+               lu_site_purge(env, s, -1);
+
+       bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
        cfs_hash_bd_get(hs, f, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
-               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-                       return o;
+               if (!IS_ERR(o)) {
+                       if (likely(lu_object_is_inited(o->lo_header)))
+                               RETURN(o);
+
+                       wait_event_idle(bkt->lsb_waitq,
+                                       lu_object_is_inited(o->lo_header) ||
+                                       lu_object_is_dying(o->lo_header));
+
+                       if (lu_object_is_dying(o->lo_header)) {
+                               lu_object_put(env, o);
+
+                               RETURN(ERR_PTR(-ENOENT));
+                       }
+
+                       RETURN(o);
+               }
+
+               if (PTR_ERR(o) != -ENOENT)
+                       RETURN(o);
        }
+
        /*
-        * Allocate new object. This may result in rather complicated
-        * operations, including fld queries, inode loading, etc.
+        * Allocate new object. NB: the object is not initialized here;
+        * otherwise it could change between allocation and hash insertion,
+        * and an object with stale attributes would be returned.
         */
-       o = lu_object_alloc(env, dev, f, conf);
+       o = lu_object_alloc(env, dev, f);
        if (IS_ERR(o))
-               return o;
+               RETURN(o);
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
+       CFS_RACE_WAIT(OBD_FAIL_OBD_ZERO_NLINK_RACE);
+
        cfs_hash_bd_lock(hs, &bd, 1);
 
        if (conf && conf->loc_flags & LOC_F_NEW)
@@ -766,15 +839,41 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
+               /*
+                * This may result in rather complicated operations, including
+                * fld queries, inode loading, etc.
+                */
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_put_nocache(env, o);
+                       RETURN(ERR_PTR(rc));
+               }
+
+               wake_up_all(&bkt->lsb_waitq);
+
                lu_object_limit(env, dev);
 
-               return o;
+               RETURN(o);
        }
 
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_RACE);
        cfs_hash_bd_unlock(hs, &bd, 1);
        lu_object_free(env, o);
-       return shadow;
+
+       if (!(conf && conf->loc_flags & LOC_F_NEW) &&
+           !lu_object_is_inited(shadow->lo_header)) {
+               wait_event_idle(bkt->lsb_waitq,
+                               lu_object_is_inited(shadow->lo_header) ||
+                               lu_object_is_dying(shadow->lo_header));
+
+               if (lu_object_is_dying(shadow->lo_header)) {
+                       lu_object_put(env, shadow);
+
+                       RETURN(ERR_PTR(-ENOENT));
+               }
+       }
+
+       RETURN(shadow);
 }
 EXPORT_SYMBOL(lu_object_find_at);
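
lu_object_find_at() now distinguishes three states for a found object: initialized (return it), dying (drop the reference and return -ENOENT), or neither (sleep on the bucket wait queue until one of the first two holds). The waiter/initializer handshake, reduced to a sketch with illustrative names:

#include <linux/bitops.h>
#include <linux/errno.h>
#include <linux/wait.h>

#define OBJ_INITED      0
#define OBJ_DYING       1

struct hdr {
        unsigned long flags;
};

static int wait_for_init(wait_queue_head_t *wq, struct hdr *h)
{
        if (test_bit(OBJ_INITED, &h->flags))
                return 0;               /* common fast path */

        wait_event_idle(*wq, test_bit(OBJ_INITED, &h->flags) ||
                             test_bit(OBJ_DYING, &h->flags));

        return test_bit(OBJ_DYING, &h->flags) ? -ENOENT : 0;
}

static void mark_inited(wait_queue_head_t *wq, struct hdr *h)
{
        set_bit(OBJ_INITED, &h->flags);
        wake_up_all(wq);                /* release any waiters above */
}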
 
@@ -907,7 +1006,7 @@ static unsigned long lu_htable_order(struct lu_device *top)
          *
          * Size of lu_object is (arbitrary) taken as 1K (together with inode).
          */
-       cache_size = totalram_pages;
+       cache_size = cfs_totalram_pages();
 
 #if BITS_PER_LONG == 32
         /* limit hashtable size for lowmem systems to low RAM */
@@ -1019,7 +1118,6 @@ EXPORT_SYMBOL(lu_dev_del_linkage);
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
        struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd bd;
        char name[16];
        unsigned long bits;
        unsigned int i;
@@ -1042,7 +1140,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
                                                 bits - LU_SITE_BKT_BITS,
-                                                sizeof(*bkt), 0, 0,
+                                                0, 0, 0,
                                                 &lu_site_hash_ops,
                                                 CFS_HASH_SPIN_BKTLOCK |
                                                 CFS_HASH_NO_ITEMREF |
@@ -1058,16 +1156,30 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
                return -ENOMEM;
        }
 
-       cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-               bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+       s->ls_bkt_seed = prandom_u32();
+       s->ls_bkt_cnt = min_t(long, 1 << LU_SITE_BKT_BITS,
+                             2 * num_possible_cpus());
+       s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
+       OBD_ALLOC_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+       if (!s->ls_bkts) {
+               cfs_hash_putref(s->ls_obj_hash);
+               s->ls_obj_hash = NULL;
+               s->ls_bkts = NULL;
+               return -ENOMEM;
+       }
+
+       for (i = 0; i < s->ls_bkt_cnt; i++) {
+               bkt = &s->ls_bkts[i];
                INIT_LIST_HEAD(&bkt->lsb_lru);
-               init_waitqueue_head(&bkt->lsb_marche_funebre);
+               init_waitqueue_head(&bkt->lsb_waitq);
        }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
         if (s->ls_stats == NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
+               OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+               cfs_hash_putref(s->ls_obj_hash);
                 s->ls_obj_hash = NULL;
+               s->ls_bkts = NULL;
                 return -ENOMEM;
         }
 
@@ -1115,6 +1227,8 @@ void lu_site_fini(struct lu_site *s)
                 s->ls_obj_hash = NULL;
         }
 
+       OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*s->ls_bkts));
+
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
@@ -1331,14 +1445,8 @@ void lu_stack_fini(const struct lu_env *env, struct lu_device *top)
 
         for (scan = top; scan != NULL; scan = next) {
                 const struct lu_device_type *ldt = scan->ld_type;
-                struct obd_type             *type;
 
                 next = ldt->ldt_ops->ldto_device_free(env, scan);
-                type = ldt->ldt_obd_type;
-                if (type != NULL) {
-                        type->typ_refcnt--;
-                        class_put_type(type);
-                }
         }
 }
 
@@ -1351,7 +1459,6 @@ enum {
 
 static struct lu_context_key *lu_keys[LU_CONTEXT_KEY_NR] = { NULL, };
 
-DEFINE_RWLOCK(lu_keys_guard);
 static DECLARE_RWSEM(lu_key_initing);
 
 /**
@@ -1405,7 +1512,7 @@ static void key_fini(struct lu_context *ctx, int index)
                 key = lu_keys[index];
                 LASSERT(key != NULL);
                 LASSERT(key->lct_fini != NULL);
-               LASSERT(atomic_read(&key->lct_used) > 1);
+               LASSERT(atomic_read(&key->lct_used) > 0);
 
                 key->lct_fini(ctx, key, ctx->lc_value[index]);
                 lu_ref_del(&key->lct_reference, "ctx", ctx);
@@ -1545,6 +1652,7 @@ EXPORT_SYMBOL(lu_context_key_get);
  * List of remembered contexts. XXX document me.
  */
 static LIST_HEAD(lu_context_remembered);
+static DEFINE_SPINLOCK(lu_context_remembered_guard);
 
 /**
  * Destroy \a key in all remembered contexts. This is used to destroy key
@@ -1565,13 +1673,13 @@ void lu_context_key_quiesce(struct lu_context_key *key)
                key->lct_tags |= LCT_QUIESCENT;
                up_write(&lu_key_initing);
 
-               write_lock(&lu_keys_guard);
+               spin_lock(&lu_context_remembered_guard);
                list_for_each_entry(ctx, &lu_context_remembered, lc_remember) {
                        spin_until_cond(READ_ONCE(ctx->lc_state) != LCS_LEAVING);
                        key_fini(ctx, key->lct_index);
                }
 
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        }
 }
 
@@ -1677,9 +1785,9 @@ int lu_context_init(struct lu_context *ctx, __u32 tags)
        ctx->lc_state = LCS_INITIALIZED;
        ctx->lc_tags = tags;
        if (tags & LCT_REMEMBER) {
-               write_lock(&lu_keys_guard);
+               spin_lock(&lu_context_remembered_guard);
                list_add(&ctx->lc_remember, &lu_context_remembered);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        } else {
                INIT_LIST_HEAD(&ctx->lc_remember);
        }
@@ -1702,14 +1810,13 @@ void lu_context_fini(struct lu_context *ctx)
 
        if ((ctx->lc_tags & LCT_REMEMBER) == 0) {
                LASSERT(list_empty(&ctx->lc_remember));
-               keys_fini(ctx);
-
-       } else { /* could race with key degister */
-               write_lock(&lu_keys_guard);
-               keys_fini(ctx);
+       } else {
+               /* could race with key deregister */
+               spin_lock(&lu_context_remembered_guard);
                list_del_init(&ctx->lc_remember);
-               write_unlock(&lu_keys_guard);
+               spin_unlock(&lu_context_remembered_guard);
        }
+       keys_fini(ctx);
 }
 EXPORT_SYMBOL(lu_context_fini);
 
@@ -1782,46 +1889,44 @@ int lu_context_refill(struct lu_context *ctx)
  * predefined when the lu_device type are registered, during the module probe
  * phase.
  */
-u32 lu_context_tags_default;
-u32 lu_session_tags_default;
+u32 lu_context_tags_default = LCT_CL_THREAD;
+u32 lu_session_tags_default = LCT_SESSION;
 
-#ifdef HAVE_SERVER_SUPPORT
 void lu_context_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default |= tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_update);
 
 void lu_context_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_context_tags_default &= ~tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_context_tags_clear);
 
 void lu_session_tags_update(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default |= tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_update);
 
 void lu_session_tags_clear(__u32 tags)
 {
-       write_lock(&lu_keys_guard);
+       spin_lock(&lu_context_remembered_guard);
        lu_session_tags_default &= ~tags;
        atomic_inc(&key_set_version);
-       write_unlock(&lu_keys_guard);
+       spin_unlock(&lu_context_remembered_guard);
 }
 EXPORT_SYMBOL(lu_session_tags_clear);
-#endif /* HAVE_SERVER_SUPPORT */
 
 int lu_env_init(struct lu_env *env, __u32 tags)
 {
@@ -1887,6 +1992,7 @@ struct lu_env_item {
        struct task_struct *lei_task;   /* rhashtable key */
        struct rhash_head lei_linkage;
        struct lu_env *lei_env;
+       struct rcu_head lei_rcu_head;
 };
 
 static const struct rhashtable_params lu_env_rhash_params = {
@@ -1926,6 +2032,14 @@ int lu_env_add(struct lu_env *env)
 }
 EXPORT_SYMBOL(lu_env_add);
 
+static void lu_env_item_free(struct rcu_head *head)
+{
+       struct lu_env_item *lei;
+
+       lei = container_of(head, struct lu_env_item, lei_rcu_head);
+       OBD_FREE_PTR(lei);
+}
+
 void lu_env_remove(struct lu_env *env)
 {
        struct lu_env_item *lei;
@@ -1940,13 +2054,16 @@ void lu_env_remove(struct lu_env *env)
                }
        }
 
-       rcu_read_lock();
+       /* The rcu read lock is not taken here because the key
+        * used is the actual task_struct. This implies that each
+        * object is only removed by its owning thread, so there
+        * can never be a race on a particular object.
+        */
        lei = rhashtable_lookup_fast(&lu_env_rhash, &task,
                                     lu_env_rhash_params);
        if (lei && rhashtable_remove_fast(&lu_env_rhash, &lei->lei_linkage,
                                          lu_env_rhash_params) == 0)
-               OBD_FREE_PTR(lei);
-       rcu_read_unlock();
+               call_rcu(&lei->lei_rcu_head, lu_env_item_free);
 }
 EXPORT_SYMBOL(lu_env_remove);
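
The fix above defers the free past an RCU grace period: lookups elsewhere that are still traversing the rhashtable under RCU may hold a pointer to the item, so it is retired with call_rcu() instead of being freed immediately. The embedded-rcu_head pattern, as a generic sketch (not the patch's types):

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct item {
        int             value;
        struct rcu_head rcu;    /* storage for the deferred callback */
};

static void item_free_cb(struct rcu_head *head)
{
        struct item *it = container_of(head, struct item, rcu);

        kfree(it);
}

static void item_retire(struct item *it)
{
        /* returns immediately; item_free_cb() runs after all current
         * RCU readers are done */
        call_rcu(&it->rcu, item_free_cb);
}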
 
@@ -1987,37 +2104,21 @@ typedef struct lu_site_stats{
 } lu_site_stats_t;
 
 static void lu_site_stats_get(const struct lu_site *s,
-                              lu_site_stats_t *stats, int populated)
+                             lu_site_stats_t *stats)
 {
-       struct cfs_hash *hs = s->ls_obj_hash;
-       struct cfs_hash_bd bd;
-       unsigned int i;
+       int cnt = cfs_hash_size_get(s->ls_obj_hash);
        /*
         * percpu_counter_sum_positive() won't accept a const pointer
         * as it does modify the struct by taking a spinlock
         */
        struct lu_site *s2 = (struct lu_site *)s;
 
-       stats->lss_busy += cfs_hash_size_get(hs) -
+       stats->lss_busy += cnt -
                percpu_counter_sum_positive(&s2->ls_lru_len_counter);
-        cfs_hash_for_each_bucket(hs, &bd, i) {
-               struct hlist_head *hhead;
-
-                cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_total += cfs_hash_bd_count_get(&bd);
-                stats->lss_max_search = max((int)stats->lss_max_search,
-                                            cfs_hash_bd_depmax_get(&bd));
-                if (!populated) {
-                        cfs_hash_bd_unlock(hs, &bd, 1);
-                        continue;
-                }
 
-                cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
-                       if (!hlist_empty(hhead))
-                                stats->lss_populated++;
-                }
-                cfs_hash_bd_unlock(hs, &bd, 1);
-        }
+       stats->lss_total += cnt;
+       stats->lss_max_search = 0;
+       stats->lss_populated = 0;
 }
 
 
@@ -2122,10 +2223,6 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
                 .nr_to_scan = shrink_param(sc, nr_to_scan),
                 .gfp_mask   = shrink_param(sc, gfp_mask)
        };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
-       struct shrinker* shrinker = NULL;
-#endif
-
 
        CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
 
@@ -2278,7 +2375,7 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
        lu_site_stats_t stats;
 
        memset(&stats, 0, sizeof(stats));
-       lu_site_stats_get(s, &stats, 1);
+       lu_site_stats_get(s, &stats);
 
        seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                   stats.lss_busy,
@@ -2376,11 +2473,19 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf)
 {
-       struct lu_fid     fid;
+       struct lu_fid fid;
        struct lu_object *o;
+       int rc;
 
        fid_zero(&fid);
-       o = lu_object_alloc(env, dev, &fid, conf);
+       o = lu_object_alloc(env, dev, &fid);
+       if (!IS_ERR(o)) {
+               rc = lu_object_start(env, dev, o, conf);
+               if (rc) {
+                       lu_object_free(env, o);
+                       return ERR_PTR(rc);
+               }
+       }
 
        return o;
 }