Whamcloud - gitweb
LU-9679 lustre: use LIST_HEAD() for local lists.
[fs/lustre-release.git] / lustre / obdclass / lu_object.c
index b6279f0..9f97180 100644 (file)
@@ -47,6 +47,7 @@
 #else
 #include <libcfs/linux/processor.h>
 #endif
+#include <linux/random.h>
 
 #include <libcfs/libcfs.h>
 #include <libcfs/libcfs_hash.h> /* hash_long() */
 struct lu_site_bkt_data {
        /**
         * LRU list, updated on each access to object. Protected by
-        * bucket lock of lu_site::ls_obj_hash.
+        * lsb_waitq.lock.
         *
         * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
-        * moved to the lu_site::ls_lru.prev (this is due to the non-existence
-        * of list_for_each_entry_safe_reverse()).
+        * moved to the lu_site::ls_lru.prev.
         */
        struct list_head                lsb_lru;
        /**
@@ -96,9 +96,11 @@ enum {
 #define LU_SITE_BITS_MAX    24
 #define LU_SITE_BITS_MAX_CL 19
 /**
- * total 256 buckets, we don't want too many buckets because:
- * - consume too much memory
+ * Max 256 buckets, we don't want too many buckets because:
+ * - consume too much memory (currently max 16K)
  * - avoid unbalanced LRU list
+ * With few cpus there is little gain from extra buckets, so this is meant as
+ * a maximum. NOTE(review): lu_site_init() uses max_t(), making it a minimum.
  */
 #define LU_SITE_BKT_BITS    8
 
@@ -114,14 +116,27 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
+static u32 lu_fid_hash(const void *data, u32 seed)
+{
+       const struct lu_fid *fid = data;
+
+       seed = cfs_hash_32(seed ^ fid->f_oid, 32);
+       seed ^= cfs_hash_64(fid->f_seq, 32);
+       return seed;
+}
+
+static inline int lu_bkt_hash(struct lu_site *s, const struct lu_fid *fid)
+{
+       return lu_fid_hash(fid, s->ls_bkt_seed) &
+              (s->ls_bkt_cnt - 1);
+}
+
 wait_queue_head_t *
 lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
 {
-       struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
 
-       cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       bkt = &site->ls_bkts[lu_bkt_hash(site, fid)];
        return &bkt->lsb_waitq;
 }
 EXPORT_SYMBOL(lu_site_wq_from_fid);
@@ -161,7 +176,6 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
        }
 
        cfs_hash_bd_get(site->ls_obj_hash, &top->loh_fid, &bd);
-       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
 
        is_dying = lu_object_is_dying(top);
        if (!cfs_hash_bd_dec_and_lock(site->ls_obj_hash, &bd, &top->loh_ref)) {
@@ -175,6 +189,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                         * somebody may be waiting for this, currently only
                         * used for cl_object, see cl_object_put_last().
                         */
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
                        wake_up_all(&bkt->lsb_waitq);
                }
                return;
@@ -189,6 +204,9 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
                        o->lo_ops->loo_object_release(env, o);
        }
 
+       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+       spin_lock(&bkt->lsb_waitq.lock);
+
        /* don't use local 'is_dying' here because if was taken without lock
         * but here we need the latest actual value of it so check lu_object
         * directly here.
@@ -197,6 +215,7 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_inc(&site->ls_lru_len_counter);
                CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
                       orig, top, site->ls_obj_hash, bkt);
@@ -206,22 +225,19 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
 
        /*
         * If object is dying (will not be cached) then remove it
-        * from hash table and LRU.
+        * from hash table (it is already not on the LRU).
         *
-        * This is done with hash table and LRU lists locked. As the only
+        * This is done with hash table lists locked. As the only
         * way to acquire first reference to previously unreferenced
-        * object is through hash-table lookup (lu_object_find()),
-        * or LRU scanning (lu_site_purge()), that are done under hash-table
-        * and LRU lock, no race with concurrent object lookup is possible
-        * and we can safely destroy object below.
+        * object is through hash-table lookup (lu_object_find())
+        * which is done under the hash-table lock, so no race with concurrent
+        * object lookup is possible and we can safely destroy object below.
         */
        if (!test_and_set_bit(LU_OBJECT_UNHASHED, &top->loh_flags))
                cfs_hash_bd_del_locked(site->ls_obj_hash, &bd, &top->loh_hash);
+       spin_unlock(&bkt->lsb_waitq.lock);
        cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
-       /*
-        * Object was already removed from hash and lru above, can
-        * kill it.
-        */
+       /* Object was already removed from hash above, can kill it. */
        lu_object_free(env, orig);
 }
 EXPORT_SYMBOL(lu_object_put);
@@ -256,8 +272,10 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
                if (!list_empty(&top->loh_lru)) {
                        struct lu_site_bkt_data *bkt;
 
+                       bkt = &site->ls_bkts[lu_bkt_hash(site, &top->loh_fid)];
+                       spin_lock(&bkt->lsb_waitq.lock);
                        list_del_init(&top->loh_lru);
-                       bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
+                       spin_unlock(&bkt->lsb_waitq.lock);
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
@@ -358,10 +376,10 @@ next:
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
        wait_queue_head_t *wq;
-       struct lu_site          *site;
-       struct lu_object        *scan;
-       struct list_head        *layers;
-       struct list_head         splice;
+       struct lu_site *site;
+       struct lu_object *scan;
+       struct list_head *layers;
+       LIST_HEAD(splice);
 
        site = o->lo_dev->ld_site;
        layers = &o->lo_header->loh_layers;
@@ -380,7 +398,6 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
          * necessary, because lu_object_header is freed together with the
          * top-level slice.
          */
-       INIT_LIST_HEAD(&splice);
        list_splice_init(layers, &splice);
        while (!list_empty(&splice)) {
                /*
@@ -409,9 +426,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
         struct lu_object_header *h;
         struct lu_object_header *temp;
         struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd            bd;
-       struct cfs_hash_bd            bd2;
-       struct list_head         dispose;
+       LIST_HEAD(dispose);
        int                      did_sth;
        unsigned int             start = 0;
         int                      count;
@@ -421,14 +436,13 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
        if (OBD_FAIL_CHECK(OBD_FAIL_OBD_NO_LRU))
                RETURN(0);
 
-       INIT_LIST_HEAD(&dispose);
         /*
          * Under LRU list lock, scan LRU list and move unreferenced objects to
          * the dispose list, removing them from LRU and hash table.
          */
        if (nr != ~0)
                start = s->ls_purge_start;
-       bnr = (nr == ~0) ? -1 : nr / (int)CFS_HASH_NBKT(s->ls_obj_hash) + 1;
+       bnr = (nr == ~0) ? -1 : nr / s->ls_bkt_cnt + 1;
  again:
        /*
         * It doesn't make any sense to make purge threads parallel, that can
@@ -440,21 +454,21 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                goto out;
 
         did_sth = 0;
-        cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-                if (i < start)
-                        continue;
+       for (i = start; i < s->ls_bkt_cnt ; i++) {
                 count = bnr;
-                cfs_hash_bd_lock(s->ls_obj_hash, &bd, 1);
-                bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+               bkt = &s->ls_bkts[i];
+               spin_lock(&bkt->lsb_waitq.lock);
 
                list_for_each_entry_safe(h, temp, &bkt->lsb_lru, loh_lru) {
                        LASSERT(atomic_read(&h->loh_ref) == 0);
 
-                        cfs_hash_bd_get(s->ls_obj_hash, &h->loh_fid, &bd2);
-                        LASSERT(bd.bd_bucket == bd2.bd_bucket);
+                       LINVRNT(lu_bkt_hash(s, &h->loh_fid) == i);
 
-                        cfs_hash_bd_del_locked(s->ls_obj_hash,
-                                               &bd2, &h->loh_hash);
+                       /* Cannot remove from hash under current spinlock,
+                        * so set flag to stop object from being found
+                        * by htable_lookup().
+                        */
+                       set_bit(LU_OBJECT_PURGING, &h->loh_flags);
                        list_move(&h->loh_lru, &dispose);
                        percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
@@ -467,15 +481,16 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                                 break;
 
                }
-               cfs_hash_bd_unlock(s->ls_obj_hash, &bd, 1);
+               spin_unlock(&bkt->lsb_waitq.lock);
                cond_resched();
                /*
                 * Free everything on the dispose list. This is safe against
                 * races due to the reasons described in lu_object_put().
                 */
-               while (!list_empty(&dispose)) {
-                       h = container_of0(dispose.next,
-                                         struct lu_object_header, loh_lru);
+               while ((h = list_first_entry_or_null(&dispose,
+                                                    struct lu_object_header,
+                                                    loh_lru)) != NULL) {
+                       cfs_hash_del(s->ls_obj_hash, &h->loh_fid, &h->loh_hash);
                        list_del_init(&h->loh_lru);
                        lu_object_free(env, lu_object_top(h));
                        lprocfs_counter_incr(s->ls_stats, LU_SS_LRU_PURGED);
@@ -491,8 +506,7 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                 goto again;
         }
         /* race on s->ls_purge_start, but nobody cares */
-        s->ls_purge_start = i % CFS_HASH_NBKT(s->ls_obj_hash);
-
+       s->ls_purge_start = i & (s->ls_bkt_cnt - 1);
 out:
         return nr;
 }
@@ -664,12 +678,29 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        }
 
        h = container_of0(hnode, struct lu_object_header, loh_hash);
-       cfs_hash_get(s->ls_obj_hash, hnode);
-       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        if (!list_empty(&h->loh_lru)) {
+               struct lu_site_bkt_data *bkt;
+
+               bkt = &s->ls_bkts[lu_bkt_hash(s, &h->loh_fid)];
+               spin_lock(&bkt->lsb_waitq.lock);
+               /* Might have just been moved to the dispose list, in which
+                * case LU_OBJECT_PURGING will be set.  In that case,
+                * delete it from the hash table immediately.
+                * When lu_site_purge_objects() tries, it will find it
+                * isn't there, which is harmless.
+                */
+               if (test_bit(LU_OBJECT_PURGING, &h->loh_flags)) {
+                       spin_unlock(&bkt->lsb_waitq.lock);
+                       cfs_hash_bd_del_locked(s->ls_obj_hash, bd, hnode);
+                       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_MISS);
+                       return ERR_PTR(-ENOENT);
+               }
                list_del_init(&h->loh_lru);
+               spin_unlock(&bkt->lsb_waitq.lock);
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
+       cfs_hash_get(s->ls_obj_hash, hnode);
+       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        return lu_object_top(h);
 }
 
@@ -728,7 +759,6 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        struct cfs_hash *hs;
        struct cfs_hash_bd bd;
        struct lu_site_bkt_data *bkt;
-       struct l_wait_info lwi = { 0 };
        __u64 version = 0;
        int rc;
 
@@ -758,8 +788,8 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
        if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_OBD_ZERO_NLINK_RACE)))
                lu_site_purge(env, s, -1);
 
+       bkt = &s->ls_bkts[lu_bkt_hash(s, f)];
        cfs_hash_bd_get(hs, f, &bd);
-       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
@@ -769,9 +799,9 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
                        if (likely(lu_object_is_inited(o->lo_header)))
                                RETURN(o);
 
-                       l_wait_event(bkt->lsb_waitq,
-                                    lu_object_is_inited(o->lo_header) ||
-                                    lu_object_is_dying(o->lo_header), &lwi);
+                       wait_event_idle(bkt->lsb_waitq,
+                                       lu_object_is_inited(o->lo_header) ||
+                                       lu_object_is_dying(o->lo_header));
 
                        if (lu_object_is_dying(o->lo_header)) {
                                lu_object_put(env, o);
@@ -832,9 +862,9 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
 
        if (!(conf && conf->loc_flags & LOC_F_NEW) &&
            !lu_object_is_inited(shadow->lo_header)) {
-               l_wait_event(bkt->lsb_waitq,
-                            lu_object_is_inited(shadow->lo_header) ||
-                            lu_object_is_dying(shadow->lo_header), &lwi);
+               wait_event_idle(bkt->lsb_waitq,
+                               lu_object_is_inited(shadow->lo_header) ||
+                               lu_object_is_dying(shadow->lo_header));
 
                if (lu_object_is_dying(shadow->lo_header)) {
                        lu_object_put(env, shadow);
@@ -1088,7 +1118,6 @@ EXPORT_SYMBOL(lu_dev_del_linkage);
 int lu_site_init(struct lu_site *s, struct lu_device *top)
 {
        struct lu_site_bkt_data *bkt;
-       struct cfs_hash_bd bd;
        char name[16];
        unsigned long bits;
        unsigned int i;
@@ -1111,7 +1140,7 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
             bits >= LU_SITE_BITS_MIN; bits--) {
                s->ls_obj_hash = cfs_hash_create(name, bits, bits,
                                                 bits - LU_SITE_BKT_BITS,
-                                                sizeof(*bkt), 0, 0,
+                                                0, 0, 0,
                                                 &lu_site_hash_ops,
                                                 CFS_HASH_SPIN_BKTLOCK |
                                                 CFS_HASH_NO_ITEMREF |
@@ -1127,16 +1156,30 @@ int lu_site_init(struct lu_site *s, struct lu_device *top)
                return -ENOMEM;
        }
 
-       cfs_hash_for_each_bucket(s->ls_obj_hash, &bd, i) {
-               bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, &bd);
+       s->ls_bkt_seed = prandom_u32();
+       s->ls_bkt_cnt = max_t(long, 1 << LU_SITE_BKT_BITS,
+                             2 * num_possible_cpus());
+       s->ls_bkt_cnt = roundup_pow_of_two(s->ls_bkt_cnt);
+       OBD_ALLOC_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+       if (!s->ls_bkts) {
+               cfs_hash_putref(s->ls_obj_hash);
+               s->ls_obj_hash = NULL;
+               s->ls_bkts = NULL;
+               return -ENOMEM;
+       }
+
+       for (i = 0; i < s->ls_bkt_cnt; i++) {
+               bkt = &s->ls_bkts[i];
                INIT_LIST_HEAD(&bkt->lsb_lru);
                init_waitqueue_head(&bkt->lsb_waitq);
        }
 
         s->ls_stats = lprocfs_alloc_stats(LU_SS_LAST_STAT, 0);
         if (s->ls_stats == NULL) {
-                cfs_hash_putref(s->ls_obj_hash);
+               OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*bkt));
+               cfs_hash_putref(s->ls_obj_hash);
                 s->ls_obj_hash = NULL;
+               s->ls_bkts = NULL;
                 return -ENOMEM;
         }
 
@@ -1184,6 +1227,8 @@ void lu_site_fini(struct lu_site *s)
                 s->ls_obj_hash = NULL;
         }
 
+       OBD_FREE_LARGE(s->ls_bkts, s->ls_bkt_cnt * sizeof(*s->ls_bkts));
+
         if (s->ls_top_dev != NULL) {
                 s->ls_top_dev->ld_site = NULL;
                 lu_ref_del(&s->ls_top_dev->ld_reference, "site-top", s);
@@ -2059,37 +2104,21 @@ typedef struct lu_site_stats{
 } lu_site_stats_t;
 
 static void lu_site_stats_get(const struct lu_site *s,
-                              lu_site_stats_t *stats, int populated)
+                             lu_site_stats_t *stats)
 {
-       struct cfs_hash *hs = s->ls_obj_hash;
-       struct cfs_hash_bd bd;
-       unsigned int i;
+       int cnt = cfs_hash_size_get(s->ls_obj_hash);
        /*
         * percpu_counter_sum_positive() won't accept a const pointer
         * as it does modify the struct by taking a spinlock
         */
        struct lu_site *s2 = (struct lu_site *)s;
 
-       stats->lss_busy += cfs_hash_size_get(hs) -
+       stats->lss_busy += cnt -
                percpu_counter_sum_positive(&s2->ls_lru_len_counter);
-        cfs_hash_for_each_bucket(hs, &bd, i) {
-               struct hlist_head *hhead;
-
-                cfs_hash_bd_lock(hs, &bd, 1);
-                stats->lss_total += cfs_hash_bd_count_get(&bd);
-                stats->lss_max_search = max((int)stats->lss_max_search,
-                                            cfs_hash_bd_depmax_get(&bd));
-                if (!populated) {
-                        cfs_hash_bd_unlock(hs, &bd, 1);
-                        continue;
-                }
 
-                cfs_hash_bd_for_each_hlist(hs, &bd, hhead) {
-                       if (!hlist_empty(hhead))
-                                stats->lss_populated++;
-                }
-                cfs_hash_bd_unlock(hs, &bd, 1);
-        }
+       stats->lss_total += cnt;
+       stats->lss_max_search = 0;
+       stats->lss_populated = 0;
 }
 
 
@@ -2194,10 +2223,6 @@ static int lu_cache_shrink(SHRINKER_ARGS(sc, nr_to_scan, gfp_mask))
                 .nr_to_scan = shrink_param(sc, nr_to_scan),
                 .gfp_mask   = shrink_param(sc, gfp_mask)
        };
-#if !defined(HAVE_SHRINKER_WANT_SHRINK_PTR) && !defined(HAVE_SHRINK_CONTROL)
-       struct shrinker* shrinker = NULL;
-#endif
-
 
        CDEBUG(D_INODE, "Shrink %lu objects\n", scv.nr_to_scan);
 
@@ -2350,7 +2375,7 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
        lu_site_stats_t stats;
 
        memset(&stats, 0, sizeof(stats));
-       lu_site_stats_get(s, &stats, 1);
+       lu_site_stats_get(s, &stats);
 
        seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                   stats.lss_busy,