Whamcloud - gitweb
LU-18351 obdclass: jobstat scaling 07/56607/35
authorShaun Tancheff <shaun.tancheff@hpe.com>
Fri, 6 Dec 2024 00:11:50 +0000 (07:11 +0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 16 Dec 2024 08:14:48 +0000 (08:14 +0000)
When there is a large number of jobstats (>100k), list walking in
seq_write() does not scale and can trigger CPU soft lockups.

Prefer red-black trees for easier lookup and removal, and a simplified
tree walk during seq_write().

Add an rbtree keyed on a sequential id generated as jobs are added,
to simplify iteration with restarts. The more data is written via
seq_write(), the more likely and frequent the restarts that must seek
to a new position, which an rbtree can do quickly.

Add an LRU list so that the periodic trim of jobs does not need to
walk the entire tree of jobs.
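
A seq_file re-invokes ->start() with the saved position each time its
output buffer fills, so dumping n jobs performs many restarts; against
a list each restart is an O(n) walk, against a pos-sorted rbtree it is
O(log n). A minimal sketch of the restart seek (illustrative names,
not the patch's exact code):

    struct entry {
            struct rb_node  node;           /* in a pos-sorted rbtree */
            u64             pos_id;         /* assigned at insert time */
    };

    /* find the first entry with pos_id >= pos, as ->start() must do
     * after a restart
     */
    static struct entry *entry_at_or_after(struct rb_root *root, u64 pos)
    {
            struct rb_node *n = root->rb_node;
            struct entry *best = NULL;

            while (n) {
                    struct entry *e = rb_entry(n, struct entry, node);

                    if (pos <= e->pos_id) {
                            best = e;       /* candidate, look left */
                            if (pos == e->pos_id)
                                    break;
                            n = n->rb_left;
                    } else {
                            n = n->rb_right;
                    }
            }
            return best;    /* NULL when pos is past the last entry */
    }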

Unpatched:
  jobs  50000    100000   200000  350000  500000
  time  2.096s   3.672s   7.504s  11.875s 16.721s

Patched:
  jobs  50000    100000   200000  350000  500000
  time  1.366s   3.296s   6.763s  8.895s  11.965s

HPE-bug-id: LUS-12621
Signed-off-by: Shaun Tancheff <shaun.tancheff@hpe.com>
Change-Id: Icca1365af5db761ed89ee9a8a97ce4ded65b8832
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56607
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: James Simmons <jsimmons@infradead.org>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
libcfs/autoconf/lustre-libcfs.m4
libcfs/include/libcfs/linux/linux-misc.h
lustre/include/lprocfs_status.h
lustre/include/obd_support.h
lustre/mdt/mdt_lproc.c
lustre/obdclass/lprocfs_jobstats.c
lustre/ofd/ofd_internal.h
lustre/target/tgt_main.c
lustre/tests/sanity.sh

index 356c9aa..7c8be4e 100644 (file)
@@ -2223,6 +2223,33 @@ AC_DEFUN([LIBCFS_NLA_STRLCPY], [
 ]) # LIBCFS_NLA_STRLCPY
 
 #
+# LIBCFS_RB_FIND
+#
+# Kernel v5.11-20-g2d24dd5798d0
+#   rbtree: Add generic add and find helpers
+#
+AC_DEFUN([LIBCFS_SRC_RB_FIND], [
+       LB2_LINUX_TEST_SRC([rb_find], [
+               #include <linux/rbtree.h>
+               static int cmp(const void *key, const struct rb_node *node)
+               {
+                       return 0;
+               }
+       ],[
+               void *key = NULL;
+               struct rb_root *tree = NULL;
+               struct rb_node *node __maybe_unused = rb_find(key, tree, cmp);
+       ])
+])
+AC_DEFUN([LIBCFS_RB_FIND], [
+       LB2_MSG_LINUX_TEST_RESULT([if 'rb_find()' is available],
+       [rb_find], [
+               AC_DEFINE(HAVE_RB_FIND, 1,
+                       ['rb_find()' is available])
+       ])
+]) # LIBCFS_RB_FIND
+
+#
 # LIBCFS_LINUX_FORTIFY_STRING_HEADER
 #
 # Linux v5.11-11104-ga28a6e860c6c
@@ -2642,6 +2669,7 @@ AC_DEFUN([LIBCFS_PROG_LINUX_SRC], [
        LIBCFS_SRC_HAVE_LIST_CMP_FUNC_T
        LIBCFS_SRC_NLA_STRLCPY
        # 5.12
+       LIBCFS_SRC_RB_FIND
        LIBCFS_SRC_LINUX_FORTIFY_STRING_HEADER
        LIBCFS_SRC_HAVE_CIPHER_HEADER
        # 5.13
@@ -2798,6 +2826,7 @@ AC_DEFUN([LIBCFS_PROG_LINUX_RESULTS], [
        LIBCFS_HAVE_LIST_CMP_FUNC_T
        LIBCFS_NLA_STRLCPY
        # 5.12
+       LIBCFS_RB_FIND
        LIBCFS_LINUX_FORTIFY_STRING_HEADER
        LIBCFS_HAVE_CIPHER_HEADER
        # 5.13
index bff9e05..75cc420 100644 (file)
@@ -208,6 +208,95 @@ void cfs_arch_exit(void);
 #define task_is_running(task)          (task->state == TASK_RUNNING)
 #endif
 
+#ifndef HAVE_RB_FIND
+/**
+ * rb_find() - find @key in tree @tree
+ * @key: key to match
+ * @tree: tree to search
+ * @cmp: operator defining the node order
+ *
+ * Returns the rb_node matching @key or NULL.
+ */
+static __always_inline struct rb_node *
+rb_find(const void *key, const struct rb_root *tree,
+       int (*cmp)(const void *key, const struct rb_node *))
+{
+       struct rb_node *node = tree->rb_node;
+
+       while (node) {
+               int c = cmp(key, node);
+
+               if (c < 0)
+                       node = node->rb_left;
+               else if (c > 0)
+                       node = node->rb_right;
+               else
+                       return node;
+       }
+
+       return NULL;
+}
+
+/**
+ * rb_add() - insert @node into @tree
+ * @node: node to insert
+ * @tree: tree to insert @node into
+ * @less: operator defining the (partial) node order
+ */
+static __always_inline void
+rb_add(struct rb_node *node, struct rb_root *tree,
+       bool (*less)(struct rb_node *, const struct rb_node *))
+{
+       struct rb_node **link = &tree->rb_node;
+       struct rb_node *parent = NULL;
+
+       while (*link) {
+               parent = *link;
+               if (less(node, parent))
+                       link = &parent->rb_left;
+               else
+                       link = &parent->rb_right;
+       }
+
+       rb_link_node(node, parent, link);
+       rb_insert_color(node, tree);
+}
+
+/**
+ * rb_find_add() - find equivalent @node in @tree, or add @node
+ * @node: node to look-for / insert
+ * @tree: tree to search / modify
+ * @cmp: operator defining the node order
+ *
+ * Returns the rb_node matching @node, or NULL when no match is found and @node
+ * is inserted.
+ */
+static __always_inline struct rb_node *
+rb_find_add(struct rb_node *node, struct rb_root *tree,
+           int (*cmp)(struct rb_node *, const struct rb_node *))
+{
+       struct rb_node **link = &tree->rb_node;
+       struct rb_node *parent = NULL;
+       int c;
+
+       while (*link) {
+               parent = *link;
+               c = cmp(node, parent);
+
+               if (c < 0)
+                       link = &parent->rb_left;
+               else if (c > 0)
+                       link = &parent->rb_right;
+               else
+                       return parent;
+       }
+
+       rb_link_node(node, parent, link);
+       rb_insert_color(node, tree);
+       return NULL;
+}
+#endif /* !HAVE_RB_FIND */
+
 /* interval tree */
 #ifdef HAVE_INTERVAL_TREE_CACHED
 #define interval_tree_root rb_root_cached
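
These fallbacks mirror the helpers added upstream in v5.12: rb_find()
takes a key-vs-node comparator returning <0/0/>0, rb_add() takes a
node-vs-node "less" predicate, and rb_find_add() inserts unless an
equal node already exists. A hypothetical user keyed on an integer id
(names here are illustrative only):

    struct item {
            struct rb_node  rb;
            int             id;
    };

    static int item_cmp(const void *key, const struct rb_node *node)
    {
            const struct item *it = container_of(node, struct item, rb);
            const int *id = key;

            if (*id < it->id)
                    return -1;
            return *id > it->id;    /* 1 if greater, 0 if equal */
    }

    static bool item_less(struct rb_node *a, const struct rb_node *b)
    {
            return container_of(a, struct item, rb)->id <
                   container_of(b, struct item, rb)->id;
    }

    /* lookup: rb_find(&id, &tree, item_cmp)
     * insert: rb_add(&it->rb, &tree, item_less)
     */
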
index 356f558..8dbf4cf 100644 (file)
@@ -422,18 +422,31 @@ struct obd_device;
 #define JOBSTATS_NODELOCAL             "nodelocal"
 #define JOBSTATS_SESSION               "session"
 
+enum ojs_info_flags {
+       OJS_CLEANING,           /* job cleaning is in operation */
+       OJS_HEADER,             /* seq_show() header */
+       OJS_ACTIVE_JOBS,        /* set while ojs_jobs > 0 */
+       OJS_FINI,               /* set at _fini */
+};
+
 typedef void (*cntr_init_callback)(struct lprocfs_stats *stats,
                                   unsigned int offset,
                                   enum lprocfs_counter_config cntr_umask);
 struct obd_job_stats {
-       struct cfs_hash        *ojs_hash;       /* hash of jobids */
-       struct list_head        ojs_list;       /* list of job_stat structs */
-       spinlock_t              ojs_lock;       /* protect ojs_list/js_list */
+       struct rb_root          ojs_idtree;     /* root sorted on js_jobid */
+       struct rb_root          ojs_postree;    /* unique id (temporal) root */
+       atomic64_t              ojs_next_pos;   /* generate next unique id */
+       struct rw_semaphore     ojs_rwsem;      /* rbtree locking */
+       struct list_head        ojs_lru;        /* least recently used */
+       struct llist_head       ojs_deleted;    /* zero-ref to be purged */
+       unsigned long           ojs_flags;      /* see: ojs_info_flags */
+       atomic_t                ojs_readers;    /* active readers */
+       spinlock_t              ojs_lock;       /* protect ojs_lru/js_lru */
        ktime_t                 ojs_cleanup_interval;/* 1/2 expiry seconds */
        ktime_t                 ojs_cleanup_last;/* previous cleanup time */
        cntr_init_callback      ojs_cntr_init_fn;/* lprocfs_stats initializer */
        unsigned short          ojs_cntr_num;   /* number of stats in struct */
-       bool                    ojs_cleaning;   /* currently expiring stats */
+       atomic64_t              ojs_jobs;       /* number of jobs */
 };
 
 #ifdef CONFIG_PROC_FS
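
The new fields split job teardown into two stages: the final kref put
only unlinks the job from the LRU under ojs_lock and stages it on the
ojs_deleted llist, while the rb_erase() calls run later, in batch,
under the ojs_rwsem write lock, and the memory is freed only after an
RCU grace period. The shape of that pattern, reduced to essentials
(hypothetical names, not the patch verbatim):

    struct obj_stats {
            struct rb_root          tree;     /* erase under rwsem write */
            struct rw_semaphore     rwsem;
            struct list_head        lru;      /* under lock */
            spinlock_t              lock;
            struct llist_head       deleted;  /* zero-ref, pending erase */
    };

    struct obj {
            struct rb_node          node;
            struct list_head        lru;
            struct llist_node       deleted;
            struct kref             refcount;
            struct rcu_head         rcu;
            struct obj_stats        *stats;
    };

    /* stage 1: last reference dropped; defer the rbtree erase */
    static void obj_release(struct kref *kref)
    {
            struct obj *o = container_of(kref, struct obj, refcount);
            struct obj_stats *st = o->stats;

            spin_lock(&st->lock);
            list_del_rcu(&o->lru);        /* RCU readers may still see it */
            llist_add(&o->deleted, &st->deleted);   /* lock-free staging */
            spin_unlock(&st->lock);
    }

    /* stage 2: caller holds st->rwsem for write; obj_free_rcu() is a
     * hypothetical callback that frees the object
     */
    static void obj_purge(struct obj_stats *st)
    {
            struct llist_node *batch = llist_del_all(&st->deleted);
            struct obj *o, *tmp;

            llist_for_each_entry_safe(o, tmp, batch, deleted) {
                    rb_erase(&o->node, &st->tree);
                    call_rcu(&o->rcu, obj_free_rcu); /* free after grace */
            }
    }
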
index a16e2a1..9c0843d 100644 (file)
@@ -66,9 +66,6 @@ extern bool obd_enable_health_write;
 #define HASH_EXP_LOCK_BKT_BITS  5
 #define HASH_EXP_LOCK_CUR_BITS  7
 #define HASH_EXP_LOCK_MAX_BITS  16
-#define HASH_JOB_STATS_BKT_BITS 5
-#define HASH_JOB_STATS_CUR_BITS 7
-#define HASH_JOB_STATS_MAX_BITS 12
 
 /* Timeout definitions */
 #define OBD_TIMEOUT_DEFAULT             100
index 27c5ef8..953badb 100644 (file)
@@ -1580,7 +1580,7 @@ void mdt_counter_incr(struct ptlrpc_request *req, int opcode, long amount)
        if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats != NULL)
                lprocfs_counter_add(exp->exp_nid_stats->nid_stats, opcode,
                                    amount);
-       if (exp->exp_obd && obd2obt(exp->exp_obd)->obt_jobstats.ojs_hash &&
+       if (exp->exp_obd && obd2obt(exp->exp_obd)->obt_jobstats.ojs_cntr_num &&
            (exp_connect_flags(exp) & OBD_CONNECT_JOBSTATS))
                lprocfs_job_stats_log(exp->exp_obd,
                                      lustre_msg_get_jobid(req->rq_reqmsg),
index dcb0c32..42705fb 100644 (file)
@@ -35,6 +35,7 @@
 
 #ifdef CONFIG_PROC_FS
 
+#define JOB_CLEANUP_BATCH 1024
 /*
  * JobID formats & JobID environment variable names for supported
  * job schedulers:
  */
 
 struct job_stat {
-       struct hlist_node       js_hash;        /* hash struct for this jobid */
-       struct list_head        js_list;        /* on ojs_list, with ojs_lock */
+       struct rb_node          js_idnode;      /* js_jobid sorted node */
+       struct rb_node          js_posnode;     /* pos sorted node */
+       struct list_head        js_lru;         /* on ojs_lru, with ojs_lock */
+       struct llist_node       js_deleted;     /* on ojs_deleted w/ojs_lock */
+       u64                     js_pos_id;      /* pos for job stats seq file */
        struct kref             js_refcount;    /* num users of this struct */
        char                    js_jobid[LUSTRE_JOBID_SIZE]; /* job name + NUL*/
        ktime_t                 js_time_init;   /* time of initial stat*/
@@ -73,118 +77,61 @@ struct job_stat {
        struct rcu_head         js_rcu;         /* RCU head for job_reclaim_rcu*/
 };
 
-static unsigned int
-job_stat_hash(struct cfs_hash *hs, const void *key, const unsigned int bits)
-{
-       return cfs_hash_djb2_hash(key, strlen(key), bits);
-}
-
-static void *job_stat_key(struct hlist_node *hnode)
-{
-       struct job_stat *job;
-       job = hlist_entry(hnode, struct job_stat, js_hash);
-       return job->js_jobid;
-}
-
-static int job_stat_keycmp(const void *key, struct hlist_node *hnode)
-{
-       struct job_stat *job;
-       job = hlist_entry(hnode, struct job_stat, js_hash);
-       return (strlen(job->js_jobid) == strlen(key)) &&
-              !strncmp(job->js_jobid, key, strlen(key));
-}
-
-static void *job_stat_object(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct job_stat, js_hash);
-}
-
-static bool job_getref_try(struct job_stat *job)
-{
-       return kref_get_unless_zero(&job->js_refcount);
-}
-
-static void job_stat_get(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       struct job_stat *job;
-       job = hlist_entry(hnode, struct job_stat, js_hash);
-       kref_get(&job->js_refcount);
-}
-
 static void job_reclaim_rcu(struct rcu_head *head)
 {
        struct job_stat *job = container_of(head, typeof(*job), js_rcu);
+       struct obd_job_stats *stats;
 
+       stats = job->js_jobstats;
        lprocfs_stats_free(&job->js_stats);
        OBD_FREE_PTR(job);
+       if (atomic64_dec_and_test(&stats->ojs_jobs))
+               clear_bit(OJS_ACTIVE_JOBS, &stats->ojs_flags);
 }
 
-static void job_free(struct kref *kref)
+static void job_purge_locked(struct obd_job_stats *stats, unsigned int sched)
 {
-       struct job_stat *job = container_of(kref, struct job_stat,
-                                           js_refcount);
-
-       LASSERT(job->js_jobstats != NULL);
-       spin_lock(&job->js_jobstats->ojs_lock);
-       list_del_rcu(&job->js_list);
-       spin_unlock(&job->js_jobstats->ojs_lock);
+       struct job_stat *job, *n;
+       struct llist_node *entry;
+       unsigned int count = 0;
 
-       call_rcu(&job->js_rcu, job_reclaim_rcu);
-}
+       entry = llist_del_all(&stats->ojs_deleted);
+       if (!entry)
+               return;
 
-static void job_putref(struct job_stat *job)
-{
-       LASSERT(kref_read(&job->js_refcount) > 0);
-       kref_put(&job->js_refcount, job_free);
+       /* ojs_rwsem lock is needed to protect rbtree re-balance on erase */
+       llist_for_each_entry_safe(job, n, entry, js_deleted) {
+               rb_erase(&job->js_posnode, &stats->ojs_postree);
+               rb_erase(&job->js_idnode, &stats->ojs_idtree);
+               call_rcu(&job->js_rcu, job_reclaim_rcu);
+
+               if (++count == sched) {
+                       sched = 0;
+                       up_write(&stats->ojs_rwsem);
+                       cond_resched();
+                       down_write(&stats->ojs_rwsem);
+               }
+       }
 }
 
-static void job_stat_put_locked(struct cfs_hash *hs, struct hlist_node *hnode)
+static void job_free(struct kref *kref)
 {
-       struct job_stat *job;
+       struct job_stat *job = container_of(kref, struct job_stat, js_refcount);
+       struct obd_job_stats *stats;
 
-       job = hlist_entry(hnode, struct job_stat, js_hash);
-       job_putref(job);
-}
+       LASSERT(job->js_jobstats);
 
-static void job_stat_exit(struct cfs_hash *hs, struct hlist_node *hnode)
-{
-       CERROR("should not have any items\n");
+       stats = job->js_jobstats;
+       spin_lock(&stats->ojs_lock);
+       list_del_rcu(&job->js_lru);
+       llist_add(&job->js_deleted, &stats->ojs_deleted);
+       spin_unlock(&stats->ojs_lock);
 }
 
-static struct cfs_hash_ops job_stats_hash_ops = {
-       .hs_hash       = job_stat_hash,
-       .hs_key        = job_stat_key,
-       .hs_keycmp     = job_stat_keycmp,
-       .hs_object     = job_stat_object,
-       .hs_get        = job_stat_get,
-       .hs_put_locked = job_stat_put_locked,
-       .hs_exit       = job_stat_exit,
-};
-
-/**
- * Jobstats expiry iterator to clean up old jobids
- *
- * Called for each job_stat structure on this device, it should delete stats
- * older than the specified \a oldest_time in seconds.  If \a oldest_time is
- * in the future then this will delete all statistics (e.g. during shutdown).
- *
- * \param[in] hs       hash of all jobids on this device
- * \param[in] bd       hash bucket containing this jobid
- * \param[in] hnode    hash structure for this jobid
- * \param[in] data     pointer to stats expiry time in seconds
- */
-static int job_cleanup_iter_callback(struct cfs_hash *hs,
-                                    struct cfs_hash_bd *bd,
-                                    struct hlist_node *hnode, void *data)
+static void job_putref(struct job_stat *job)
 {
-       ktime_t oldest_time = *((ktime_t *)data);
-       struct job_stat *job;
-
-       job = hlist_entry(hnode, struct job_stat, js_hash);
-       if (ktime_before(job->js_time_latest, oldest_time))
-               cfs_hash_bd_del_locked(hs, bd, hnode);
-
-       return 0;
+       LASSERT(kref_read(&job->js_refcount) > 0);
+       kref_put(&job->js_refcount, job_free);
 }
 
 /**
@@ -212,11 +159,20 @@ static int job_cleanup_iter_callback(struct cfs_hash *hs,
  */
 static void lprocfs_job_cleanup(struct obd_job_stats *stats, bool clear)
 {
+       struct job_stat *job;
        ktime_t cleanup_interval = stats->ojs_cleanup_interval;
        ktime_t now = ktime_get_real();
        ktime_t oldest;
-
-       if (likely(!clear)) {
+       unsigned int sched = JOB_CLEANUP_BATCH;
+
+       if (unlikely(clear)) {
+               /* user request or shutdown: block until safe to clear */
+               do {
+                       wait_on_bit(&stats->ojs_flags, OJS_CLEANING,
+                                   TASK_UNINTERRUPTIBLE);
+               } while (test_and_set_bit(OJS_CLEANING, &stats->ojs_flags));
+               sched = UINT_MAX;
+       } else {
                /* ojs_cleanup_interval of zero means never clean up stats */
                if (ktime_to_ns(cleanup_interval) == 0)
                        return;
@@ -225,44 +181,32 @@ static void lprocfs_job_cleanup(struct obd_job_stats *stats, bool clear)
                                                cleanup_interval)))
                        return;
 
-               if (stats->ojs_cleaning)
+               /* skip if a cleanup is already in progress */
+               if (test_and_set_bit(OJS_CLEANING, &stats->ojs_flags))
                        return;
        }
 
-       spin_lock(&stats->ojs_lock);
-       if (!clear && stats->ojs_cleaning) {
-               spin_unlock(&stats->ojs_lock);
-               return;
-       }
-
-       stats->ojs_cleaning = true;
-       spin_unlock(&stats->ojs_lock);
-
-       /* Can't hold ojs_lock over hash iteration, since it is grabbed by
-        * job_cleanup_iter_callback()
-        *   ->cfs_hash_bd_del_locked()
-        *     ->job_putref()
-        *       ->job_free()
-        *
-        * Holding ojs_lock isn't necessary for safety of the hash iteration,
-        * since locking of the hash is handled internally, but there isn't
-        * any benefit to having multiple threads doing cleanup at one time.
-        *
-        * Subtract or add twice the cleanup_interval, since it is 1/2 the
-        * maximum age.  When clearing all stats, push oldest into the future.
-        */
        cleanup_interval = ktime_add(cleanup_interval, cleanup_interval);
        if (likely(!clear))
                oldest = ktime_sub(now, cleanup_interval);
        else
                oldest = ktime_add(now, cleanup_interval);
-       cfs_hash_for_each_safe(stats->ojs_hash, job_cleanup_iter_callback,
-                              &oldest);
 
-       spin_lock(&stats->ojs_lock);
-       stats->ojs_cleaning = false;
+       /* remove all jobs older than oldest */
+       rcu_read_lock();
+       list_for_each_entry_rcu(job, &stats->ojs_lru, js_lru) {
+               if (!ktime_before(job->js_time_latest, oldest))
+                       break;
+               job_putref(job); /* drop ref to initiate removal */
+       }
+       rcu_read_unlock();
        stats->ojs_cleanup_last = ktime_get_real();
-       spin_unlock(&stats->ojs_lock);
+
+       if (down_write_trylock(&stats->ojs_rwsem)) {
+               job_purge_locked(stats, sched);
+               up_write(&stats->ojs_rwsem);
+       }
+       clear_bit(OJS_CLEANING, &stats->ojs_flags);
 }
 
 static struct job_stat *job_alloc(char *jobid, struct obd_job_stats *jobs)
@@ -284,22 +228,173 @@ static struct job_stat *job_alloc(char *jobid, struct obd_job_stats *jobs)
        memcpy(job->js_jobid, jobid, sizeof(job->js_jobid));
        job->js_time_latest = job->js_stats->ls_init;
        job->js_jobstats = jobs;
-       INIT_HLIST_NODE(&job->js_hash);
-       INIT_LIST_HEAD(&job->js_list);
+       RB_CLEAR_NODE(&job->js_idnode);
+       INIT_LIST_HEAD(&job->js_lru);
+       /* open-code init_llist_node() */
+       job->js_deleted.next = &job->js_deleted;
        kref_init(&job->js_refcount);
+       if (atomic64_inc_return(&jobs->ojs_jobs) == 1)
+               set_bit(OJS_ACTIVE_JOBS, &jobs->ojs_flags);
 
        return job;
 }
 
+static inline int cmp_key_jobid(const void *_key, const struct rb_node *node)
+{
+       struct job_stat *job = container_of(node, struct job_stat, js_idnode);
+       const char *key = (const char *)_key;
+
+       return strcmp(key, job->js_jobid);
+}
+
+/* return the next job in pos_id order, or NULL */
+static struct job_stat *job_get_next_pos(struct job_stat *job)
+{
+       struct rb_node *next = rb_next(&job->js_posnode);
+
+       while (next) {
+               struct job_stat *next_job;
+
+               next_job = container_of(next, struct job_stat, js_posnode);
+               if (kref_get_unless_zero(&next_job->js_refcount))
+                       return next_job;
+
+               /* 'next_job' is going away, try again */
+               if (next)
+                       next = rb_next(next);
+       }
+
+       return NULL;
+}
+
+/* find and take a ref on the first job with pos_id >= pos, or NULL */
+static struct job_stat *job_find_first_pos(struct obd_job_stats *stats, u64 pos)
+{
+       struct rb_node *node = stats->ojs_postree.rb_node;
+       struct job_stat *found = NULL;
+
+       while (node) {
+               struct job_stat *job;
+
+               job = container_of(node, struct job_stat, js_posnode);
+               if (pos <= job->js_pos_id) {
+                       found = job;
+                       if (pos == job->js_pos_id)
+                               break;
+                       node = node->rb_left;
+               } else {
+                       node = node->rb_right;
+               }
+       }
+       if (found) {
+               if (kref_get_unless_zero(&found->js_refcount))
+                       return found;
+               return job_get_next_pos(found);
+       }
+       return NULL;
+}
+
+/* find and take a ref on a job; NULL if not found or being deleted */
+static struct job_stat *job_find(struct obd_job_stats *stats,
+                                const char *key)
+{
+       struct rb_node *node;
+       struct job_stat *job;
+
+       node = rb_find((void *)key, &stats->ojs_idtree, cmp_key_jobid);
+       if (node) {
+               job = container_of(node, struct job_stat, js_idnode);
+               if (kref_get_unless_zero(&job->js_refcount))
+                       return job;
+       }
+       return NULL;
+}
+
+static inline int cmp_node_jobid(struct rb_node *left,
+                                const struct rb_node *node)
+{
+       struct job_stat *key = container_of(left, struct job_stat, js_idnode);
+       struct job_stat *job = container_of(node, struct job_stat, js_idnode);
+
+       return strcmp(key->js_jobid, job->js_jobid);
+}
+
+/* Insert a (newly allocated) job into the rbtree.
+ * On a collision with an existing job that:
+ *  - is being deleted, return ERR_PTR(-EAGAIN)
+ *  - is active, take a reference on it and return it.
+ * Otherwise there is no collision: the job is added, a reference is
+ * taken on the new job, and NULL is returned.
+ */
+static struct job_stat *job_insert(struct obd_job_stats *stats,
+                                  struct job_stat *job)
+{
+       struct rb_node *node;
+
+       node = rb_find_add(&job->js_idnode, &stats->ojs_idtree, cmp_node_jobid);
+       if (node) {
+               struct job_stat *existing_job;
+
+               existing_job = container_of(node, struct job_stat, js_idnode);
+               if (kref_get_unless_zero(&existing_job->js_refcount))
+                       return existing_job;
+               /* entry is being deleted */
+               return ERR_PTR(-EAGAIN);
+       }
+       kref_get(&job->js_refcount);
+
+       return NULL;
+}
+
+static inline int cmp_node_pos(struct rb_node *left, const struct rb_node *node)
+{
+       struct job_stat *key = container_of(left, struct job_stat, js_posnode);
+       struct job_stat *job = container_of(node, struct job_stat, js_posnode);
+
+       if (key->js_pos_id < job->js_pos_id)
+               return -1;
+       else if (key->js_pos_id > job->js_pos_id)
+               return 1;
+       return 0;
+}
+
+static inline void _next_pos_id(struct obd_job_stats *stats,
+                               struct job_stat *job)
+{
+       /* avoid pos clash with 'SEQ_START_TOKEN' */
+       do {
+               job->js_pos_id = atomic64_inc_return(&stats->ojs_next_pos);
+       } while (job->js_pos_id < 2);
+}
+
+/* insert a job into the pos rbtree, retrying until a free pos_id is found */
+static void job_insert_pos(struct obd_job_stats *stats, struct job_stat *job)
+{
+       struct rb_node *node;
+
+       /* if the u64 pos_id wraps, an insert can collide, so keep
+        * advancing pos_id to fill in the gaps
+        */
+       do {
+               _next_pos_id(stats, job);
+               node = rb_find_add(&job->js_posnode, &stats->ojs_postree,
+                                  cmp_node_pos);
+       } while (node);
+}
+
 int lprocfs_job_stats_log(struct obd_device *obd, char *jobid,
                          int event, long amount)
 {
        struct obd_job_stats *stats = &obd2obt(obd)->obt_jobstats;
-       struct job_stat *job, *job2;
+       struct job_stat *job, *existing_job;
+       bool mru_last = false;
        ENTRY;
 
-       LASSERT(stats != NULL);
-       LASSERT(stats->ojs_hash != NULL);
+       LASSERT(stats);
+
+       /* do not add jobs while shutting down */
+       if (test_bit(OJS_FINI, &stats->ojs_flags))
+               RETURN(0);
 
        if (event >= stats->ojs_cntr_num)
                RETURN(-EINVAL);
@@ -314,38 +409,53 @@ int lprocfs_job_stats_log(struct obd_device *obd, char *jobid,
                RETURN(-EINVAL);
        }
 
-       job = cfs_hash_lookup(stats->ojs_hash, jobid);
+       down_read(&stats->ojs_rwsem);
+       job = job_find(stats, jobid);
+       up_read(&stats->ojs_rwsem);
        if (job)
                goto found;
 
        lprocfs_job_cleanup(stats, false);
 
        job = job_alloc(jobid, stats);
-       if (job == NULL)
+       if (!job)
                RETURN(-ENOMEM);
 
-       job2 = cfs_hash_findadd_unique(stats->ojs_hash, job->js_jobid,
-                                      &job->js_hash);
-       if (job2 != job) {
-               job_putref(job);
-               job = job2;
-               /* We cannot LASSERT(!list_empty(&job->js_list)) here,
-                * since we just lost the race for inserting "job" into the
-                * ojs_list, and some other thread is doing it _right_now_.
-                * Instead, be content the other thread is doing this, since
-                * "job2" was initialized in job_alloc() already. LU-2163 */
-       } else {
-               LASSERT(list_empty(&job->js_list));
-               spin_lock(&stats->ojs_lock);
-               list_add_tail_rcu(&job->js_list, &stats->ojs_list);
-               spin_unlock(&stats->ojs_lock);
+try_insert:
+       down_write(&stats->ojs_rwsem);
+       job_purge_locked(stats, UINT_MAX);
+       existing_job = job_insert(stats, job);
+       if (IS_ERR(existing_job) && PTR_ERR(existing_job) == -EAGAIN) {
+               up_write(&stats->ojs_rwsem);
+               goto try_insert;
+       }
+       /* on collision drop the new job and proceed with the existing one */
+       if (existing_job) {
+               job_putref(job); /* duplicate job, remove */
+               job = existing_job;
+               up_write(&stats->ojs_rwsem);
+               goto found;
        }
+       job_insert_pos(stats, job);
+       LASSERT(list_empty(&job->js_lru));
+       spin_lock(&stats->ojs_lock);
+       list_add_tail_rcu(&job->js_lru, &stats->ojs_lru);
+       mru_last = true;
+       spin_unlock(&stats->ojs_lock);
+       up_write(&stats->ojs_rwsem);
 
 found:
        LASSERT(stats == job->js_jobstats);
        job->js_time_latest = ktime_get_real();
+       if (!mru_last) {
+               spin_lock(&stats->ojs_lock);
+               list_del_rcu(&job->js_lru);
+               list_add_tail_rcu(&job->js_lru, &stats->ojs_lru);
+               spin_unlock(&stats->ojs_lock);
+       }
        lprocfs_counter_add(job->js_stats, event, amount);
 
+       /* drop the extra ref taken by job_find()/job_insert() */
        job_putref(job);
 
        RETURN(0);
@@ -355,90 +465,85 @@ EXPORT_SYMBOL(lprocfs_job_stats_log);
 void lprocfs_job_stats_fini(struct obd_device *obd)
 {
        struct obd_job_stats *stats = &obd2obt(obd)->obt_jobstats;
+       struct job_stat *job, *n;
+       int retry = 0;
+       bool purge = false;
 
-       if (stats->ojs_hash == NULL)
-               return;
+       set_bit(OJS_FINI, &stats->ojs_flags);
+       do {
+               lprocfs_job_cleanup(stats, true);
+               down_write(&stats->ojs_rwsem);
+               job_purge_locked(stats, UINT_MAX);
+               up_write(&stats->ojs_rwsem);
+               rcu_barrier();
+               purge = false;
+
+               rbtree_postorder_for_each_entry_safe(job, n,
+                                                    &stats->ojs_idtree,
+                                                    js_idnode) {
+                       if (kref_read(&job->js_refcount) > 0) {
+                               job_putref(job); /* drop ref */
+                               purge = true;
+                       }
+               }
+
+               rbtree_postorder_for_each_entry_safe(job, n,
+                                                    &stats->ojs_postree,
+                                                    js_posnode) {
+                       if (kref_read(&job->js_refcount) > 0) {
+                               job_putref(job); /* drop ref */
+                               purge = true;
+                       }
+               }
 
-       lprocfs_job_cleanup(stats, true);
-       cfs_hash_putref(stats->ojs_hash);
-       stats->ojs_hash = NULL;
-       LASSERT(list_empty(&stats->ojs_list));
+               if (atomic64_read(&stats->ojs_jobs))
+                       purge = true;
+       } while (purge && retry++ < 3);
+       wait_on_bit_timeout(&stats->ojs_flags, OJS_ACTIVE_JOBS,
+                           TASK_UNINTERRUPTIBLE, cfs_time_seconds(30));
+       rcu_barrier();
+       LASSERTF(atomic64_read(&stats->ojs_jobs) == 0, "jobs:%llu flags:%lx\n",
+                (long long)atomic64_read(&stats->ojs_jobs), stats->ojs_flags);
+       LASSERT(RB_EMPTY_ROOT(&stats->ojs_idtree));
+       LASSERT(list_empty(&stats->ojs_lru));
+       LASSERT(llist_empty(&stats->ojs_deleted));
 }
 EXPORT_SYMBOL(lprocfs_job_stats_fini);
 
-
-struct lprocfs_jobstats_data {
-       struct obd_job_stats *pjd_stats;
-       loff_t pjd_last_pos;
-       struct job_stat *pjd_last_job;
-};
-
 static void *lprocfs_jobstats_seq_start(struct seq_file *p, loff_t *pos)
 {
-       struct lprocfs_jobstats_data *data = p->private;
-       struct obd_job_stats *stats = data->pjd_stats;
-       loff_t off = *pos;
-       struct job_stat *job;
-
-       rcu_read_lock();
-       if (off == 0)
-               return SEQ_START_TOKEN;
+       struct obd_job_stats *stats = p->private;
+       struct job_stat *start;
 
-       /* if pos matches the offset of last saved job, start from saved job */
-       if (data->pjd_last_job && data->pjd_last_pos == off)
-               return data->pjd_last_job;
+       down_read(&stats->ojs_rwsem);
+       if (*pos == 0)
+               set_bit(OJS_HEADER, &stats->ojs_flags);
+       start = job_find_first_pos(stats, *pos);
+       if (start)
+               *pos = start->js_pos_id;
 
-       off--;
-       list_for_each_entry_rcu(job, &stats->ojs_list, js_list) {
-               if (!off--)
-                       return job;
-       }
-       return NULL;
+       return start;
 }
 
-static void lprocfs_jobstats_seq_stop(struct seq_file *p, void *v)
+static void *lprocfs_jobstats_seq_next(struct seq_file *p, void *v, loff_t *pos)
 {
-       struct lprocfs_jobstats_data *data = p->private;
-       struct job_stat *job = NULL;
-
-       /* try to get a ref on current job (not deleted) */
-       if (v && v != SEQ_START_TOKEN && job_getref_try(v))
-               job = v;
+       struct job_stat *job = v, *next = NULL;
 
-       rcu_read_unlock();
-
-       /* drop the ref on the old saved job */
-       if (data->pjd_last_job) {
-               job_putref(data->pjd_last_job);
-               data->pjd_last_job = NULL;
-       }
+       ++*pos;
+       if (!job)
+               return next;
+       next = job_get_next_pos(job);
+       if (next)
+               *pos = next->js_pos_id;
 
-       /* save the current job for the next read */
-       if (job)
-               data->pjd_last_job = job;
+       return next;
 }
 
-static void *lprocfs_jobstats_seq_next(struct seq_file *p, void *v, loff_t *pos)
+static void lprocfs_jobstats_seq_stop(struct seq_file *p, void *v)
 {
-       struct lprocfs_jobstats_data *data = p->private;
-       struct obd_job_stats *stats = data->pjd_stats;
-       struct job_stat *job;
-       struct list_head *cur;
+       struct obd_job_stats *stats = p->private;
 
-       ++*pos;
-       data->pjd_last_pos = *pos;
-       if (v == SEQ_START_TOKEN) {
-               cur = &stats->ojs_list;
-       } else {
-               job = (struct job_stat *)v;
-               cur = &job->js_list;
-       }
-
-       job = list_entry_rcu(cur->next, struct job_stat, js_list);
-       if (&job->js_list == &stats->ojs_list)
-               return NULL;
-
-       return job;
+       up_read(&stats->ojs_rwsem);
 }
 
 /*
@@ -487,6 +592,7 @@ static inline int width(const char *str, int len)
 
 static int lprocfs_jobstats_seq_show(struct seq_file *p, void *v)
 {
+       struct obd_job_stats *stats = p->private;
        struct job_stat *job = v;
        struct lprocfs_stats *s;
        struct lprocfs_counter ret;
@@ -495,10 +601,11 @@ static int lprocfs_jobstats_seq_show(struct seq_file *p, void *v)
        char *quote = "", *c, *end;
        int i, joblen = 0;
 
-       if (v == SEQ_START_TOKEN) {
-               seq_puts(p, "job_stats:\n");
+       if (v == SEQ_START_TOKEN)
                return 0;
-       }
+
+       if (test_and_clear_bit(OJS_HEADER, &stats->ojs_flags))
+               seq_puts(p, "job_stats:\n");
 
        /* Quote and escape jobid characters to escape hex codes "\xHH" if
         * it contains any non-standard characters (space, newline, etc),
@@ -586,6 +693,7 @@ static int lprocfs_jobstats_seq_show(struct seq_file *p, void *v)
                }
                seq_puts(p, " }\n");
        }
+       job_putref(job);
 
        return 0;
 }
@@ -599,23 +707,20 @@ static const struct seq_operations lprocfs_jobstats_seq_sops = {
 
 static int lprocfs_jobstats_seq_open(struct inode *inode, struct file *file)
 {
-       struct lprocfs_jobstats_data *data = NULL;
        struct seq_file *seq;
+       struct obd_job_stats *stats;
        int rc;
 
        rc = seq_open(file, &lprocfs_jobstats_seq_sops);
        if (rc)
                return rc;
 
-       OBD_ALLOC_PTR(data);
-       if (!data)
-               return -ENOMEM;
-
-       data->pjd_stats = pde_data(inode);
-       data->pjd_last_job = NULL;
-       data->pjd_last_pos = 0;
+       stats = pde_data(inode);
+       /* emit the job_stats header on the next show */
+       set_bit(OJS_HEADER, &stats->ojs_flags);
        seq = file->private_data;
-       seq->private = data;
+       seq->private = stats;
+
        return 0;
 }
 
@@ -624,8 +729,7 @@ static ssize_t lprocfs_jobstats_seq_write(struct file *file,
                                          size_t len, loff_t *off)
 {
        struct seq_file *seq = file->private_data;
-       struct lprocfs_jobstats_data *data = seq->private;
-       struct obd_job_stats *stats = data->pjd_stats;
+       struct obd_job_stats *stats = seq->private;
        char jobid[4 * LUSTRE_JOBID_SIZE]; /* all escaped chars, plus ""\n\0 */
        char *p1, *p2, *last;
        unsigned int c;
@@ -634,7 +738,7 @@ static ssize_t lprocfs_jobstats_seq_write(struct file *file,
        if (len == 0 || len >= 4 * LUSTRE_JOBID_SIZE)
                return -EINVAL;
 
-       if (stats->ojs_hash == NULL)
+       if (!stats->ojs_cntr_num)
                return -ENODEV;
 
        if (copy_from_user(jobid, buf, len))
@@ -675,13 +779,14 @@ static ssize_t lprocfs_jobstats_seq_write(struct file *file,
        if (strlen(jobid) == 0)
                return -EINVAL;
 
-       job = cfs_hash_lookup(stats->ojs_hash, jobid);
+       down_read(&stats->ojs_rwsem);
+       job = job_find(stats, jobid);
+       up_read(&stats->ojs_rwsem);
        if (!job)
                return -EINVAL;
+       job_putref(job); /* drop ref from job_find() */
+       job_putref(job); /* drop ref to initiate removal */
 
-       cfs_hash_del_key(stats->ojs_hash, jobid);
-
-       job_putref(job);
        return len;
 }
 
@@ -700,17 +805,9 @@ static ssize_t lprocfs_jobstats_seq_write(struct file *file,
 static int lprocfs_jobstats_seq_release(struct inode *inode, struct file *file)
 {
        struct seq_file *seq = file->private_data;
-       struct lprocfs_jobstats_data *data = seq->private;
-
-       /* drop the ref of last saved job */
-       if (data->pjd_last_job) {
-               job_putref(data->pjd_last_job);
-               data->pjd_last_pos = 0;
-               data->pjd_last_job = NULL;
-       }
+       struct obd_job_stats *stats = seq->private;
 
-       lprocfs_job_cleanup(data->pjd_stats, false);
-       OBD_FREE_PTR(data);
+       lprocfs_job_cleanup(stats, false);
 
        return lprocfs_seq_release(inode, file);
 }
@@ -748,28 +845,23 @@ int lprocfs_job_stats_init(struct obd_device *obd, int cntr_num,
                RETURN(-EINVAL);
        }
        stats = &obd2obt(obd)->obt_jobstats;
-
-       LASSERT(stats->ojs_hash == NULL);
-       stats->ojs_hash = cfs_hash_create("JOB_STATS",
-                                         HASH_JOB_STATS_CUR_BITS,
-                                         HASH_JOB_STATS_MAX_BITS,
-                                         HASH_JOB_STATS_BKT_BITS, 0,
-                                         CFS_HASH_MIN_THETA,
-                                         CFS_HASH_MAX_THETA,
-                                         &job_stats_hash_ops,
-                                         CFS_HASH_DEFAULT);
-       if (stats->ojs_hash == NULL)
-               RETURN(-ENOMEM);
-
-       INIT_LIST_HEAD(&stats->ojs_list);
+       stats->ojs_idtree = RB_ROOT;
+       stats->ojs_postree = RB_ROOT;
+       atomic64_set(&stats->ojs_next_pos, 2);
+       init_rwsem(&stats->ojs_rwsem);
+       INIT_LIST_HEAD(&stats->ojs_lru);
+       init_llist_head(&stats->ojs_deleted);
+       stats->ojs_flags = 0;
+       atomic_set(&stats->ojs_readers, 0);
        spin_lock_init(&stats->ojs_lock);
-       stats->ojs_cntr_num = cntr_num;
-       stats->ojs_cntr_init_fn = init_fn;
        /* Store 1/2 the actual interval, since we use that the most, and
         * it is easier to work with.
         */
        stats->ojs_cleanup_interval = ktime_set(600 / 2, 0); /* default 10 min*/
        stats->ojs_cleanup_last = ktime_get_real();
+       stats->ojs_cntr_num = cntr_num;
+       stats->ojs_cntr_init_fn = init_fn;
+       atomic64_set(&stats->ojs_jobs, 0);
 
        entry = lprocfs_add_simple(obd->obd_proc_entry, "job_stats", stats,
                                   &lprocfs_jobstats_seq_fops);
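
Worth noting in the purge path above: job_purge_locked() bounds how
long ojs_rwsem is held for write by dropping the semaphore and
rescheduling once JOB_CLEANUP_BATCH (1024) jobs have been erased,
which is the direct fix for the soft lockups described in the commit
message. A generalized per-batch form of that pattern, reusing the
obj/obj_stats shapes sketched earlier (erase_one() is a hypothetical
stand-in for rb_erase() plus call_rcu()):

    static void purge_batched(struct rw_semaphore *sem,
                              struct llist_node *batch, unsigned int limit)
    {
            struct obj *o, *tmp;
            unsigned int count = 0;

            /* caller holds sem for write */
            llist_for_each_entry_safe(o, tmp, batch, deleted) {
                    erase_one(o);
                    if (++count == limit) {
                            count = 0;
                            up_write(sem);
                            cond_resched();  /* let waiting readers in */
                            down_write(sem);
                    }
            }
    }
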
index 730d86f..e8b8b7f 100644 (file)
@@ -84,7 +84,7 @@ static inline void ofd_counter_incr(struct obd_export *exp, int opcode,
        if (exp->exp_obd && exp->exp_obd->obd_stats)
                lprocfs_counter_add(exp->exp_obd->obd_stats, opcode, amount);
 
-       if (exp->exp_obd && obd2obt(exp->exp_obd)->obt_jobstats.ojs_hash &&
+       if (exp->exp_obd && obd2obt(exp->exp_obd)->obt_jobstats.ojs_cntr_num &&
            (exp_connect_flags(exp) & OBD_CONNECT_JOBSTATS))
                lprocfs_job_stats_log(exp->exp_obd, jobid, opcode, amount);
 
index d0240ef..b8cfa24 100644 (file)
@@ -476,6 +476,7 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        lut->lut_reply_data = NULL;
        lut->lut_reply_bitmap = NULL;
        obt = obd_obt_init(obd);
+       obt->obt_jobstats.ojs_cntr_num = 0;
        obt->obt_lut = lut;
 
        /* set request handler slice and parameters */
index ae0deb6..c6a054d 100755 (executable)
@@ -22215,6 +22215,59 @@ test_205k(){
 }
 run_test 205k "Verify '?' operator on job stats"
 
+test_205l() {
+       [[ $PARALLEL != "yes" ]] || skip "skip parallel run"
+       remote_mds_nodsh && skip "remote MDS with nodsh"
+       [[ "$($LCTL get_param -n mdc.*.connect_flags)" =~ jobstats ]] ||
+               skip "Server doesn't support jobstats"
+       [[ "$JOBID_VAR" != "disable" ]] || skip_env "jobstats is disabled"
+
+       local jobid_save=$($LCTL get_param jobid_var jobid_name)
+       stack_trap "$LCTL set_param $jobid_save"
+       local tmpdir=$(mktemp -d /tmp/jobstat-XXXXXX)
+       local jobs=$tmpdir/jobs.txt
+       local mv_save=${tmpdir}/local_mv
+       local mv_job
+       local n=1
+       local limit=500
+       [[ $SLOW == "no" ]] || limit=500000
+
+       do_facet mds1 $LCTL set_param jobid_var=procname_uid jobid_name='%e.%u'
+       cp -a /etc/hosts $DIR/hosts
+       cp $(which mv) ${mv_save}
+       do_facet mds1 $LCTL set_param mdt.*.job_cleanup_interval=5
+       sleep 5
+       do_facet mds1 $LCTL set_param mdt.*.job_stats=clear
+       do_facet mds1 $LCTL set_param mdt.*.job_cleanup_interval=0
+       sleep 5
+       # Add a series of easily identifiable jobs
+       for ((n = 0; n < limit; n++)); do
+               mv_job=${tmpdir}/mv.$(printf %08d $n)
+               mv ${mv_save} ${mv_job}
+               ${mv_job} $DIR/hosts $DIR/hosts.$(printf %08d $n)
+               ${mv_job} $DIR/hosts.$(printf %08d $n) $DIR/hosts
+               mv ${mv_job} ${mv_save}
+       done
+       # Duplicates indicate restart issues
+       do_facet mds1 \
+               "$LCTL get_param mdt.*.job_stats | grep job_id: | cut -d. -f2" \
+               > ${jobs}
+       local dupes=$(grep -v -e "^${RUNAS_ID}\$" -e '^0$' ${jobs} | sort |
+                     uniq -d | wc -l)
+       (( ${dupes} == 0 )) ||
+               error "seq_write wrote ${dupes} duplicate entries."
+       # Unexpected jobs indicate cleanup issues
+       local njobs=$(grep -v -e "^${RUNAS_ID}\$" -e '^0$' ${jobs} | wc -l)
+       (( ${njobs} == ${limit} )) ||
+               error "seq_write wrote ${njobs} jobs expected ${limit}."
+       do_facet mds1 $LCTL set_param mdt.*.job_cleanup_interval=5
+       sleep 5
+       do_facet mds1 $LCTL set_param mdt.*.job_stats=clear
+       # On success the scratch files are not interesting
+       rm -fr ${tmpdir}
+}
+run_test 205l "Verify job stats can scale"
+
 # LU-1480, LU-1773 and LU-1657
 test_206() {
        mkdir -p $DIR/$tdir