From 5c608841c1bae183767c0eab079530bbbc0fcb1e Mon Sep 17 00:00:00 2001 From: "John L. Hammond" Date: Thu, 24 Feb 2022 10:56:59 -0600 Subject: [PATCH] EX-4015 lipe: batching and threading improvements Remove the current group descriptor mutex from struct scan_control. Use __ATOMIC_RELAXED fetch and add to allocate the next batch of groups. Report the correct start group of the batch in debugging output and remove a redundant batch debug message. Use atomic loads and stores for the ti_should_stop member which is responsible for lipe-scan-break. Check if the current thread should stop in the outer loop of ls3_scan_thread_start_scm() as well as the inner loop of ldiskfs_scan_groups(). Reduce the default scanning thread count from _SC_NPROCESSORS_ONLN / 2 to _SC_NPROCESSORS_ONLN / 4. Test-Parameters: trivial testlist=sanity-lipe-scan3 serverextra_install_params="--packages lipe-scan" facet=mds1 Signed-off-by: John L. Hammond Change-Id: Ic99c27504333f1d63a689e091d857e44062ef584 Reviewed-on: https://review.whamcloud.com/46605 Tested-by: jenkins Tested-by: Maloo --- lipe/src/lipe_scan3/ls3_main.c | 1 + lipe/src/lipe_scan3/ls3_scan.c | 115 +++++++++++++++++++---------------------- 2 files changed, 54 insertions(+), 62 deletions(-) diff --git a/lipe/src/lipe_scan3/ls3_main.c b/lipe/src/lipe_scan3/ls3_main.c index fa0ab67..dcdbbb9 100644 --- a/lipe/src/lipe_scan3/ls3_main.c +++ b/lipe/src/lipe_scan3/ls3_main.c @@ -959,6 +959,7 @@ static void help(void) " --required-attrs=ATTR... only apply policy to inodes with ATTR...\n" " (use '--list-attrs' to see available attributes)\n" " --thread-count=COUNT use COUNT scanning threads\n" +" (default is number of processors / 4)\n" " --list-attrs list available attribute names and exit\n" " --list-json-attrs list available JSON attribute names and exit\n" diff --git a/lipe/src/lipe_scan3/ls3_scan.c b/lipe/src/lipe_scan3/ls3_scan.c index 520d858..fb39847 100644 --- a/lipe/src/lipe_scan3/ls3_scan.c +++ b/lipe/src/lipe_scan3/ls3_scan.c @@ -30,15 +30,21 @@ #include "ls3_fid2path.h" #include "ls3_object_attrs.h" +#define min(x, y) ({ \ + typeof(x) _min1 = (x); \ + typeof(y) _min2 = (y); \ + (void) (&_min1 == &_min2); \ + _min1 < _min2 ? _min1 : _min2; \ +}) + /* XXX We are mixing libext2fs errcode_t (long), pthread positive rcs, * pthread_join (void **) rcs, and errnos. This is why we have all the * intptr_t and intmax_t business. */ struct ls3_scan_control { - pthread_mutex_t sc_group_mutex; - unsigned long sc_group_current; - unsigned long sc_group_end; - unsigned long sc_group_batch; + unsigned long sc_group_desc_current; /* Use __ATOMIC_RELAXED. */ + unsigned long sc_group_desc_count; + unsigned long sc_group_desc_per_batch; struct ls3_thread_info *sc_thread_infos; size_t sc_thread_count; @@ -55,12 +61,13 @@ struct ls3_thread_info { /* Application-defined thread index */ int ti_thread_index; struct ls3_instance *ti_instance; - unsigned long ti_group_end; unsigned long ti_group_start; + unsigned long ti_group_current; + unsigned long ti_group_end; uintmax_t ti_read_attr_error_count; uintmax_t ti_other_error_count; - sig_atomic_t ti_started; - sig_atomic_t ti_should_stop; + int ti_started; + int ti_should_stop; /* Use __ATOMIC_RELAXED. */ }; const char LS3_CSTR_AUTO[] = "AUTO"; @@ -588,27 +595,17 @@ int ls3_read_attrs(struct ls3_instance *li, } static bool ldiskfs_scan_get_next_group_batch(struct ls3_scan_control *sc, - unsigned long *next_start, - unsigned long *next_end) + unsigned long *group_start, + unsigned long *group_end) { - bool found_batch; - - assert(sc->sc_group_current <= sc->sc_group_end); + *group_start = __atomic_fetch_add(&sc->sc_group_desc_current, + sc->sc_group_desc_per_batch, + __ATOMIC_RELAXED); - pthread_mutex_lock(&sc->sc_group_mutex); + *group_end = min(*group_start + sc->sc_group_desc_per_batch, + sc->sc_group_desc_count); - found_batch = sc->sc_group_current < sc->sc_group_end; - if (found_batch) { - *next_start = sc->sc_group_current; - sc->sc_group_current += sc->sc_group_batch; - if (sc->sc_group_current >= sc->sc_group_end) - sc->sc_group_current = sc->sc_group_end; - *next_end = sc->sc_group_current; - } - - pthread_mutex_unlock(&sc->sc_group_mutex); - - return found_batch; + return *group_start < *group_end; } /* @@ -667,20 +664,19 @@ static int ldiskfs_scan_groups(ext2_inode_scan scan, struct ls3_object_attrs *loa, struct lipe_object *lo, struct ext2_inode_large *inode, int inode_size, - struct ls3_thread_info *ti, - unsigned long start_group) + struct ls3_thread_info *ti) { struct ls3_instance *li; - int rc; - - LS3_DEBUG_U(start_group); + errcode_t rc; li = ti->ti_instance; - rc = ext2fs_inode_scan_goto_blockgroup(scan, start_group); + assert(ti->ti_group_start < ti->ti_group_end); + + rc = ext2fs_inode_scan_goto_blockgroup(scan, ti->ti_group_start); if (rc) { LS3_ERROR("cannot goto block group %lu: %s\n", - start_group, error_message(rc)); + ti->ti_group_start, error_message(rc)); goto out; } @@ -694,7 +690,7 @@ static int ldiskfs_scan_groups(ext2_inode_scan scan, if (ino == 0) break; - if (ti->ti_should_stop) { + if (__atomic_load_n(&ti->ti_should_stop, __ATOMIC_RELAXED)) { LS3_DEBUG("thread %d stopping before scan complete\n", ti->ti_thread_index); break; } @@ -759,9 +755,9 @@ static errcode_t ls3_done_group_callback(ext2_filsys fs, ext2_inode_scan scan, { struct ls3_thread_info *ti = data; - ti->ti_group_start++; + ti->ti_group_current++; - return ti->ti_group_start >= ti->ti_group_end; + return ti->ti_group_current >= ti->ti_group_end; } /* Open and scan a slice of an ext2 FS. We have already entered scheme context. */ @@ -823,26 +819,22 @@ static void *ls3_scan_thread_start_scm(void *arg) inode = xcalloc(1, inode_size); lo.u.lo_ldiskfs.lol_inode = inode; - while (1) { - unsigned long start_group; - unsigned long end_group; + while (!__atomic_load_n(&ti->ti_should_stop, __ATOMIC_RELAXED)) { bool found_batch; found_batch = ldiskfs_scan_get_next_group_batch(ti->ti_scan_control, - &start_group, &end_group); + &ti->ti_group_start, + &ti->ti_group_end); if (!found_batch) break; - ti->ti_group_start = start_group; - ti->ti_group_end = end_group; - + ti->ti_group_current = ti->ti_group_start; LS3_DEBUG("begin scan of groups [%lu, %lu)\n", ti->ti_group_start, ti->ti_group_end); ext2fs_set_inode_callback(scan, &ls3_done_group_callback, ti); - rc = ldiskfs_scan_groups(scan, fs, &loa, &lo, inode, inode_size, - ti, start_group); + rc = ldiskfs_scan_groups(scan, fs, &loa, &lo, inode, inode_size, ti); if (rc != 0) goto out_free; @@ -907,7 +899,7 @@ static int ls3_scan_control_start(struct ls3_scan_control *sc, struct ls3_instan break; } - ti->ti_started = true; + ti->ti_started = 1; } out: if (pattr != NULL) { @@ -926,7 +918,7 @@ static void ls3_scan_control_stop(struct ls3_scan_control *sc) int i; for (i = 0; i < sc->sc_thread_count; i++) - sc->sc_thread_infos[i].ti_should_stop = 1; + __atomic_store_n(&sc->sc_thread_infos[i].ti_should_stop, 1, __ATOMIC_RELAXED); } static int ls3_scan_control_join(struct ls3_scan_control *sc) @@ -965,32 +957,31 @@ static int ls3_scan_control_join(struct ls3_scan_control *sc) return rc; } -#define DEFAULT_GROUP_BATCH 10 +enum { + LS3_GROUP_DESC_PER_BATCH_MAX = 10, +}; static intmax_t ls3_scan_fs(struct ls3_instance *li, ext2_filsys fs, size_t thread_count) { - unsigned long group_total, group_batch; + unsigned long group_desc_per_batch; struct ls3_scan_control sc = { .sc_thread_count = 0, .sc_break_rc = INTMAX_MIN, + .sc_group_desc_count = fs->group_desc_count, }; intmax_t rc, rc2; - pthread_mutex_init(&sc.sc_group_mutex, NULL); - - group_total = fs->group_desc_count; - LS3_DEBUG_U(group_total); - sc.sc_group_end = group_total; - sc.sc_group_current = 0; + LS3_DEBUG_U(sc.sc_group_desc_count); /* keep the scan batch in a reasonable small one */ - group_batch = group_total / thread_count; - if (!group_batch) - group_batch = 1; - else if (group_batch > DEFAULT_GROUP_BATCH) - group_batch = DEFAULT_GROUP_BATCH; - LS3_DEBUG_U(group_batch); - sc.sc_group_batch = group_batch; + group_desc_per_batch = sc.sc_group_desc_count / thread_count; + if (group_desc_per_batch == 0) + group_desc_per_batch = 1; + else if (group_desc_per_batch > LS3_GROUP_DESC_PER_BATCH_MAX) + group_desc_per_batch = LS3_GROUP_DESC_PER_BATCH_MAX; + + sc.sc_group_desc_per_batch = group_desc_per_batch; + LS3_DEBUG_U(sc.sc_group_desc_per_batch); rc = ls3_scan_control_start(&sc, li, thread_count); if (rc != 0) { @@ -1276,7 +1267,7 @@ int ls3_scan(const char *device_path, if (thread_count < 0) thread_count = sysconf(_SC_NPROCESSORS_ONLN); else if (thread_count == 0) - thread_count = sysconf(_SC_NPROCESSORS_ONLN) / 2; + thread_count = sysconf(_SC_NPROCESSORS_ONLN) / 4; if (thread_count <= 0) thread_count = 1; -- 1.8.3.1