Whamcloud - gitweb
LU-17814 utils: Add work unit management 93/57293/14
author Patrick Farrell <pfarrell@whamcloud.com>
Thu, 5 Dec 2024 03:51:52 +0000 (22:51 -0500)
committer Oleg Drokin <green@whamcloud.com>
Sat, 7 Jun 2025 22:58:11 +0000 (22:58 +0000)
Add creation and removal of the root work unit.

Still no actual find, but closer.

Test-Parameters: trivial
Signed-off-by: Patrick Farrell <pfarrell@whamcloud.com>
Change-Id: Id56e43042b6e6ea776fae20b53837c9eced5098e
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/57293
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Marc Vef <mvef@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre/lustreapi.h
lustre/utils/liblustreapi.c
lustre/utils/liblustreapi_pfind.c
lustre/utils/lustreapi_internal.h

lustre/include/lustre/lustreapi.h
index ae49e7f..10b85b3 100644
@@ -443,13 +443,35 @@ struct find_work_unit {
        char *fwu_path;
 };
 
+/* Check if we have C11 atomics available */
+#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__)
+    #include <stdatomic.h>
+    #define HAS_STDATOMIC 1
+#else
+    #define HAS_STDATOMIC 0
+#endif
+
+/* Define atomic fetch-and-add and fetch-and-sub operations */
+#if HAS_STDATOMIC
+    #define ll_atomic_fetch_add(ptr, val) atomic_fetch_add(ptr, val)
+    #define ll_atomic_fetch_sub(ptr, val) atomic_fetch_sub(ptr, val)
+#else
+    #define ll_atomic_fetch_add(ptr, val) __sync_fetch_and_add(ptr, val)
+    #define ll_atomic_fetch_sub(ptr, val) __sync_fetch_and_sub(ptr, val)
+#endif
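+/*
+ * Both backends return the value held before the update, e.g.
+ * ll_atomic_fetch_add(&queue->fwq_active_units, 1) bumps the counter and
+ * yields the pre-increment count.
+ */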
+
 /* Work queue for managing parallel processing */
 struct find_work_queue {
        struct find_work_unit *fwq_head; /* Take work from head */
        struct find_work_unit *fwq_tail; /* ... add to tail */
        pthread_mutex_t fwq_lock;
        pthread_cond_t fwq_sleep_cond;
-       int fwq_active_units;           /* Atomic counter, active work units */
+       /* Atomic counter, active work units */
+#if HAS_STDATOMIC
+       atomic_int fwq_active_units;
+#else
+       int fwq_active_units;
+#endif
        bool fwq_shutdown;              /* Flag to signal shutdown */
        int fwq_error;
 };
lustre/utils/liblustreapi.c
index f0e79e7..3c64214 100644
@@ -1528,7 +1528,7 @@ static bool lmv_is_foreign(__u32 magic)
        return magic == LMV_MAGIC_FOREIGN;
 }
 
-static void find_param_fini(struct find_param *param)
+void find_param_fini(struct find_param *param)
 {
        if (param->fp_migrate)
                return;
@@ -1549,7 +1549,7 @@ static void find_param_fini(struct find_param *param)
        }
 }
 
-static int common_param_init(struct find_param *param, char *path)
+int common_param_init(struct find_param *param, char *path)
 {
        int lum_size = get_mds_md_size(path);
 
@@ -2167,14 +2167,18 @@ int param_callback(char *path, semantic_func_t sem_init,
        if (!buf)
                return -ENOMEM;
 
-       snprintf(buf, PATH_MAX + 1, "%s", path);
+       ret = snprintf(buf, PATH_MAX + 1, "%s", path);
+       if (ret < 0 || ret >= PATH_MAX + 1) {
+               ret = -ENAMETOOLONG;
+               goto out;
+       }
        ret = common_param_init(param, buf);
        if (ret)
                goto out;
 
        param->fp_depth = 0;
 
-       ret = llapi_semantic_traverse(buf, 2 * PATH_MAX, -1, sem_init,
+       ret = llapi_semantic_traverse(buf, 2 * PATH_MAX + 1, -1, sem_init,
                                      sem_fini, param, NULL);
 out:
        find_param_fini(param);
lustre/utils/liblustreapi_pfind.c
index bbc854e..843eb57 100644
 #include "lustreapi_internal.h"
 #include "lstddef.h"
 
+
+static void work_unit_free(struct find_work_unit *unit);
+
 /* Placeholder worker function - just for testing thread creation */
 static void *find_worker(void *arg)
 {
        struct find_work_queue *queue = (struct find_work_queue *) arg;
+       struct find_work_unit *unit;
 
        /* TODO: Implement actual work processing */
-       while (!queue->fwq_shutdown)
-               sleep(1); /* Just keep thread alive for now */
+       while (!queue->fwq_shutdown) {
+               /* Get work unit from queue */
+               pthread_mutex_lock(&queue->fwq_lock);
+               while (queue->fwq_head == NULL && !queue->fwq_shutdown) {
+                       pthread_cond_wait(&queue->fwq_sleep_cond,
+                                         &queue->fwq_lock);
+               }
+
+               if (queue->fwq_shutdown) {
+                       pthread_mutex_unlock(&queue->fwq_lock);
+                       break;
+               }
+
+               /* Dequeue work unit */
+               unit = queue->fwq_head;
+
+               queue->fwq_head = unit->fwu_next;
+               if (queue->fwq_head == NULL)
+                       queue->fwq_tail = NULL;
+               pthread_mutex_unlock(&queue->fwq_lock);
+
+               /* TODO: processing goes here */
+               usleep(100000); /* sleep() takes an unsigned int, so sleep(0.1) would not pause at all */
+
+               work_unit_free(unit);
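+               /* Unit fully processed and freed; drop the active count */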
+               ll_atomic_fetch_sub(&queue->fwq_active_units, 1);
+       }
 
        return NULL;
 }
@@ -61,6 +91,125 @@ static int find_threads_init(pthread_t *threads, struct find_work_queue *queue,
        return 0;
 }
 
+/* Free a work unit */
+static void work_unit_free(struct find_work_unit *unit)
+{
+       if (!unit)
+               return;
+
+       free(unit->fwu_path);
+       free(unit->fwu_de);
+       free(unit);
+}
+
+/* Create a new work unit */
+static struct find_work_unit *work_unit_create(const char *path,
+                                              struct find_param *param,
+                                              struct dirent64 *de)
+{
+       struct find_work_unit *unit;
+
+       unit = malloc(sizeof(*unit));
+       if (!unit)
+               return NULL;
+
+       /* Initialize with zeros to ensure clean error handling */
+       memset(unit, 0, sizeof(*unit));
+
+       /* Copy the path */
+       unit->fwu_path = (char *)malloc(PATH_MAX + 1);
+       if (!unit->fwu_path)
+               goto error;
+       snprintf(unit->fwu_path, PATH_MAX + 1, "%s", path);
+
+       /* Copy the directory entry if provided */
+       if (de) {
+               unit->fwu_de = malloc(sizeof(*de));
+               if (!unit->fwu_de)
+                       goto error;
+               memcpy(unit->fwu_de, de, sizeof(*de));
+       }
+
+       /* TODO: deep copy of param
+       unit->fwu_param = param; */
+
+       return unit;
+
+error:
+       work_unit_free(unit);
+       return NULL;
+}
+
+static int work_unit_create_and_add(const char *path, struct find_param *param,
+                                   struct dirent64 *dent)
+{
+       struct find_work_queue *queue = param->fp_queue;
+       struct find_work_unit *unit;
+       int rc = 0;
+
+       unit = work_unit_create(path, param, dent);
+       if (!unit) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
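+       /* Count the unit as active before it is queued so the waiter in
+        * pfind_param_callback never sees a transient zero while work remains.
+        */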
+       ll_atomic_fetch_add(&queue->fwq_active_units, 1);
+
+       pthread_mutex_lock(&queue->fwq_lock);
+
+       /* add to queue, at tail if there's already something on the queue */
+       if (queue->fwq_tail) {
+               queue->fwq_tail->fwu_next = unit;
+       } else {
+               queue->fwq_head = unit;
+       }
+       queue->fwq_tail = unit;
+
+       /* wake up any waiting workers */
+       pthread_cond_signal(&queue->fwq_sleep_cond);
+       pthread_mutex_unlock(&queue->fwq_lock);
+
+out:
+       return rc;
+}
+
+static int pfind_param_callback(char *path, struct find_param *param,
+                               struct find_work_queue *queue)
+{
+       char *buf;
+       int ret;
+
+       if (strlen(path) > PATH_MAX) {
+               ret = -EINVAL;
+               llapi_error(LLAPI_MSG_ERROR, ret,
+                           "Path name '%s' is too long", path);
+               return ret;
+       }
+
+       buf = (char *)malloc(PATH_MAX + 1);
+       if (!buf)
+               return -ENOMEM;
+
+       snprintf(buf, PATH_MAX + 1, "%s", path);
+       ret = common_param_init(param, buf);
+       if (ret)
+               goto out;
+
+       param->fp_queue = queue;
+       ret = work_unit_create_and_add(buf, param, NULL);
+       if (ret)
+               goto out;
+
+       /* Wait for all work to complete */
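+       /* Adding 0 is an atomic read; only add/sub wrappers are defined. */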
+       while (ll_atomic_fetch_add(&queue->fwq_active_units, 0) > 0)
+               sched_yield();
+
+out:
+       find_param_fini(param);
+       free(buf);
+       return ret < 0 ? ret : 0;
+}
+
 int parallel_find(char *path, struct find_param *param)
 {
        struct find_work_queue queue;
@@ -89,7 +238,8 @@ int parallel_find(char *path, struct find_param *param)
                goto cleanup;
        }
        /* Normal find - no parallelism yet */
-       rc = param_callback(path, cb_find_init, cb_common_fini, param);
+       rc = pfind_param_callback(path, param, &queue);
 
        /* Signal shutdown and wait for threads before cleanup */
        queue.fwq_shutdown = true;
lustre/utils/lustreapi_internal.h
index eec1b7e..a10d209 100644
@@ -223,5 +223,7 @@ int cb_find_init(char *path, int p, int *dp,
                 void *data, struct dirent64 *de);
 int cb_common_fini(char *path, int p, int *dp, void *data,
                   struct dirent64 *de);
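+/* Shared with liblustreapi_pfind.c; no longer static in liblustreapi.c */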
+int common_param_init(struct find_param *param, char *path);
+void find_param_fini(struct find_param *param);
 int parallel_find(char *path, struct find_param *param);
 #endif /* _LUSTREAPI_INTERNAL_H_ */