LU-4423 obd: backport of lu_object changes upstream 25/32325/3
author NeilBrown <neilb@suse.com>
Wed, 9 May 2018 02:46:29 +0000 (22:46 -0400)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 7 Jun 2018 20:08:18 +0000 (20:08 +0000)
fold lu_object_new() into lu_object_find_at()

lu_object_new() duplicates a lot of code that is in
lu_object_find_at().
There is no real need for a separate function; it is simpler
to skip the parts of lu_object_find_at() that are not wanted
in the LOC_F_NEW case.
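
The merged control flow, condensed from the lu_object_find_at()
hunk below (locking details and error paths elided):

        cfs_hash_bd_get(hs, f, &bd);
        if (!(conf && conf->loc_flags & LOC_F_NEW)) {
                /* normal path: try the hash table first */
                cfs_hash_bd_lock(hs, &bd, 1);
                o = htable_lookup(s, &bd, f, &version);
                cfs_hash_bd_unlock(hs, &bd, 1);
                if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
                        return o;
        }
        /* LOC_F_NEW, or nothing found: allocate and insert */
        o = lu_object_alloc(env, dev, f, conf);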

Linux-commit: 775c4dc274343e5e2959fa1171baf2fc01028840

discard extra LRU count

lu_object maintains two LRU counts.
One is the per-bucket lsb_lru_len.
The other is the per-cpu ls_lru_len_counter.

The only places the per-bucket counters are used are:
 - in a debug message when an object is added
 - in lu_site_stats_get(), where all the counters are combined.

The debug message is not essential, and the per-cpu counter
can be used to get the combined total.

So discard the per-bucket lsb_lru_len.
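
With the per-bucket counter gone, the busy-object count in
lu_site_stats_get() is derived from the per-cpu counter instead, as
in the hunk below (s2 is the non-const alias of the site, needed
because percpu_counter_sum_positive() takes a non-const pointer):

        stats->lss_busy += cfs_hash_size_get(hs) -
                percpu_counter_sum_positive(&s2->ls_lru_len_counter);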

Linux-commit: e167b370360f8887cf21a2a82f83e7118a2aeb11

make struct lu_site_bkt_data private

This data structure only needs to be public so that
various modules can access a wait queue to wait for object
destruction.
If we provide a function to get the wait queue, rather than the
whole bucket, the structure can be made private.
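
Callers then wait on the queue returned by the new helper instead of
dereferencing the bucket, as in this condensed view of the
cl_object_put_last() hunk below:

        wait_queue_head_t *wq;

        wq = lu_site_wq_from_fid(site, &header->loh_fid);
        init_waitqueue_entry(&waiter, current);
        add_wait_queue(wq, &waiter);
        /* ... sleep until lu_object_free() signals the queue ... */
        remove_wait_queue(wq, &waiter);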

Linux-commit: bc5e7fb40d36edb95ce8f661596811bec3f7d5cf

Change-Id: I26203f331a0c73ae4e23878eb10b15d9fcf546c5
Signed-off-by: NeilBrown <neilb@suse.com>
Signed-off-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-on: https://review.whamcloud.com/32325
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lu_object.h
lustre/llite/lcommon_cl.c
lustre/lov/lov_object.c
lustre/obdclass/lu_object.c

diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h
index 766bf84..5b6a8bd 100644
--- a/lustre/include/lu_object.h
+++ b/lustre/include/lu_object.h
@@ -531,31 +531,6 @@ struct lu_object_header {
 
 struct fld;
 
-struct lu_site_bkt_data {
-       /**
-        * number of object in this bucket on the lsb_lru list.
-        */
-       long                    lsb_lru_len;
-       /**
-        * LRU list, updated on each access to object. Protected by
-        * bucket lock of lu_site::ls_obj_hash.
-        *
-        * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
-        * moved to the lu_site::ls_lru.prev (this is due to the non-existence
-        * of list_for_each_entry_safe_reverse()).
-        */
-       struct list_head        lsb_lru;
-       /**
-        * Wait-queue signaled when an object in this site is ultimately
-        * destroyed (lu_object_free()). It is used by lu_object_find() to
-        * wait before re-trying when object in the process of destruction is
-        * found in the hash table.
-        *
-        * \see htable_lookup().
-        */
-       wait_queue_head_t       lsb_marche_funebre;
-};
-
 enum {
        LU_SS_CREATED           = 0,
        LU_SS_CACHE_HIT,
@@ -626,14 +601,8 @@ struct lu_site {
        struct percpu_counter   ls_lru_len_counter;
 };
 
-static inline struct lu_site_bkt_data *
-lu_site_bkt_from_fid(struct lu_site *site, struct lu_fid *fid)
-{
-       struct cfs_hash_bd bd;
-
-        cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
-        return cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
-}
+wait_queue_head_t *
+lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid);
 
 static inline struct seq_server_site *lu_site2seq(const struct lu_site *s)
 {
diff --git a/lustre/llite/lcommon_cl.c b/lustre/llite/lcommon_cl.c
index 1249be0..597c130 100644
--- a/lustre/llite/lcommon_cl.c
+++ b/lustre/llite/lcommon_cl.c
@@ -204,12 +204,12 @@ static void cl_object_put_last(struct lu_env *env, struct cl_object *obj)
 
        if (unlikely(atomic_read(&header->loh_ref) != 1)) {
                struct lu_site *site = obj->co_lu.lo_dev->ld_site;
-               struct lu_site_bkt_data *bkt;
+               wait_queue_head_t *wq;
 
-               bkt = lu_site_bkt_from_fid(site, &header->loh_fid);
+               wq = lu_site_wq_from_fid(site, &header->loh_fid);
 
                init_waitqueue_entry(&waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, &waiter);
+               add_wait_queue(wq, &waiter);
 
                while (1) {
                        set_current_state(TASK_UNINTERRUPTIBLE);
@@ -219,7 +219,7 @@ static void cl_object_put_last(struct lu_env *env, struct cl_object *obj)
                }
 
                set_current_state(TASK_RUNNING);
-               remove_wait_queue(&bkt->lsb_marche_funebre, &waiter);
+               remove_wait_queue(wq, &waiter);
        }
 
        cl_object_put(env, obj);
diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c
index 4edcb89..c4138b4 100644
--- a/lustre/lov/lov_object.c
+++ b/lustre/lov/lov_object.c
@@ -282,14 +282,14 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 {
        struct cl_object        *sub;
        struct lu_site          *site;
-       struct lu_site_bkt_data *bkt;
+       wait_queue_head_t *wq;
        wait_queue_entry_t *waiter;
 
         LASSERT(r0->lo_sub[idx] == los);
 
-        sub  = lovsub2cl(los);
-        site = sub->co_lu.lo_dev->ld_site;
-        bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
+       sub = lovsub2cl(los);
+       site = sub->co_lu.lo_dev->ld_site;
+       wq = lu_site_wq_from_fid(site, &sub->co_lu.lo_header->loh_fid);
 
         cl_object_kill(env, sub);
         /* release a reference to the sub-object and ... */
@@ -301,7 +301,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
                init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               add_wait_queue(wq, waiter);
                set_current_state(TASK_UNINTERRUPTIBLE);
                while (1) {
                        /* this wait-queue is signaled at the end of
@@ -317,7 +317,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                                break;
                        }
                }
-               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               remove_wait_queue(wq, waiter);
        }
        LASSERT(r0->lo_sub[idx] == NULL);
 }
diff --git a/lustre/obdclass/lu_object.c b/lustre/obdclass/lu_object.c
index b390140..f17a8ee 100644
--- a/lustre/obdclass/lu_object.c
+++ b/lustre/obdclass/lu_object.c
 #include <lu_object.h>
 #include <lu_ref.h>
 
+struct lu_site_bkt_data {
+       /**
+        * LRU list, updated on each access to object. Protected by
+        * bucket lock of lu_site::ls_obj_hash.
+        *
+        * "Cold" end of LRU is lu_site::ls_lru.next. Accessed object are
+        * moved to the lu_site::ls_lru.prev (this is due to the non-existence
+        * of list_for_each_entry_safe_reverse()).
+        */
+       struct list_head                lsb_lru;
+       /**
+        * Wait-queue signaled when an object in this site is ultimately
+        * destroyed (lu_object_free()). It is used by lu_object_find() to
+        * wait before re-trying when object in the process of destruction is
+        * found in the hash table.
+        *
+        * \see htable_lookup().
+        */
+       wait_queue_head_t               lsb_marche_funebre;
+};
+
 enum {
        LU_CACHE_PERCENT_MAX     = 50,
        LU_CACHE_PERCENT_DEFAULT = 20
@@ -86,6 +107,18 @@ MODULE_PARM_DESC(lu_cache_nr, "Maximum number of objects in lu_object cache");
 static void lu_object_free(const struct lu_env *env, struct lu_object *o);
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx);
 
+wait_queue_head_t *
+lu_site_wq_from_fid(struct lu_site *site, struct lu_fid *fid)
+{
+       struct cfs_hash_bd bd;
+       struct lu_site_bkt_data *bkt;
+
+       cfs_hash_bd_get(site->ls_obj_hash, fid, &bd);
+       bkt = cfs_hash_bd_extra_get(site->ls_obj_hash, &bd);
+       return &bkt->lsb_marche_funebre;
+}
+EXPORT_SYMBOL(lu_site_wq_from_fid);
+
 /**
  * Decrease reference counter on object. If last reference is freed, return
  * object to the cache, unless lu_object_is_dying(o) holds. In the latter
@@ -151,11 +184,9 @@ void lu_object_put(const struct lu_env *env, struct lu_object *o)
            (lu_object_exists(orig) || lu_object_is_cl(orig))) {
                LASSERT(list_empty(&top->loh_lru));
                list_add_tail(&top->loh_lru, &bkt->lsb_lru);
-               bkt->lsb_lru_len++;
                percpu_counter_inc(&site->ls_lru_len_counter);
-               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p, "
-                      "lru_len: %ld\n", orig, top,
-                      site->ls_obj_hash, bkt, bkt->lsb_lru_len);
+               CDEBUG(D_INODE, "Add %p/%p to site lru. hash: %p, bkt: %p\n",
+                      orig, top, site->ls_obj_hash, bkt);
                cfs_hash_bd_unlock(site->ls_obj_hash, &bd, 1);
                return;
        }
@@ -214,7 +245,6 @@ void lu_object_unhash(const struct lu_env *env, struct lu_object *o)
 
                        list_del_init(&top->loh_lru);
                        bkt = cfs_hash_bd_extra_get(obj_hash, &bd);
-                       bkt->lsb_lru_len--;
                        percpu_counter_dec(&site->ls_lru_len_counter);
                }
                cfs_hash_bd_del_locked(obj_hash, &bd, &top->loh_hash);
@@ -301,15 +331,15 @@ next:
  */
 static void lu_object_free(const struct lu_env *env, struct lu_object *o)
 {
-       struct lu_site_bkt_data *bkt;
+       wait_queue_head_t *wq;
        struct lu_site          *site;
        struct lu_object        *scan;
        struct list_head        *layers;
        struct list_head         splice;
 
-        site   = o->lo_dev->ld_site;
-        layers = &o->lo_header->loh_layers;
-        bkt    = lu_site_bkt_from_fid(site, &o->lo_header->loh_fid);
+       site = o->lo_dev->ld_site;
+       layers = &o->lo_header->loh_layers;
+       wq = lu_site_wq_from_fid(site, &o->lo_header->loh_fid);
         /*
          * First call ->loo_object_delete() method to release all resources.
          */
@@ -338,8 +368,8 @@ static void lu_object_free(const struct lu_env *env, struct lu_object *o)
                o->lo_ops->loo_object_free(env, o);
        }
 
-       if (waitqueue_active(&bkt->lsb_marche_funebre))
-               wake_up_all(&bkt->lsb_marche_funebre);
+       if (waitqueue_active(wq))
+               wake_up_all(wq);
 }
 
 /**
@@ -400,7 +430,6 @@ int lu_site_purge_objects(const struct lu_env *env, struct lu_site *s,
                         cfs_hash_bd_del_locked(s->ls_obj_hash,
                                                &bd2, &h->loh_hash);
                        list_move(&h->loh_lru, &dispose);
-                       bkt->lsb_lru_len--;
                        percpu_counter_dec(&s->ls_lru_len_counter);
                         if (did_sth == 0)
                                 did_sth = 1;
@@ -615,7 +644,6 @@ static struct lu_object *htable_lookup(struct lu_site *s,
        lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_HIT);
        if (!list_empty(&h->loh_lru)) {
                list_del_init(&h->loh_lru);
-               bkt->lsb_lru_len--;
                percpu_counter_dec(&s->ls_lru_len_counter);
        }
        return lu_object_top(h);
@@ -658,29 +686,6 @@ static void lu_object_limit(const struct lu_env *env,
                              MIN(size - nr, LU_CACHE_NR_MAX_ADJUST), 0);
 }
 
-static struct lu_object *lu_object_new(const struct lu_env *env,
-                                      struct lu_device *dev,
-                                      const struct lu_fid *f,
-                                      const struct lu_object_conf *conf)
-{
-       struct lu_object *o;
-       struct cfs_hash *hs;
-       struct cfs_hash_bd bd;
-
-       o = lu_object_alloc(env, dev, f, conf);
-       if (unlikely(IS_ERR(o)))
-               return o;
-
-       hs = dev->ld_site->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-       cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-
-       lu_object_limit(env, dev);
-
-       return o;
-}
-
 /**
  * Core logic of lu_object_find*() functions.
  *
@@ -717,34 +722,35 @@ struct lu_object *lu_object_find_at(const struct lu_env *env,
         * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
         * just alloc and insert directly.
         *
-        * If dying object is found during index search, add @waiter to the
-        * site wait-queue and return ERR_PTR(-EAGAIN).
         */
-       if (conf && conf->loc_flags & LOC_F_NEW)
-               return lu_object_new(env, dev, f, conf);
-
        s  = dev->ld_site;
        hs = s->ls_obj_hash;
-       cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
-       o = htable_lookup(s, &bd, f, &version);
-       cfs_hash_bd_unlock(hs, &bd, 1);
-       if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
-               return o;
+       cfs_hash_bd_get(hs, f, &bd);
+       if (!(conf && conf->loc_flags & LOC_F_NEW)) {
+               cfs_hash_bd_lock(hs, &bd, 1);
+               o = htable_lookup(s, &bd, f, &version);
+               cfs_hash_bd_unlock(hs, &bd, 1);
 
+               if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
+                       return o;
+       }
        /*
         * Allocate new object. This may result in rather complicated
         * operations, including fld queries, inode loading, etc.
         */
        o = lu_object_alloc(env, dev, f, conf);
-       if (unlikely(IS_ERR(o)))
+       if (IS_ERR(o))
                return o;
 
        LASSERT(lu_fid_eq(lu_object_fid(o), f));
 
        cfs_hash_bd_lock(hs, &bd, 1);
 
-       shadow = htable_lookup(s, &bd, f, &version);
-       if (likely(IS_ERR(shadow) && PTR_ERR(shadow) == -ENOENT)) {
+       if (conf && conf->loc_flags & LOC_F_NEW)
+               shadow = ERR_PTR(-ENOENT);
+       else
+               shadow = htable_lookup(s, &bd, f, &version);
+       if (likely(PTR_ERR(shadow) == -ENOENT)) {
                cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
                cfs_hash_bd_unlock(hs, &bd, 1);
 
@@ -1903,19 +1909,24 @@ typedef struct lu_site_stats{
         unsigned        lss_busy;
 } lu_site_stats_t;
 
-static void lu_site_stats_get(struct cfs_hash *hs,
+static void lu_site_stats_get(const struct lu_site *s,
                               lu_site_stats_t *stats, int populated)
 {
+       struct cfs_hash *hs = s->ls_obj_hash;
        struct cfs_hash_bd bd;
-       unsigned int  i;
+       unsigned int i;
+       /*
+        * percpu_counter_sum_positive() won't accept a const pointer
+        * as it does modify the struct by taking a spinlock
+        */
+       struct lu_site *s2 = (struct lu_site *)s;
 
+       stats->lss_busy += cfs_hash_size_get(hs) -
+               percpu_counter_sum_positive(&s2->ls_lru_len_counter);
         cfs_hash_for_each_bucket(hs, &bd, i) {
-                struct lu_site_bkt_data *bkt = cfs_hash_bd_extra_get(hs, &bd);
-               struct hlist_head       *hhead;
+               struct hlist_head *hhead;
 
                 cfs_hash_bd_lock(hs, &bd, 1);
-               stats->lss_busy  +=
-                       cfs_hash_bd_count_get(&bd) - bkt->lsb_lru_len;
                 stats->lss_total += cfs_hash_bd_count_get(&bd);
                 stats->lss_max_search = max((int)stats->lss_max_search,
                                             cfs_hash_bd_depmax_get(&bd));
@@ -2186,7 +2197,7 @@ int lu_site_stats_seq_print(const struct lu_site *s, struct seq_file *m)
        lu_site_stats_t stats;
 
        memset(&stats, 0, sizeof(stats));
-       lu_site_stats_get(s->ls_obj_hash, &stats, 1);
+       lu_site_stats_get(s, &stats, 1);
 
        seq_printf(m, "%d/%d %d/%d %d %d %d %d %d %d %d\n",
                   stats.lss_busy,