Whamcloud - gitweb
LU-17814 utils: implement thread pool 92/57292/15
authorPatrick Farrell <pfarrell@whamcloud.com>
Wed, 4 Dec 2024 23:49:23 +0000 (18:49 -0500)
committerOleg Drokin <green@whamcloud.com>
Sat, 7 Jun 2025 22:55:36 +0000 (22:55 +0000)
Implement thread pool for parallel find - still uses
regular find code to do the actual find.

Test-Parameters: trivial
Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: If5ebd3b52b93fc54dd4ab86bd8dfe06c4dcc0c11
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/57292
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Marc Vef <mvef@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
lustre/include/lustre/lustreapi.h
lustre/utils/Makefile.am
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_pfind.c [new file with mode: 0644]
lustre/utils/lustreapi_internal.h

index 15ee893..ae49e7f 100644 (file)
@@ -432,6 +432,26 @@ struct find_param {
        unsigned long int        fp_skip_percent;
        unsigned long long       fp_skip_total;
        unsigned long long       fp_skip_count;
+       struct find_work_queue  *fp_queue;
+};
+
+/* Work unit for parallel directory processing */
+struct find_work_unit {
+       struct find_work_unit *fwu_next;
+       struct find_param *fwu_param;
+       struct dirent64 *fwu_de;
+       char *fwu_path;
+};
+
+/* Work queue for managing parallel processing */
+struct find_work_queue {
+       struct find_work_unit *fwq_head; /* Take work from head */
+       struct find_work_unit *fwq_tail; /* ... add to tail */
+       pthread_mutex_t fwq_lock;
+       pthread_cond_t fwq_sleep_cond;
+       int fwq_active_units;           /* Atomic counter, active work units */
+       bool fwq_shutdown;              /* Flag to signal shutdown */
+       int fwq_error;
 };
 
 int llapi_ostlist(char *path, struct find_param *param);
index d95c416..23be310 100644 (file)
@@ -102,10 +102,15 @@ liblustreapi_la_SOURCES = liblustreapi.c liblustreapi_hsm.c \
                          liblustreapi_kernelconn.c liblustreapi_param.c \
                          liblustreapi_mirror.c liblustreapi_fid.c \
                          liblustreapi_ladvise.c liblustreapi_chlg.c \
-                         liblustreapi_heat.c liblustreapi_pcc.c \
-                         liblustreapi_ioctl.c liblustreapi_root.c \
-                         liblustreapi_lseek.c liblustreapi_swap.c \
-                         libhsm_scanner.h libhsm_scanner.c
+                         liblustreapi_heat.c \
+                         liblustreapi_ioctl.c \
+                         liblustreapi_lseek.c \
+                         liblustreapi_pcc.c \
+                         liblustreapi_pfind.c \
+                         liblustreapi_root.c \
+                         libhsm_scanner.h \
+                         libhsm_scanner.c \
+                         liblustreapi_swap.c
 liblustreapi_la_CFLAGS = -fPIC -D_GNU_SOURCE $(LIBNL3_CFLAGS) \
                         -I $(top_builddir)/lnet/utils \
                         -D_LARGEFILE64_SOURCE=1 -D_FILE_OFFSET_BITS=64 \
index ada7bfe..f0e79e7 100644 (file)
@@ -1521,9 +1521,6 @@ err:
        return rc;
 }
 
-typedef int (semantic_func_t)(char *path, int p, int *d,
-                             void *data, struct dirent64 *de);
-
 #define OBD_NOT_FOUND           (-1)
 
 static bool lmv_is_foreign(__u32 magic)
@@ -1596,8 +1593,8 @@ static int common_param_init(struct find_param *param, char *path)
        return 0;
 }
 
-static int cb_common_fini(char *path, int p, int *dp, void *data,
-                         struct dirent64 *de)
+int cb_common_fini(char *path, int p, int *dp, void *data,
+                  struct dirent64 *de)
 {
        struct find_param *param = data;
 
@@ -2153,8 +2150,8 @@ err:
        return ret;
 }
 
-static int param_callback(char *path, semantic_func_t sem_init,
-                         semantic_func_t sem_fini, struct find_param *param)
+int param_callback(char *path, semantic_func_t sem_init,
+                  semantic_func_t sem_fini, struct find_param *param)
 {
        int ret, len = strlen(path);
        char *buf;
@@ -5925,8 +5922,8 @@ static int check_file_permissions(const struct find_param *param,
                return 1;
 }
 
-static int cb_find_init(char *path, int p, int *dp,
-                       void *data, struct dirent64 *de)
+int cb_find_init(char *path, int p, int *dp,
+                void *data, struct dirent64 *de)
 {
        struct find_param *param = (struct find_param *)data;
        struct lov_user_mds_data *lmd = param->fp_lmd;
@@ -6781,7 +6778,7 @@ check_single:
  * @param[in]  param   Structure containing info about invocation of lfs find
  * @return             None
  */
-static void validate_printf_str(struct find_param *param)
+void validate_printf_str(struct find_param *param)
 {
        char *c = param->fp_format_printf_str;
        int ret = 0;
@@ -6807,7 +6804,12 @@ int llapi_find(char *path, struct find_param *param)
 {
        if (param->fp_format_printf_str)
                validate_printf_str(param);
-       return param_callback(path, cb_find_init, cb_common_fini, param);
+       if (param->fp_thread_count) {
+               return parallel_find(path, param);
+       } else {
+               return param_callback(path, cb_find_init, cb_common_fini,
+                                     param);
+       }
 }
 
 /*
diff --git a/lustre/utils/liblustreapi_pfind.c b/lustre/utils/liblustreapi_pfind.c
new file mode 100644 (file)
index 0000000..bbc854e
--- /dev/null
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: LGPL-2.1+
+/*
+ * Copyright (c) 2024 DataDirect Networks Storage, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * implement thread worker pool for parallel work queue operations.
+ *
+ * Author: Patrick Farrell <pfarrell@whamcloud.com>
+ */
+
+#include <pthread.h>
+#include "lustreapi_internal.h"
+#include "lstddef.h"
+
+/* Placeholder worker function - just for testing thread creation */
+static void *find_worker(void *arg)
+{
+       struct find_work_queue *queue = (struct find_work_queue *) arg;
+
+       /* TODO: Implement actual work processing */
+       while (!queue->fwq_shutdown)
+               sleep(1); /* Just keep thread alive for now */
+
+       return NULL;
+}
+
+/* Initialize the work queue */
+static void find_work_queue_init(struct find_work_queue *queue)
+{
+       queue->fwq_head = NULL;
+       queue->fwq_tail = NULL;
+       queue->fwq_active_units = 0;
+       pthread_mutex_init(&queue->fwq_lock, NULL);
+       pthread_cond_init(&queue->fwq_sleep_cond, NULL);
+       queue->fwq_shutdown = false;
+       queue->fwq_error = 0;
+}
+
+static int find_threads_init(pthread_t *threads, struct find_work_queue *queue,
+                            int numthreads)
+{
+       int ret;
+       int i;
+
+       for (i = 0; i < numthreads; i++) {
+               ret = pthread_create(&threads[i], NULL, find_worker, queue);
+               if (ret) {
+                       /* Set shutdown flag for any created threads */
+                       queue->fwq_shutdown = true;
+                       /* wake up queue... */
+                       pthread_cond_broadcast(&queue->fwq_sleep_cond);
+                       /* Wait for already-created threads to exit */
+                       while (--i >= 0)
+                               pthread_join(threads[i], NULL);
+                       return -ENOMEM;
+               }
+       }
+
+       return 0;
+}
+
+int parallel_find(char *path, struct find_param *param)
+{
+       struct find_work_queue queue;
+       pthread_t *threads = NULL;
+       int numthreads = param->fp_thread_count;
+       int rc;
+       int i;
+
+       if (param->fp_format_printf_str)
+               validate_printf_str(param);
+
+       /* require at least one thread */
+       if (numthreads < 1)
+               return -EINVAL;
+
+       find_work_queue_init(&queue);
+
+       threads = malloc(numthreads * sizeof(pthread_t));
+       if (!threads)
+               return -ENOMEM;
+
+       rc = find_threads_init(threads, &queue, numthreads);
+       if (rc) {
+               llapi_error(LLAPI_MSG_ERROR, rc,
+                           "Failed to initialize thread pool");
+               goto cleanup;
+       }
+       /* Normal find - no parallelism yet */
+       rc = param_callback(path, cb_find_init, cb_common_fini, param);
+
+       /* Signal shutdown and wait for threads before cleanup */
+       queue.fwq_shutdown = true;
+       pthread_cond_broadcast(&queue.fwq_sleep_cond);
+       for (i = 0; i < numthreads; i++)
+               pthread_join(threads[i], NULL);
+
+cleanup:
+       free(threads);
+       pthread_mutex_destroy(&queue.fwq_lock);
+       pthread_cond_destroy(&queue.fwq_sleep_cond);
+
+       return rc;
+}
index 60c848a..eec1b7e 100644 (file)
@@ -212,4 +212,16 @@ int get_lmd_info_fd(const char *path, int parentfd, int dirfd,
 int lov_comp_md_size(struct lov_comp_md_v1 *lcm);
 
 int open_parent(const char *path);
+
+typedef int (semantic_func_t)(char *path, int p, int *d,
+                             void *data, struct dirent64 *de);
+
+void validate_printf_str(struct find_param *param);
+int param_callback(char *path, semantic_func_t sem_init,
+                  semantic_func_t sem_fini, struct find_param *param);
+int cb_find_init(char *path, int p, int *dp,
+                void *data, struct dirent64 *de);
+int cb_common_fini(char *path, int p, int *dp, void *data,
+                  struct dirent64 *de);
+int parallel_find(char *path, struct find_param *param);
 #endif /* _LUSTREAPI_INTERNAL_H_ */