From e505e7dbfb8b58a7a2ef9178b56f8b7668a18329 Mon Sep 17 00:00:00 2001 From: Patrick Farrell Date: Wed, 4 Dec 2024 22:51:52 -0500 Subject: [PATCH] LU-17814 utils: Add work unit management Add creating and removing the root work unit. Still no actual find, but closer. Test-Parameters: trivial Signed-off-by: Patrick Farrell Change-Id: Id56e43042b6e6ea776fae20b53837c9eced5098e Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/57293 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Marc Vef Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustreapi.h | 24 +++++- lustre/utils/liblustreapi.c | 12 ++- lustre/utils/liblustreapi_pfind.c | 156 +++++++++++++++++++++++++++++++++++++- lustre/utils/lustreapi_internal.h | 2 + 4 files changed, 186 insertions(+), 8 deletions(-) diff --git a/lustre/include/lustre/lustreapi.h b/lustre/include/lustre/lustreapi.h index ae49e7f..10b85b3 100644 --- a/lustre/include/lustre/lustreapi.h +++ b/lustre/include/lustre/lustreapi.h @@ -443,13 +443,35 @@ struct find_work_unit { char *fwu_path; }; +/* Check if we have C11 atomics available */ +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__) + #include + #define HAS_STDATOMIC 1 +#else + #define HAS_STDATOMIC 0 +#endif + +/* Define atomic fetch and add operation */ +#if HAS_STDATOMIC + #define ll_atomic_fetch_add(ptr, val) atomic_fetch_add(ptr, val) + #define ll_atomic_fetch_sub(ptr, val) atomic_fetch_sub(ptr, val) +#else + #define ll_atomic_fetch_add(ptr, val) __sync_fetch_and_add(ptr, val) + #define ll_atomic_fetch_sub(ptr, val) __sync_fetch_and_sub(ptr, val) +#endif + /* Work queue for managing parallel processing */ struct find_work_queue { struct find_work_unit *fwq_head; /* Take work from head */ struct find_work_unit *fwq_tail; /* ... add to tail */ pthread_mutex_t fwq_lock; pthread_cond_t fwq_sleep_cond; - int fwq_active_units; /* Atomic counter, active work units */ + /* Atomic counter, active work units */ +#if HAS_STDATOMIC + atomic_int fwq_active_units; +#else + int fwq_active_units; +#endif bool fwq_shutdown; /* Flag to signal shutdown */ int fwq_error; }; diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index f0e79e7..3c64214 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -1528,7 +1528,7 @@ static bool lmv_is_foreign(__u32 magic) return magic == LMV_MAGIC_FOREIGN; } -static void find_param_fini(struct find_param *param) +void find_param_fini(struct find_param *param) { if (param->fp_migrate) return; @@ -1549,7 +1549,7 @@ static void find_param_fini(struct find_param *param) } } -static int common_param_init(struct find_param *param, char *path) +int common_param_init(struct find_param *param, char *path) { int lum_size = get_mds_md_size(path); @@ -2167,14 +2167,18 @@ int param_callback(char *path, semantic_func_t sem_init, if (!buf) return -ENOMEM; - snprintf(buf, PATH_MAX + 1, "%s", path); + ret = snprintf(buf, PATH_MAX + 1, "%s", path); + if (ret < 0 || ret >= PATH_MAX + 1) { + ret = -ENAMETOOLONG; + goto out; + } ret = common_param_init(param, buf); if (ret) goto out; param->fp_depth = 0; - ret = llapi_semantic_traverse(buf, 2 * PATH_MAX, -1, sem_init, + ret = llapi_semantic_traverse(buf, 2 * PATH_MAX + 1, -1, sem_init, sem_fini, param, NULL); out: find_param_fini(param); diff --git a/lustre/utils/liblustreapi_pfind.c b/lustre/utils/liblustreapi_pfind.c index bbc854e..843eb57 100644 --- a/lustre/utils/liblustreapi_pfind.c +++ b/lustre/utils/liblustreapi_pfind.c @@ -14,14 +14,44 @@ #include "lustreapi_internal.h" #include "lstddef.h" + + +static void work_unit_free(struct find_work_unit *unit); + /* Placeholder worker function - just for testing thread creation */ static void *find_worker(void *arg) { struct find_work_queue *queue = (struct find_work_queue *) arg; + struct find_work_unit *unit; /* TODO: Implement actual work processing */ - while (!queue->fwq_shutdown) - sleep(1); /* Just keep thread alive for now */ + while (!queue->fwq_shutdown) { + /* Get work unit from queue */ + pthread_mutex_lock(&queue->fwq_lock); + while (queue->fwq_head == NULL && !queue->fwq_shutdown) { + pthread_cond_wait(&queue->fwq_sleep_cond, + &queue->fwq_lock); + } + + if (queue->fwq_shutdown) { + pthread_mutex_unlock(&queue->fwq_lock); + break; + } + + /* Dequeue work unit */ + unit = queue->fwq_head; + + queue->fwq_head = unit->fwu_next; + if (queue->fwq_head == NULL) + queue->fwq_tail = NULL; + pthread_mutex_unlock(&queue->fwq_lock); + + /* TODO: processing goes here */ + sleep(0.1); + + work_unit_free(unit); + __sync_fetch_and_sub(&queue->fwq_active_units, 1); + } return NULL; } @@ -61,6 +91,125 @@ static int find_threads_init(pthread_t *threads, struct find_work_queue *queue, return 0; } +/* Free a work unit */ +static void work_unit_free(struct find_work_unit *unit) +{ + if (!unit) + return; + + free(unit->fwu_path); + free(unit->fwu_de); + free(unit); +} + +/* Create a new work unit */ +static struct find_work_unit *work_unit_create(const char *path, + struct find_param *param, + struct dirent64 *de) +{ + struct find_work_unit *unit; + + unit = malloc(sizeof(*unit)); + if (!unit) + return NULL; + + /* Initialize with zeros to ensure clean error handling */ + memset(unit, 0, sizeof(*unit)); + + /* Copy the path */ + unit->fwu_path = (char *)malloc(PATH_MAX + 1); + if (!unit->fwu_path) + goto error; + snprintf(unit->fwu_path, PATH_MAX + 1, "%s", path); + + /* Copy the directory entry if provided */ + if (de) { + unit->fwu_de = malloc(sizeof(*de)); + if (!unit->fwu_de) + goto error; + memcpy(unit->fwu_de, de, sizeof(*de)); + } + + /* TODO: deep copy of param + unit->fwu_param = param; */ + + return unit; + +error: + work_unit_free(unit); + return NULL; +} + +static int work_unit_create_and_add(const char *path, struct find_param *param, + struct dirent64 *dent) +{ + struct find_work_queue *queue = param->fp_queue; + struct find_work_unit *unit; + int rc = 0; + + unit = work_unit_create(path, param, dent); + if (!unit) { + rc = -ENOMEM; + goto out; + } + + ll_atomic_fetch_add(&queue->fwq_active_units, 1); + + pthread_mutex_lock(&queue->fwq_lock); + + /* add to queue, at tail if there's already something on the queue */ + if (queue->fwq_tail) { + queue->fwq_tail->fwu_next = unit; + } else { + queue->fwq_head = unit; + } + queue->fwq_tail = unit; + + /* wake up any waiting workers */ + pthread_cond_signal(&queue->fwq_sleep_cond); + pthread_mutex_unlock(&queue->fwq_lock); + +out: + return rc; +} + +static int pfind_param_callback(char *path, struct find_param *param, + struct find_work_queue *queue) +{ + char *buf; + int ret; + + if (strlen(path) > PATH_MAX) { + ret = -EINVAL; + llapi_error(LLAPI_MSG_ERROR, ret, + "Path name '%s' is too long", path); + return ret; + } + + buf = (char *)malloc(PATH_MAX + 1); + if (!buf) + return -ENOMEM; + + snprintf(buf, PATH_MAX + 1, "%s", path); + ret = common_param_init(param, buf); + if (ret) + goto out; + + param->fp_queue = queue; + ret = work_unit_create_and_add(buf, param, NULL); + if (ret) + goto out; + + /* Wait for all work to complete */ + while (ll_atomic_fetch_add(&queue->fwq_active_units, 0) > 0) + sched_yield(); + +out: + find_param_fini(param); + free(buf); + return ret < 0 ? ret : 0; +} + int parallel_find(char *path, struct find_param *param) { struct find_work_queue queue; @@ -89,7 +238,8 @@ int parallel_find(char *path, struct find_param *param) goto cleanup; } /* Normal find - no parallelism yet */ - rc = param_callback(path, cb_find_init, cb_common_fini, param); + rc = pfind_param_callback(path, param, &queue); + /* Signal shutdown and wait for threads before cleanup */ queue.fwq_shutdown = true; diff --git a/lustre/utils/lustreapi_internal.h b/lustre/utils/lustreapi_internal.h index eec1b7e..a10d209 100644 --- a/lustre/utils/lustreapi_internal.h +++ b/lustre/utils/lustreapi_internal.h @@ -223,5 +223,7 @@ int cb_find_init(char *path, int p, int *dp, void *data, struct dirent64 *de); int cb_common_fini(char *path, int p, int *dp, void *data, struct dirent64 *de); +int common_param_init(struct find_param *param, char *path); +void find_param_fini(struct find_param *param); int parallel_find(char *path, struct find_param *param); #endif /* _LUSTREAPI_INTERNAL_H_ */ -- 1.8.3.1