unsigned long int fp_skip_percent;
unsigned long long fp_skip_total;
unsigned long long fp_skip_count;
+ struct find_work_queue *fp_queue;
+};
+
+/* Work unit for parallel directory processing */
+struct find_work_unit {
+ struct find_work_unit *fwu_next;
+ struct find_param *fwu_param;
+ struct dirent64 *fwu_de;
+ char *fwu_path;
+};
+
+/* Work queue for managing parallel processing */
+struct find_work_queue {
+ struct find_work_unit *fwq_head; /* Take work from head */
+ struct find_work_unit *fwq_tail; /* ... add to tail */
+ pthread_mutex_t fwq_lock;
+ pthread_cond_t fwq_sleep_cond;
+ int fwq_active_units; /* Atomic counter, active work units */
+ bool fwq_shutdown; /* Flag to signal shutdown */
+ int fwq_error;
};
int llapi_ostlist(char *path, struct find_param *param);
liblustreapi_kernelconn.c liblustreapi_param.c \
liblustreapi_mirror.c liblustreapi_fid.c \
liblustreapi_ladvise.c liblustreapi_chlg.c \
- liblustreapi_heat.c liblustreapi_pcc.c \
- liblustreapi_ioctl.c liblustreapi_root.c \
- liblustreapi_lseek.c liblustreapi_swap.c \
- libhsm_scanner.h libhsm_scanner.c
+ liblustreapi_heat.c \
+ liblustreapi_ioctl.c \
+ liblustreapi_lseek.c \
+ liblustreapi_pcc.c \
+ liblustreapi_pfind.c \
+ liblustreapi_root.c \
+ libhsm_scanner.h \
+ libhsm_scanner.c \
+ liblustreapi_swap.c
liblustreapi_la_CFLAGS = -fPIC -D_GNU_SOURCE $(LIBNL3_CFLAGS) \
-I $(top_builddir)/lnet/utils \
-D_LARGEFILE64_SOURCE=1 -D_FILE_OFFSET_BITS=64 \
return rc;
}
-typedef int (semantic_func_t)(char *path, int p, int *d,
- void *data, struct dirent64 *de);
-
#define OBD_NOT_FOUND (-1)
static bool lmv_is_foreign(__u32 magic)
return 0;
}
-static int cb_common_fini(char *path, int p, int *dp, void *data,
- struct dirent64 *de)
+int cb_common_fini(char *path, int p, int *dp, void *data,
+ struct dirent64 *de)
{
struct find_param *param = data;
return ret;
}
-static int param_callback(char *path, semantic_func_t sem_init,
- semantic_func_t sem_fini, struct find_param *param)
+int param_callback(char *path, semantic_func_t sem_init,
+ semantic_func_t sem_fini, struct find_param *param)
{
int ret, len = strlen(path);
char *buf;
return 1;
}
-static int cb_find_init(char *path, int p, int *dp,
- void *data, struct dirent64 *de)
+int cb_find_init(char *path, int p, int *dp,
+ void *data, struct dirent64 *de)
{
struct find_param *param = (struct find_param *)data;
struct lov_user_mds_data *lmd = param->fp_lmd;
* @param[in] param Structure containing info about invocation of lfs find
* @return None
*/
-static void validate_printf_str(struct find_param *param)
+void validate_printf_str(struct find_param *param)
{
char *c = param->fp_format_printf_str;
int ret = 0;
{
if (param->fp_format_printf_str)
validate_printf_str(param);
- return param_callback(path, cb_find_init, cb_common_fini, param);
+ if (param->fp_thread_count) {
+ return parallel_find(path, param);
+ } else {
+ return param_callback(path, cb_find_init, cb_common_fini,
+ param);
+ }
}
/*
--- /dev/null
+// SPDX-License-Identifier: LGPL-2.1+
+/*
+ * Copyright (c) 2024 DataDirect Networks Storage, Inc.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ *
+ * implement thread worker pool for parallel work queue operations.
+ *
+ * Author: Patrick Farrell <pfarrell@whamcloud.com>
+ */
+
+#include <pthread.h>
+#include "lustreapi_internal.h"
+#include "lstddef.h"
+
+/* Placeholder worker function - just for testing thread creation */
+static void *find_worker(void *arg)
+{
+ struct find_work_queue *queue = (struct find_work_queue *) arg;
+
+ /* TODO: Implement actual work processing */
+ while (!queue->fwq_shutdown)
+ sleep(1); /* Just keep thread alive for now */
+
+ return NULL;
+}
+
+/* Initialize the work queue */
+static void find_work_queue_init(struct find_work_queue *queue)
+{
+ queue->fwq_head = NULL;
+ queue->fwq_tail = NULL;
+ queue->fwq_active_units = 0;
+ pthread_mutex_init(&queue->fwq_lock, NULL);
+ pthread_cond_init(&queue->fwq_sleep_cond, NULL);
+ queue->fwq_shutdown = false;
+ queue->fwq_error = 0;
+}
+
+static int find_threads_init(pthread_t *threads, struct find_work_queue *queue,
+ int numthreads)
+{
+ int ret;
+ int i;
+
+ for (i = 0; i < numthreads; i++) {
+ ret = pthread_create(&threads[i], NULL, find_worker, queue);
+ if (ret) {
+ /* Set shutdown flag for any created threads */
+ queue->fwq_shutdown = true;
+ /* wake up queue... */
+ pthread_cond_broadcast(&queue->fwq_sleep_cond);
+ /* Wait for already-created threads to exit */
+ while (--i >= 0)
+ pthread_join(threads[i], NULL);
+ return -ENOMEM;
+ }
+ }
+
+ return 0;
+}
+
+int parallel_find(char *path, struct find_param *param)
+{
+ struct find_work_queue queue;
+ pthread_t *threads = NULL;
+ int numthreads = param->fp_thread_count;
+ int rc;
+ int i;
+
+ if (param->fp_format_printf_str)
+ validate_printf_str(param);
+
+ /* require at least one thread */
+ if (numthreads < 1)
+ return -EINVAL;
+
+ find_work_queue_init(&queue);
+
+ threads = malloc(numthreads * sizeof(pthread_t));
+ if (!threads)
+ return -ENOMEM;
+
+ rc = find_threads_init(threads, &queue, numthreads);
+ if (rc) {
+ llapi_error(LLAPI_MSG_ERROR, rc,
+ "Failed to initialize thread pool");
+ goto cleanup;
+ }
+ /* Normal find - no parallelism yet */
+ rc = param_callback(path, cb_find_init, cb_common_fini, param);
+
+ /* Signal shutdown and wait for threads before cleanup */
+ queue.fwq_shutdown = true;
+ pthread_cond_broadcast(&queue.fwq_sleep_cond);
+ for (i = 0; i < numthreads; i++)
+ pthread_join(threads[i], NULL);
+
+cleanup:
+ free(threads);
+ pthread_mutex_destroy(&queue.fwq_lock);
+ pthread_cond_destroy(&queue.fwq_sleep_cond);
+
+ return rc;
+}
int lov_comp_md_size(struct lov_comp_md_v1 *lcm);
int open_parent(const char *path);
+
+typedef int (semantic_func_t)(char *path, int p, int *d,
+ void *data, struct dirent64 *de);
+
+void validate_printf_str(struct find_param *param);
+int param_callback(char *path, semantic_func_t sem_init,
+ semantic_func_t sem_fini, struct find_param *param);
+int cb_find_init(char *path, int p, int *dp,
+ void *data, struct dirent64 *de);
+int cb_common_fini(char *path, int p, int *dp, void *data,
+ struct dirent64 *de);
+int parallel_find(char *path, struct find_param *param);
#endif /* _LUSTREAPI_INTERNAL_H_ */