From: Patrick Farrell Date: Wed, 4 Dec 2024 23:49:23 +0000 (-0500) Subject: LU-17814 utils: implement thread pool X-Git-Tag: 2.16.56~24 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=23d9d6b1fb7de300541f39718ca4e3b1f1c5db57;p=fs%2Flustre-release.git LU-17814 utils: implement thread pool Implement thread pool for parallel find - still uses regular find code to do the actual find. Test-Parameters: trivial Signed-off-by: Patrick Farrell Change-Id: If5ebd3b52b93fc54dd4ab86bd8dfe06c4dcc0c11 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/57292 Reviewed-by: Andreas Dilger Reviewed-by: Marc Vef Reviewed-by: Oleg Drokin Tested-by: jenkins Tested-by: Maloo --- diff --git a/lustre/include/lustre/lustreapi.h b/lustre/include/lustre/lustreapi.h index 15ee893..ae49e7f 100644 --- a/lustre/include/lustre/lustreapi.h +++ b/lustre/include/lustre/lustreapi.h @@ -432,6 +432,26 @@ struct find_param { unsigned long int fp_skip_percent; unsigned long long fp_skip_total; unsigned long long fp_skip_count; + struct find_work_queue *fp_queue; +}; + +/* Work unit for parallel directory processing */ +struct find_work_unit { + struct find_work_unit *fwu_next; + struct find_param *fwu_param; + struct dirent64 *fwu_de; + char *fwu_path; +}; + +/* Work queue for managing parallel processing */ +struct find_work_queue { + struct find_work_unit *fwq_head; /* Take work from head */ + struct find_work_unit *fwq_tail; /* ... add to tail */ + pthread_mutex_t fwq_lock; + pthread_cond_t fwq_sleep_cond; + int fwq_active_units; /* Atomic counter, active work units */ + bool fwq_shutdown; /* Flag to signal shutdown */ + int fwq_error; }; int llapi_ostlist(char *path, struct find_param *param); diff --git a/lustre/utils/Makefile.am b/lustre/utils/Makefile.am index d95c416..23be310 100644 --- a/lustre/utils/Makefile.am +++ b/lustre/utils/Makefile.am @@ -102,10 +102,15 @@ liblustreapi_la_SOURCES = liblustreapi.c liblustreapi_hsm.c \ liblustreapi_kernelconn.c liblustreapi_param.c \ liblustreapi_mirror.c liblustreapi_fid.c \ liblustreapi_ladvise.c liblustreapi_chlg.c \ - liblustreapi_heat.c liblustreapi_pcc.c \ - liblustreapi_ioctl.c liblustreapi_root.c \ - liblustreapi_lseek.c liblustreapi_swap.c \ - libhsm_scanner.h libhsm_scanner.c + liblustreapi_heat.c \ + liblustreapi_ioctl.c \ + liblustreapi_lseek.c \ + liblustreapi_pcc.c \ + liblustreapi_pfind.c \ + liblustreapi_root.c \ + libhsm_scanner.h \ + libhsm_scanner.c \ + liblustreapi_swap.c liblustreapi_la_CFLAGS = -fPIC -D_GNU_SOURCE $(LIBNL3_CFLAGS) \ -I $(top_builddir)/lnet/utils \ -D_LARGEFILE64_SOURCE=1 -D_FILE_OFFSET_BITS=64 \ diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index ada7bfe..f0e79e7 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -1521,9 +1521,6 @@ err: return rc; } -typedef int (semantic_func_t)(char *path, int p, int *d, - void *data, struct dirent64 *de); - #define OBD_NOT_FOUND (-1) static bool lmv_is_foreign(__u32 magic) @@ -1596,8 +1593,8 @@ static int common_param_init(struct find_param *param, char *path) return 0; } -static int cb_common_fini(char *path, int p, int *dp, void *data, - struct dirent64 *de) +int cb_common_fini(char *path, int p, int *dp, void *data, + struct dirent64 *de) { struct find_param *param = data; @@ -2153,8 +2150,8 @@ err: return ret; } -static int param_callback(char *path, semantic_func_t sem_init, - semantic_func_t sem_fini, struct find_param *param) +int param_callback(char *path, semantic_func_t sem_init, + semantic_func_t sem_fini, struct find_param *param) { int ret, len = strlen(path); char *buf; @@ -5925,8 +5922,8 @@ static int check_file_permissions(const struct find_param *param, return 1; } -static int cb_find_init(char *path, int p, int *dp, - void *data, struct dirent64 *de) +int cb_find_init(char *path, int p, int *dp, + void *data, struct dirent64 *de) { struct find_param *param = (struct find_param *)data; struct lov_user_mds_data *lmd = param->fp_lmd; @@ -6781,7 +6778,7 @@ check_single: * @param[in] param Structure containing info about invocation of lfs find * @return None */ -static void validate_printf_str(struct find_param *param) +void validate_printf_str(struct find_param *param) { char *c = param->fp_format_printf_str; int ret = 0; @@ -6807,7 +6804,12 @@ int llapi_find(char *path, struct find_param *param) { if (param->fp_format_printf_str) validate_printf_str(param); - return param_callback(path, cb_find_init, cb_common_fini, param); + if (param->fp_thread_count) { + return parallel_find(path, param); + } else { + return param_callback(path, cb_find_init, cb_common_fini, + param); + } } /* diff --git a/lustre/utils/liblustreapi_pfind.c b/lustre/utils/liblustreapi_pfind.c new file mode 100644 index 0000000..bbc854e --- /dev/null +++ b/lustre/utils/liblustreapi_pfind.c @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: LGPL-2.1+ +/* + * Copyright (c) 2024 DataDirect Networks Storage, Inc. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * + * implement thread worker pool for parallel work queue operations. + * + * Author: Patrick Farrell + */ + +#include +#include "lustreapi_internal.h" +#include "lstddef.h" + +/* Placeholder worker function - just for testing thread creation */ +static void *find_worker(void *arg) +{ + struct find_work_queue *queue = (struct find_work_queue *) arg; + + /* TODO: Implement actual work processing */ + while (!queue->fwq_shutdown) + sleep(1); /* Just keep thread alive for now */ + + return NULL; +} + +/* Initialize the work queue */ +static void find_work_queue_init(struct find_work_queue *queue) +{ + queue->fwq_head = NULL; + queue->fwq_tail = NULL; + queue->fwq_active_units = 0; + pthread_mutex_init(&queue->fwq_lock, NULL); + pthread_cond_init(&queue->fwq_sleep_cond, NULL); + queue->fwq_shutdown = false; + queue->fwq_error = 0; +} + +static int find_threads_init(pthread_t *threads, struct find_work_queue *queue, + int numthreads) +{ + int ret; + int i; + + for (i = 0; i < numthreads; i++) { + ret = pthread_create(&threads[i], NULL, find_worker, queue); + if (ret) { + /* Set shutdown flag for any created threads */ + queue->fwq_shutdown = true; + /* wake up queue... */ + pthread_cond_broadcast(&queue->fwq_sleep_cond); + /* Wait for already-created threads to exit */ + while (--i >= 0) + pthread_join(threads[i], NULL); + return -ENOMEM; + } + } + + return 0; +} + +int parallel_find(char *path, struct find_param *param) +{ + struct find_work_queue queue; + pthread_t *threads = NULL; + int numthreads = param->fp_thread_count; + int rc; + int i; + + if (param->fp_format_printf_str) + validate_printf_str(param); + + /* require at least one thread */ + if (numthreads < 1) + return -EINVAL; + + find_work_queue_init(&queue); + + threads = malloc(numthreads * sizeof(pthread_t)); + if (!threads) + return -ENOMEM; + + rc = find_threads_init(threads, &queue, numthreads); + if (rc) { + llapi_error(LLAPI_MSG_ERROR, rc, + "Failed to initialize thread pool"); + goto cleanup; + } + /* Normal find - no parallelism yet */ + rc = param_callback(path, cb_find_init, cb_common_fini, param); + + /* Signal shutdown and wait for threads before cleanup */ + queue.fwq_shutdown = true; + pthread_cond_broadcast(&queue.fwq_sleep_cond); + for (i = 0; i < numthreads; i++) + pthread_join(threads[i], NULL); + +cleanup: + free(threads); + pthread_mutex_destroy(&queue.fwq_lock); + pthread_cond_destroy(&queue.fwq_sleep_cond); + + return rc; +} diff --git a/lustre/utils/lustreapi_internal.h b/lustre/utils/lustreapi_internal.h index 60c848a..eec1b7e 100644 --- a/lustre/utils/lustreapi_internal.h +++ b/lustre/utils/lustreapi_internal.h @@ -212,4 +212,16 @@ int get_lmd_info_fd(const char *path, int parentfd, int dirfd, int lov_comp_md_size(struct lov_comp_md_v1 *lcm); int open_parent(const char *path); + +typedef int (semantic_func_t)(char *path, int p, int *d, + void *data, struct dirent64 *de); + +void validate_printf_str(struct find_param *param); +int param_callback(char *path, semantic_func_t sem_init, + semantic_func_t sem_fini, struct find_param *param); +int cb_find_init(char *path, int p, int *dp, + void *data, struct dirent64 *de); +int cb_common_fini(char *path, int p, int *dp, void *data, + struct dirent64 *de); +int parallel_find(char *path, struct find_param *param); #endif /* _LUSTREAPI_INTERNAL_H_ */